Running Jobs

Introduction

Jobs can be run in two ways:

  1. When a timer fires. The top-level job associated with the timer is executed. If the job is a box then its children are executed recursively.
  2. On demand. A job at any level in the tree can be executed on request. If the job is a box then its children are executed recursively.

jobdispatcher's role in executing a job is only to start a new process to execute the sub-tree. A check is made to ensure that no descendant jobs (or, in the case of a manual run of a child job, ancestor jobs) are already running, so that the same (or related) jobs cannot be started more than once.

When this process terminates it invokes the jobComplete service on jobdispatcher. Depending on the exit status passed back, and whether the job should be rescheduled when it reports failure, jobdispatcher may restart the timer.
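This fire-and-reschedule cycle can be sketched in Python (illustrative only: run_job, the reschedule_on_failure flag and the use of threading.Timer are assumptions for the sketch, not Job Control's actual API):

```python
import threading

def run_job(job):
    """Placeholder for executing a job's sub-tree; returns an exit status."""
    return 0

def dispatch(job, delay, reschedule_on_failure=True):
    """Arm a one-shot timer; when it fires, run the job and then decide
    whether to re-arm, mirroring the fire/jobComplete/restart cycle."""
    def fire():
        status = run_job(job)
        # Re-arm on success, or on failure if failures are rescheduled too
        if status == 0 or reschedule_on_failure:
            dispatch(job, delay, reschedule_on_failure)
    t = threading.Timer(delay, fire)
    t.daemon = True
    t.start()
    return t
```

As in Job Control, the timer is one-shot: each completed run is what arms the next one.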

Timer Execution

Timers and their properties are covered in petstore. Here is the code that Job Control has to start the timers that run each top level job:

local function startTimer(any Job, any jobTimer, any userInfo)
{
  // Run all timers at FIXED_DELAY and with no repeat period.
  // This means that the timer will run at its nextRuns time
  // once only. The job dispatcher process calls the timer
  // expression to set up each new run.
  jobTimer.properties.fixedOrDelay = FIXED_DELAY;
  jobTimer.properties.period       = null;
  
  jobTimer.properties.userInfo     = userInfo; 
  
  // All timers call the job dispatcher function when they go off
  jobTimer.properties.func = func f = call jobDispatcher(fromTimer);
    
  loginfo($catalog.jobcontrol.logger, "Starting timer for job {0} : runs at {1}",
                                      Job.ShortName, jobTimer.properties.nextRuns);
  starttimer(jobTimer);
}

Job Startup

When a timer fires, the stack is initialised with the timer itself as the argument fromTimer and its func property is executed. We can see that this calls the local function jobDispatcher. Here is the code for that function:

/**
 * Job Control Dispatcher.
 *
 * Called when a job's timer fires. This function is set as the "func"
 * property of the timers started by initialise().
 *
 * Note: A timer is an invocation of a process's implicit event loop.
 *
 */
local function jobDispatcher(any fromTimer)
{
  try
  {
    // The userInfo property carried in the timer is the container of
    // the Job (and timer) that was set inside the timer when the job tree
    // structure was built.
    any m = fromTimer.properties.userInfo;

    // Note - the map "m" is a member of the job tree structure built
    // in this (the jobdispatcher) process. Containers of any sort are
    // not thread-safe and we don't share them between processes.
    // Create a simple map to hold the Job and JobVolatile we send
    // to the new process running the job.
    // TODO - Could do this when the userInfo is established?
    //        Would that be clearer?
    any jobInfo.Job = m.Job;
    any jobInfo.JobVolatile = m.JobVolatile;
    logfine($catalog.jobcontrol.logger, "Timer has fired for job {0}", jobInfo.Job.ShortName);


    // If this (top-level) job or child are currently running (because
    // a job has been run manually) then we can't start the job.
    // In this case, throw an exception. The catch block will try
    // to perform job completion (to restart the timer) and log the event.
    if (call parentNotIdle(m.Job) ||
        call childNotIdle(jobInfo = m))
      throw("Dispatcher - Tree Not Idle");

    // Start a process to run the job. Note that if the spawn function
    // throws then the process was not started (though its start expression
    // may have been run).
    logfine($catalog.jobcontrol.logger, "Starting process to run job {0}", jobInfo.Job.ShortName);
    any process = spawn(jobInfo.Job.ShortName, type  = PROCESS_CHILD,
                                               start = call onJobStart(jobInfo,
                                                                       immediate=false),
                                               end   = call onJobExit());
  }
  catch
  {
    loginfo($catalog.jobcontrol.logger, "Caught exception while starting job {0}\n{1}\n{2}",
                                        jobInfo.Job.ShortName,
                                        @exception,
                                        @stackTrace);

    // If process was not started perform job completion here
    call jobComplete(jobInfo.Job, exitStatus = 1, immediate=false);

    // rethrow for logging purposes
    throw();
  }
}

Some exception handling ensures that the job completion code runs even if the process to execute the job is not started. Otherwise completion is handled when the process terminates, via its end call of onJobExit().
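The shape of this pattern, running completion locally when the child process never started and then rethrowing, can be sketched in Python (job_complete and spawn here are illustrative stand-ins, not Job Control's functions):

```python
def job_complete(job, exit_status):
    """Placeholder for the completion normally performed when the
    job process terminates (e.g. restarting the timer)."""
    pass

def start_job(job, spawn):
    """Spawn the job process; if spawning fails, complete the job
    locally and rethrow so the failure is still logged."""
    try:
        return spawn(job)          # may raise before the child starts
    except Exception:
        job_complete(job, 1)       # the child never ran: complete here
        raise                      # rethrow for logging purposes
```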

The common pattern for using spawn() is to invoke a service in its start argument that performs initialisation of the new process in its own thread and node space [any initialisation required in the calling thread can be performed before sending such a request]. Here is onJobStart():

/**
 * This function is called when the process to run the timer's FunctionExpr
 * is started.
 * 
 * The "spawn" primitive (see above) executes the "start" expression
 * in the caller's process (thread) but with $this returning the root of
 * the new process's node space. All other paths (in particular $process)
 * refer to the calling process's environment.
 *
 * The new process starts in its own thread after the start expression
 * completes however its input channel is already established, so we can
 * send it the service request to run the job proper.
 *
 * Whether it is formally declared or not, a "start" expression
 * can access $stack.ichannel. Similarly, the process node space can also be
 * accessed via $stack.process.
 */
local function onJobStart(any process, any ichannel, any jobInfo, boolean immediate)
{
  // Put the job dispatcher's (that is our) process's input channel in the
  // new process's node space so it can send the jobComplete service request
  // back to the job dispatcher when the process exits.
  any process.jobDispatcher = $process.ichannel;

  // This is required by Job:<mutator>
  any process.loginName = "jobdispatcher";

  // Send the runJob service request through to the new process
  send runJob(@channel = ichannel, jobInfo, immediate);
}

The arguments process and ichannel are provided by Inq. ichannel is the new process's input channel. It can be used as an argument to send to specify the channel to which the service request will be directed. process is the $process node space when the new process runs. As we can see, onJobStart does both these things.

Additional arguments can be passed as required from the spawn usage. Here we have supplied jobInfo (the container holding the Job, JobVolatile and timer) and a boolean immediate which states whether the execution is via the timer (false) or on demand (true).

Job Execution

The runJob service is how the Job is executed in the new process. This service notes down the input parameters and other state before calling the local function runJob:

/**
 * When a job's timer goes off a process is created on which this
 * service is invoked to run the job's FunctionExpr
 */
service runJob(any jobInfo, boolean immediate)
{
  try
  {
    loginfo($catalog.jobcontrol.logger, "Process for job {0} running...", jobInfo.Job.ShortName);
    
    // Place the Job[Volatile] instances at $process.Job[Volatile],
    // so that onJobExit can pick them up.
    any $process.Job         = jobInfo.Job;
    any $process.JobVolatile = jobInfo.JobVolatile;
    
    // Note down the process running this job. Then we can kill it
    // later.
    call setProcessId($process.id, jobInfo.JobVolatile);
    
    // Remember whether this was a timer or manual invocation
    any $process.immediate = immediate;

    // Process the given box or task.
    any exitStatus = call runJob(jobInfo, immediate);

  }
  catch
  {
    // Ensure an exit status. If an exception occurred inside
    // runJob then we won't have one
    logsevere($catalog.jobcontrol.logger, "Exception {0}\n{1}",
                                          @exception,
                                          @stackTrace);
    int exitStatus = 1;
  }
  finally
  {
    // A process has an event loop and will return to wait at its
    // input channel unless it is killed by another process or volunteers
    // to exit, as here. The exit status is placed at $process.status
    // and any "end" function specified when the process was spawned
    // is called just prior to termination.
    // If we don't use a "finally" block then if an exception occurs
    // while executing the job the process would not terminate.
    exit(exitStatus);
  }
}

A process terminates by invoking the Inq exit() function. When it does so, any end expression specified when it was started is executed. We look at this later.
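A loose Python analogy using a child process (illustrative only; Inq's end expression has no direct Python equivalent, so here the parent invokes a callback itself once the child terminates):

```python
import subprocess
import sys

def spawn_with_end(status, on_end):
    """Run a child Python process that exits with the given status; when
    it terminates, invoke on_end with the exit code, analogous to the
    "end" expression running just prior to an Inq process's termination."""
    child = subprocess.run(
        [sys.executable, "-c", f"import sys; sys.exit({status})"]
    )
    on_end(child.returncode)
    return child.returncode
```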

The runJob() function accepts a Job and recurses through nested boxes, executing any tasks it finds. Rather than reproduce the function here, its use of transactions, which end up being nested because of the recursion, is best illustrated by the following diagram:

[Diagram: runJob, showing transactions nested at successive recursion depths]

Each successive Job (whether box or task) is joined in a transaction opened at its recursion depth. When a task is processed its FunctionExpr is executed in a further transaction. This use of transactions means:

  • Tasks (which are external to Job Control) are executed in their own transaction, isolating them from Job Control and each other.
  • Events relating to a given Job are emitted as that job completes and not delayed until the whole sub-tree has been processed.

The code to do this is straightforward enough and can be seen at local function runJob(any jobInfo, boolean immediate) in jobControl.inq. We examine this function further below when discussing execution flow and transaction handling when a process running a sub-tree is killed.
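The nesting the diagram describes can be modelled with Python context managers (a toy sketch: the transaction function here just records begin/end ordering and is not Inq's transaction machinery):

```python
from contextlib import contextmanager

events = []

@contextmanager
def transaction(name):
    """Toy transaction: records begin/end so the nesting is visible."""
    events.append(f"begin {name}")
    try:
        yield
    finally:
        events.append(f"end {name}")

def run_job(job):
    # One transaction per recursion depth; a task's work gets a
    # further inner transaction, isolating it from Job Control.
    with transaction(job["name"]):
        children = job.get("children")
        if children:                       # a box: recurse
            for child in children:
                run_job(child)
        else:                              # a task: run its FunctionExpr
            with transaction(job["name"] + ":task"):
                pass
```

Because each transaction closes as its job completes, events for a given job are emitted then and not held until the whole sub-tree has finished.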

On Demand Execution

A Job can be executed at any time by calling runJobNow(any Job). The argument can be any job, not just a top-level one, but, as for timed execution, there must not already be a job running within the Job's particular sub-tree.

Unlike a timed execution, which respects timed boxes, executing a job on demand executes its children immediately in sequence according to their JobOrder.
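A minimal sketch of this ordering in Python (the job dictionaries and JobOrder key are illustrative, not Job Control's data model):

```python
def run_now(job):
    """Run a job on demand: boxes execute their children immediately,
    in JobOrder sequence, ignoring any child timers."""
    ran = []
    def run(j):
        children = j.get("children", [])
        if children:
            for child in sorted(children, key=lambda c: c["JobOrder"]):
                run(child)
        else:
            ran.append(j["name"])      # a task: execute it (recorded here)
    run(job)
    return ran
```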

Killing a Job

When a job is running it can be terminated prematurely by killing the process that is hosting it. In Job Control this is performed by the function killJob(any Job).

Killing a process (with the Inq kill() function) is an abrupt way of terminating it. Normally a process will either terminate itself via the Inq exit() function or be sent a service request by another process asking it to do so. However, if a job is somehow suspended, perhaps because it is waiting on some external resource that never becomes available, or has run system commands outside of the Inq environment that are misbehaving, then killing the process may be the only option.

When a process is killed there are semantics of transaction handling and execution flow that are important to understand. While not specific to Job Control, its use of recursion and nested transactions make it a useful example to use in explaining these. First the theory.

Execution Flow

When a process is killed, execution passes as soon as possible to the nearest finally block. If there is one at the kill point then that is the nearest; otherwise intervening stack frames are skipped. What does "as soon as possible" mean?

The expression of the current statement will be evaluated. However, expressions are very fine-grained units of the Inq language. For example, the statement

  a = b + c;

contains three expressions: the references to a, b and c. If the kill happens while b is being evaluated then the statement is abandoned before Inq evaluates c. Hence, in general, it is not possible to say whether a statement will have completed.

The finally block will execute to completion (provided it itself does not generate an exception). No catch blocks are executed. Control then passes to the next nearest finally block above, missing out any stack frames that do not contain one. Each finally block at successively higher levels above the kill point is executed.
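Python has no direct equivalent of Inq's kill semantics, but a BaseException such as KeyboardInterrupt behaves analogously: it bypasses ordinary except Exception handlers while still running every finally block on the way up the stack. A sketch:

```python
trace = []

def task():
    try:
        raise KeyboardInterrupt        # stands in for the kill point
    except Exception:
        trace.append("task catch")     # NOT run: bypasses ordinary handlers
    finally:
        trace.append("task finally")   # runs

def middle():
    task()                             # no finally here: frame skipped over

def outer():
    try:
        middle()
    except Exception:
        trace.append("outer catch")    # NOT run
    finally:
        trace.append("outer finally")  # runs
```

Calling outer() propagates the KeyboardInterrupt and leaves trace as ["task finally", "outer finally"]: both finally blocks ran, neither handler did.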

Transactions

If a particular finally block is that of a transaction then that transaction will abort at the end of the block. Generally this action is appropriate when killing a process, but as the block is entered its transaction still contains the state it had immediately prior to the kill.

Execution in the finally block provides the opportunity to perform any cleanup while the process is being killed, including committing the current transaction explicitly with the Inq commit() function. If required, a process can determine whether it is being killed by checking its killed property at $process.properties.killed. This property is true if a kill is in progress, false otherwise.

Kill and Job Control

How does Job Control manage itself when a job process is killed?

Tasks

As a job tree is executed, tasks are processed by the function processTask(any Job). This function opens a transaction within which the Job's FunctionExpr is executed. This isolates any transaction concerns the job has from Job Control itself. Here is the finally block it uses:

finally
{
  // Were we killed? If so signal with a special exit status
  // Note - if a process is killed then catch blocks
  // are not executed, but finally blocks are. Other than
  // that no code is executed apart from a spawned process's
  // "end" expression.
  
  // Note - although finally {} blocks do execute when a
  // process is killed (catch {} blocks do not) the
  // transaction is *not* committed implicitly.
  // This transaction is for the job itself (that is
  // external to jobcontrol). It is allowed to abort. 

  if ($process.properties.killed)
    exitStatus = 255;
  else
    exitStatus = 1;

  call closeLogFile();
  call pruneLogfiles(Job);
  
  if (curStrm)
    any $process.system.out = curStrm;
  
  exitStatus;
}

This code cleans up the job's log files (covered further below) and checks whether the process is being killed. If so, a specific exit status (255) is returned.

Note
The last statement in the finally block is exitStatus; this makes it the return value of the function.

It is important that this statement is inside the block and not outside. As soon as the block is closed execution passes out of the function and into the next nearest finally, as if by exception.
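Python behaves comparably when a return is placed inside the finally block: it executes even while an exception is unwinding (in Python it also swallows the exception), whereas a statement placed after the block would never be reached. A sketch:

```python
def process_task(killed):
    try:
        if killed:
            raise KeyboardInterrupt   # stands in for the kill
        return 0
    finally:
        if killed:
            return 255                # inside the block: executes anyway
    # a statement placed here would never run while unwinding
```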

Job Control

processTask() is called from runJob() and the transaction at this level is used to encapsulate field changes to the current Job in the tree, recording its LastRan and ExitStatus. Here is its finally block:

finally
{
  // Clean up

  // If the process is killed then finally blocks at and above the kill
  // point will be executed immediately. If a finally block is associated
  // with a transaction then it is aborted at the end of the finally block.
  
  if (childExitStatus && childExitStatus == 255)
  {
    exitStatus = childExitStatus;
    
    logfine($catalog.jobcontrol.logger,
            "Stopping at {0} because process has been killed",
            jobInfo.Job.ShortName);
  }

  call setStartTime(jobInfo.JobVolatile, Started=null);

  jobInfo.Job.ExitStatus = exitStatus;
  call calcDuration(jobInfo.Job);
  
  // Explicit commit in case we were killed. We want the ExitStatus
  // and LastRan fields saved regardless.
  commit();
  
  // Return the exit status. This statement must be inside the finally
  // block or (if the process is being killed) it will not execute.
  // When a process is being killed execution passes immediately to
  // the next finally block up the stack.
  exitStatus;
}

This code checks for the exit status that indicates a kill (first checking that childExitStatus is defined at all, in case the kill happened at this level), performs some other housekeeping and then explicitly commits the transaction. This processing continues at any higher levels of runJob on the call stack.

By committing the transaction Job Control ensures the Job instance's audit fields are persisted rather than discarded, which would in particular leave the ExitStatus unassigned (and probably zero).

Capturing Job Output

When a task is executed (in processTask()) Job Control opens a file which the Job's FunctionExpr can use for standard output or error:

  // Set up the output stream
  any outStream = call openLogFile(Job);
  any $process.system.out = outStream;

Here are the related functions for interest. You can find them in jobControl.inq:

/**
 * Opens a log file for the given job.
 * Returns an open print stream.
 */
local function openLogFile(any Job)
{
  any filename = call generateLogfileName(Job);

  file logFile = $catalog.jobcontrol.properties.logroot + filename;

  ioPrint p;

  open(p, logFile, OPEN_WRITE);

  p;
}
/**
 * Create a name for the Job's log file.
 *
 * If a top-level job defines logging then a file name is generated
 * of the form:
 *    SomeJob_20081218_050000.log
 */
local function generateLogfileName(any Job)
{
  // File name is composed from the job's short name (spaces are
  // removed) and the start time to the granularity of 1 second.

  any filename = call rootLogfileName(Job);

  // Pattern is Java MessageFormat
  renderf("{0}_{1,date,yyyyMMdd_HHmmss}.log", filename, getdate());
}
local function rootLogfileName(any Job)
{
  // Create root string by stripping space characters from Job.ShortName
  // After all, who really wants spaces in file names?
  any filename = gsub("", " ", Job.ShortName);
}
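For comparison, the same naming scheme in Python (a sketch; strftime stands in for the Java MessageFormat pattern that renderf uses):

```python
from datetime import datetime

def logfile_name(short_name, when=None):
    """Strip spaces from the job's short name and append a timestamp
    to one-second granularity, e.g. SomeJob_20081218_050000.log"""
    root = short_name.replace(" ", "")   # no spaces in file names
    when = when or datetime.now()
    return f"{root}_{when.strftime('%Y%m%d_%H%M%S')}.log"
```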