# Source code for reana_job_controller.job_manager
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2019, 2020, 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Job Manager."""
import json
from reana_commons.utils import calculate_file_access_time
from reana_db.database import Session
from reana_db.models import Job as JobTable, JobCache, JobStatus, Workflow
from reana_job_controller.config import CACHE_ENABLED
class JobManager:
    """Job management interface.

    Base class holding the backend-independent parts of a job: the basic
    job description, the submission hook and the database bookkeeping.
    ``execute``, ``get_status``, ``get_logs`` and ``stop`` raise
    ``NotImplementedError`` here and are meant to be overridden.

    ``create_job_in_db`` reads ``self.compute_backend``,
    ``self.cvmfs_mounts`` and ``self.shared_file_system``; none of them is
    set by this class, so they are presumably provided by subclasses
    (TODO confirm against the concrete job managers).
    """

    def __init__(
        self,
        docker_img="",
        cmd=None,
        prettified_cmd="",
        env_vars=None,
        workflow_uuid=None,
        workflow_workspace=None,
        job_name=None,
    ):
        """Instantiate a basic job.

        :param docker_img: Docker image. A falsy value is stored as ``""``.
        :type docker_img: str
        :param cmd: Command to execute. Defaults to a new empty list.
        :type cmd: list
        :param prettified_cmd: Prettified version of the command to execute.
        :type prettified_cmd: str
        :param env_vars: Environment variables. The mapping is copied, so
            the caller's object is never modified. Defaults to an empty
            dict.
        :type env_vars: dict
        :param workflow_uuid: Unique workflow id.
        :type workflow_uuid: str
        :param workflow_workspace: Absolute path to workspace.
        :type workflow_workspace: str
        :param job_name: Name of the job.
        :type job_name: str
        """
        self.docker_img = docker_img or ""
        # A fresh list per instance: a ``[]`` default in the signature would
        # be a single object shared by every job created without ``cmd``.
        self.cmd = [] if cmd is None else cmd
        self.prettified_cmd = prettified_cmd
        self.workflow_uuid = workflow_uuid
        self.workflow_workspace = workflow_workspace
        self.job_name = job_name
        # ``_extend_env_vars`` writes into the dict it receives, so hand it a
        # private copy. Must run after ``workflow_uuid`` and
        # ``workflow_workspace`` are assigned, since it reads both.
        self.env_vars = self._extend_env_vars(dict(env_vars) if env_vars else {})

    def execution_hook(fn):
        """Add before execution hooks and DB operations.

        Decorator for ``execute`` implementations. The wrapped call runs
        ``before_execution()``, then ``fn``, stores the job in the database
        with the backend job ID returned by ``fn``, caches the job when
        ``CACHE_ENABLED`` is truthy, and finally returns the backend job ID.

        :param fn: ``execute`` implementation returning the backend job ID.
        :returns: Wrapped function with the same call signature.
        """
        # Imported locally so the block does not depend on a file-level
        # import that may not be present.
        from functools import wraps

        @wraps(fn)  # keep the name/docstring of the decorated ``execute``
        def wrapper(inst, *args, **kwargs):
            inst.before_execution()
            backend_job_id = fn(inst, *args, **kwargs)
            # The DB entry must exist before caching: ``cache_job`` reads
            # ``inst.job_id``, which ``create_job_in_db`` sets.
            inst.create_job_in_db(backend_job_id)
            if CACHE_ENABLED:
                inst.cache_job()
            return backend_job_id

        return wrapper

    def before_execution(self):
        """Before job submission hook.

        Called by ``execution_hook`` before the job is executed. No-op here.
        """
        pass

    def after_execution(self):
        """After job submission hook.

        No-op here. Note that ``execution_hook`` in this class does not
        call it.
        """
        pass

    @execution_hook
    def execute(self):
        """Execute a job.

        :returns: Job ID.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def get_status(self):
        """Get job status.

        :returns: job status.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @classmethod
    def get_logs(cls, backend_job_id, **kwargs):
        """Return job logs if log files are present.

        :param backend_job_id: ID of the job in the backend.
        :param kwargs: Additional parameters needed to fetch logs.
            These depend on the chosen compute backend.
        :return: String containing the job logs.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def stop(self):
        """Stop a job.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def create_job_in_db(self, backend_job_id):
        """Create job in db.

        Adds and commits a ``JobTable`` row with status
        ``JobStatus.created`` and stores the new row's ``id_`` (as a
        string) in ``self.job_id``.

        :param backend_job_id: ID of the job in the backend.
        """
        job_db_entry = JobTable(
            backend_job_id=backend_job_id,
            workflow_uuid=self.workflow_uuid,
            status=JobStatus.created,
            compute_backend=self.compute_backend,
            cvmfs_mounts=self.cvmfs_mounts or "",
            shared_file_system=self.shared_file_system or False,
            docker_img=self.docker_img,
            # Command and environment are stored JSON-encoded.
            cmd=json.dumps(self.cmd),
            env_vars=json.dumps(self.env_vars),
            deleted=False,
            job_name=self.job_name,
            prettified_cmd=self.prettified_cmd,
        )
        Session.add(job_db_entry)
        Session.commit()
        self.job_id = str(job_db_entry.id_)

    def cache_job(self):
        """Cache a job.

        Looks up the workflow by ``self.workflow_uuid``, computes the file
        access times of its workspace path and commits a ``JobCache`` row
        for ``self.job_id``.
        """
        # NOTE(review): ``one_or_none()`` may return ``None``; the attribute
        # access below then raises ``AttributeError``. Behaviour kept as is.
        workflow = (
            Session.query(Workflow).filter_by(id_=self.workflow_uuid).one_or_none()
        )
        access_times = calculate_file_access_time(workflow.workspace_path)
        prepared_job_cache = JobCache()
        prepared_job_cache.job_id = self.job_id
        prepared_job_cache.access_times = access_times
        Session.add(prepared_job_cache)
        Session.commit()

    def update_job_status(self):
        """Update job status in DB.

        No-op in this base class.
        """
        pass

    def _extend_env_vars(self, env_vars):
        """Extend environment variables with REANA specific ones.

        Sets ``REANA_WORKSPACE`` and ``REANA_WORKFLOW_UUID`` on the given
        dict in place.

        :param env_vars: Environment variables to extend (modified in place).
        :type env_vars: dict
        :returns: The same ``env_vars`` object.
        :rtype: dict
        """
        prefix = "REANA"
        env_vars[prefix + "_WORKSPACE"] = self.workflow_workspace
        env_vars[prefix + "_WORKFLOW_UUID"] = str(self.workflow_uuid)
        return env_vars