How Job Control Works

Introduction

In this section we examine how Job Control is implemented. In addition to the techniques we have seen in the other examples, Job Control demonstrates the following:

  • Running a short-lived process and picking up its exit status
  • Killing a process
  • Using conditional lock/wait/notify concurrency constructs
  • More complex Inq event processing, including handling events as a batch
  • Using nested transactions to isolate pieces of work
  • Managing correct code flow with try/catch/finally
  • Storing blobs in a database column
  • Miscellaneous stream and file handling

Overview

The jobdispatcher Process

Job Control starts a detached process called jobdispatcher. This process is responsible for

  1. Managing the job tree timers. Each top-level job requires a timer to schedule it. This timer is either defined by the job itself or is the earliest of any defined by the job's children.
  2. Reacting to changes in the job tree as Job instances are created, mutated or destroyed.
  3. Starting a process to run a job tree (or sub-tree) and managing its termination.

Job Control is considered to be running once the jobdispatcher process has initialised and is awaiting events.

Typedefs

Job Control defines two typedefs. The Job type is bound to the database, while JobVolatile contains data that is only relevant while Job Control is running, so it is held in memory only.

Miscellaneous

Like petstore, Job Control has a script file, defs.inq, that declares useful typedef aliases. In addition, Job Control includes the file common.inq, which contains script used by both the GUI client and the server and is therefore parsed by both.

Note
In the Inq environment, typedef metadata is sent to the client during the login process. Typedefs themselves are never parsed by the client environment.

Inq Script Inventory

jobBoot.inq - a file that #includes all the server-side functionality, providing the single point to boot Job Control into a server.

defs.inq - typedef aliases and enumerations used elsewhere.

common.inq - script used by both the Job Control client and server.

Job.inq - the Job typedef.

JobVolatile.inq - the JobVolatile typedef.

jobControl.inq - all of the server-side functionality.

testSetup.inq - establishes some test jobs.

gui/jcAdmin.inq - the administrative GUI client.

The Job Type

We now look at the Job type to examine its implementation.

Primary Key

There is nothing intrinsic to a Job instance that makes a natural primary key, so an integer field, called Job according to convention, is used.

Blob Usage

A Job that has a non-null FunctionExpr is a task; otherwise it is a box that can have child boxes and tasks.

The way a Job specifies its execution time is to have a TimerExpr that returns a suitably initialised timer. The jobs established in testSetup.inq give some examples. Both of these fields are of type blob.

A blob requires a stream type to define the way the value will be stored. Valid streams are:

  • ioPrint stores the value as plain text
  • ioXML stores the value using Inq's native XML format
  • ioXMLX stores the value using Inq's configurable XML stream
  • ioByte stores the value as bytes

In this example of blob usage, the value is a string, so ioPrint is the appropriate stream type. If storing node structures then ioXML or ioXMLX would be used.

Constructor

Here is Job's construct statement with some noteworthy lines highlighted:

construct (
{
  // Validate the tree state to ensure the integrity of jobdispatcher
  call isTreeState(requiredState = enum(JobTreeState, USER_START));
  
  // Validate the fields we are expecting to be initialised already
  if (isnull($this.ShortName))
    throw ($catalog.{$root.i18n}.errortype.JOB_CREATE,
           $catalog.{$root.i18n}.errormsg.NULL_SHORTNAME);

  /*
  No function expression implies a box.
  */

  $this.Job           = call Uniq:getUniqueId(Name="JC");
  $this.ShortName    += " " + $this.Job;
  $this.ExitStatus    = 0;

  $this.LastUpdated   = getdate();
  $this.User          = $process.loginName;

  // Create associated JobVolatile
  create(new(JobVolatile, $this));

})

The call to isTreeState() relates to Job Control's use of Inq's locking and concurrency features. We look at this in detail later on.

The instance's primary key is initialised by call Uniq:getUniqueId(Name="JC"). Job.inq imports inq.util, making Uniq a symbolic equivalent for the package name. The unique ID allocator discussed in transactions is implemented in the inq.util package and is part of the Inq distribution.
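
Assuming the import statement takes its usual form, the relevant line in Job.inq would be something like this (a sketch; the exact form is an assumption):

// Assumed form: make inq.util available under the symbolic name Uniq
import inq.util as Uniq;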

Finally, the constructor is a good place to ensure any related instances are created. Remember that any instance flagged in a transaction for creation will not come into actual existence unless the transaction commits successfully.

There is a 1-to-1 relationship between Job and JobVolatile instances, so it always makes sense to create the corresponding JobVolatile here. Correspondingly, Job also has a destroy statement, where its JobVolatile is deleted.

In this example, the optional second argument to new() is included. This is the new instance's initial value; that is, the instance of Job being created. Typedef instances implement assignment by assigning commonly named fields, so the result is to make JobVolatile.Job equal to Job.Job.
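
The same field-copying assignment can be seen in isolation; a minimal sketch (jv is an illustrative name):

// Sketch: assignment between typedef instances copies commonly named
// fields, so after the assignment jv.Job equals $this.Job
any jv = new(JobVolatile);
jv = $this;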

Join and Mutate

As a reminder, join and mutate relate to modifying an existing managed instance. The join statement is executed when an instance is first assigned to; mutate runs as the transaction in which the instance is joined commits.

When join runs the instance has not yet been modified. Job's join statement calls isTreeState(). Again, we look at this in more detail later. For now it is enough to say that any process can mutate a Job instance, but when jobdispatcher is running there is concurrency mediation between it and the modifying process. Using join allows Job Control to catch any problems early and abort the transaction if the requirements are not being obeyed.
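
Although the detail is deferred, the join statement's shape mirrors that of construct. A minimal sketch, assuming the same validation call (the required state shown is illustrative only):

join (
{
  // Sketch only: abort the transaction early if the tree state does
  // not permit this process to modify Job instances
  call isTreeState(requiredState = enum(JobTreeState, USER_START));
})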

Parent Link

A Job has a link to its parent. It defines the following field:

// The box this job is in, null if top-level.
Job           ParentJob;

The type of ParentJob is determined by resolving the reference Job. The result is Job.Job, so ParentJob is (as you would expect) the same type as the primary key field, Job.

There is also a key defined so that all children of a given Job can be retrieved:

key ByParent
(
  fields(ParentJob)

  #include <{db}/Job.ByParent.sql>
)

The remainder of this discussion focuses on the functionality of Job Control, all of which is coded in the file jobControl.inq.

Startup

Starting Job Control is performed in the local function startup(). A node is placed in the global $catalog node space at $catalog.jobcontrol and this is used elsewhere in the code to verify whether Job Control is running. The following data is stored here:

$catalog.jobcontrol.logger         // Job Control's logger "inq.jobcontrol"
                   .jobTreeState   // A variable of type JobTreeState
                   .process        // The "jobdispatcher" process
                    

The jobdispatcher process is made available because, in many cases, when clients invoke Job Control functionality the work must be carried out by this process. To do that a service request must be sent to it, so there are various places where lines like this are found:

// Run the service in the jobdispatcher process and with its
// context as $root
send restartJob(@channel = $catalog.jobcontrol.process.ichannel,
                @context = "$root",
                Job);

Job Tree State

The variable at $catalog.jobcontrol.jobTreeState is used to manage cooperation between jobdispatcher and any other process using Job Control. Referring to defs.inq it is an enumeration whose symbolic values can be one of STARTUP, IDLE, USER_START, USER_END or DISPATCHER. We look at the various states and how cooperation is achieved later.
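
defs.inq is not listed here, but a declaration along the following lines would yield such an enumeration. This is a sketch only: the symbols are as given above, while the single-character values, apart from the "S" startup default noted in the code below, are assumptions:

// Sketch of the JobTreeState enumeration (symbol : value : external).
// Only the "S" default for STARTUP is confirmed by the startup code.
typedef string JobTreeState
(
  STARTUP    : "S" : "Startup";
  IDLE       : "I" : "Idle";
  USER_START : "U" : "User Start";
  USER_END   : "E" : "User End";
  DISPATCHER : "D" : "Dispatcher";
);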

During initialisation the job tree state is STARTUP:

// Create the state flag for synchronisation between user processes
// and the dispatcher process. User processes cannot modify,
// create or delete Job instances while the dispatcher is manipulating
// the job tree.
// Note - default initial value is "S"tartup
any jobcontrol.jobTreeState = new(JobTreeState);

Job Dispatcher Process

After starting the jobdispatcher process the executing process instructs it to run the initialise service:

// Create the dispatcher process. We can't specify initialise() as
// the "start" function because that is run in the caller's thread
// and initialisation includes establishing event listeners in the
// dispatcher process
any jobcontrol.process = spawn("jobdispatcher",
                               type = PROCESS_DETACHED,
                               end  = call shutdown());

any $catalog.jobcontrol = jobcontrol;

send initialise(@channel = jobcontrol.process.ichannel);    

Initialisation is carried out in the local function initialise(). The first thing this function does is arbitrate for the right to proceed by calling awaitTreeState():

local function initialise
{
  .
  .
  try
  {
  
    // Mutually exclusive with startup - see comment there
    call awaitTreeState(requiredState = enum(JobTreeState, STARTUP),
                        newState      = enum(JobTreeState, DISPATCHER));

This is a helper function that implements a blocking condition variable. We look at its implementation detail later; at this stage it is enough to state what it does:

awaitTreeState
Wait until $catalog.jobcontrol.jobTreeState becomes equal to requiredState and, when it does, set it to newState.
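
Although the real implementation is examined later, a helper of this kind can be sketched using the lock/wait/notify constructs listed in the introduction (the lock token and the overall shape are assumptions of this sketch):

local function awaitTreeState(any requiredState, any newState)
{
  // Sketch only. Serialise access to the state variable using an
  // arbitrary lock token.
  lock("jobTreeState");

  // Classic condition-variable loop: re-test the state on each wakeup
  while ($catalog.jobcontrol.jobTreeState != requiredState)
    wait("jobTreeState");

  // The lock is held and the state is as required, so claim it
  $catalog.jobcontrol.jobTreeState = newState;

  // Wake any other waiters so they re-test against the new state
  notify("jobTreeState");

  unlock("jobTreeState");
}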

As jobTreeState already has the value STARTUP, jobdispatcher does not have to wait and can establish the state DISPATCHER. Again, we look more closely at these states and state transitions later.

Establishing Listeners

Job Control makes more extensive use of event listeners than we have seen so far in the petstore and its client. Here are the listeners it establishes:

  // Listen for new Job instances.
  any $this.listeners.newJobListener = listen ($catalog,
                                     func f = call newJobCreated(Job = @eventData),
                                     event = (create),
                                     typedef = Job);

  // Listen to the function used to modify Job instances.
  // The client may modify several instances at a time and we want one
  // event to handle all such processing, as opposed to the several update
  // events we could get.
  any $this.listeners.modJobListener1 = listen ($catalog,
                                      func f = call jobsModified(jobEvents = @eventData),
                                      event  = (complete),
                                      exec   = modifyJobs);

  any $this.listeners.modJobListener2 = listen ($catalog,
                                      func f = call jobsModified(jobEvents = @eventData),
                                      event  = (complete),
                                      exec   = modifyJob);

  // Listen to the root of the job tree for deletion events from within
  // By specifying the typedef we will only get events that relate to
  // automatic pruning of the node-set structure when Job instances are
  // deleted.
  any $this.listeners.deleteJobListener = listen (jobTree,
                                        func f = { call jobDeleted(@eventId, @eventData); },
                                        event = (remove),
                                        typedef = Job);

Create

The create listener is of the kind we have seen before. We look later at the processing it performs when new Job instances are created.

Update

The next two listen statements introduce a new listener type - one that is primed to fire when the functions modifyJobs or modifyJob are executed. Listeners like these deliver all create, update and delete events as a batch in the argument @eventData on the stack when the transaction that generated them commits. Put another way, when a process executes either of these functions its transaction will deliver any typedef events to listeners that specify those functions.

In our case the listening process is jobdispatcher and we want a batch because, as we see shortly, there is a hand-off between any client process and jobdispatcher driven by these events. This hand-off requires that one transaction in a client process causes one transaction in jobdispatcher. If a client modifies several jobs at a time (as the administrative GUI does) it is important that jobdispatcher likewise processes the events that arise in a single invocation of its event loop.
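
As a sketch of what this means in practice (the function name matches the listener above but the body is illustrative), any Job updates made inside modifyJobs reach jobsModified() as a single batch when the invoking transaction commits:

// Sketch only: executing this function primes modJobListener1. All
// update events raised in its transaction arrive at jobsModified()
// together, as one batch.
function modifyJobs(any jobs)
{
  foreach(jobs)
  {
    // Illustrative mutation - each contributes an event to the batch
    $loop.Job.LastUpdated = getdate();
  }
}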

Delete

When a Job instance is deleted a delete event is raised on the instance and Inq propagates this event through any event-live node structures. If the event passes through a node-set and the originating typedef is marked in the node-set root then the instance's node-set child is removed from the structure. Inq then raises a remove event on this child, which itself propagates from its point of origin.

When deciding how to process deletions, an application design can consider whether to listen to typedef instance delete or node remove events. For Job Control the following issues arise:

  1. When a Job is deleted any children it has are deleted also (otherwise there would be orphaned jobs). This would generate a series of delete events. These could be batched by using a function listener, as we do for update, but events from children deleted as a consequence are of no interest in any case.
  2. While delete events are transactional (that is they are raised when the enclosing transaction commits), remove events are not - they are raised within the structure at the time they happen. Even if they arise as a consequence of a delete event they cannot be batched.
  3. When a node is removed from a structure, any events that arise beneath it cannot propagate any further than the node that was removed. As the listen statement shows, the remove event is solicited from the jobTree root. If jobs are deleted in order of their depth, shallowest first, then only one event arises, at least for jobs deleted within a given subtree.

Given that we want one event per transaction for jobdispatcher to pick up on, the compromise is that jobs are deleted per job tree and in order of their depth, with a given job tree being handled in its own transaction. The listener subscribes to the single remove event that will emanate from the jobTree root as a result. Again, we put all this together with how jobdispatcher and other server processes cooperate in a moment.
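
A minimal sketch of the deletion pattern this implies (the depth ordering is assumed to have been established already; the name jobsShallowestFirst is illustrative):

// Sketch: delete one job tree per transaction, topmost job first, so
// only its remove event reaches the jobTree root. Remove events for
// deeper jobs arise beneath the already-removed node and go no further.
transaction
{
  foreach(jobsShallowestFirst)
    delete($loop.Job);
}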

Creating The Volatile Data

During startup jobdispatcher creates the JobVolatile instances:

  // Create in-memory data
  call createVolatile();
  loginfo($catalog.jobcontrol.logger, "Volatile data created");

This is a simple function that reads all Job instances and creates their corresponding JobVolatile. It is only required at startup, as any Job created while Job Control is running has its JobVolatile created at that time, in Job's construct statement.

The createVolatile() function is straightforward:

/**
 * Create the volatile data for all the Job instances
 */
local function createVolatile()
{
  // Read the jobs
  any all = new(Job.All);
  read(Job, all, setname="allJobs");

  transaction
  {
    foreach(allJobs)
    {
      // Create associated JobVolatile. If we are being restarted then
      // volatile data instances will already exist, so check for
      // this.
      if (!read(JobVolatile, $loop.Job))
        create(new(JobVolatile, $loop.Job));
    }
  }
}

Building the Job Tree

During startup jobdispatcher builds the job tree in memory:

  // Build the job tree
  hmap  m;  // Seed map
  any jobTree = call buildJobTree(root = m);
  loginfo($catalog.jobcontrol.logger, "Job tree created");

The seed map is of type hmap so the structure is event-live. As discussed above, this means listeners can be established on it to process events that arise within the structure, especially as a consequence of work performed by other processes.

Building the job tree is an example of recursion and of the use of the aggregate function. Here is buildJobTree() together with its helper function addNextLevel():

local function buildJobTree(any root)
{
  // Read the top-level jobs, those with a ParentJob of null
  any k = new(Job.ByParent);
  read(Job, k, setname="topLevelJobs", target=root);
  aggregate(JobVolatile, root.topLevelJobs[@first].Job);

  // Start recursion to add successive child job levels
  call addNextLevel(root = root.topLevelJobs);

  // Unencumber the result from the seed root parameter while returning it
  remove(root.topLevelJobs);
}

local function addNextLevel(any root)
{
  any k = new(Job.ByParent);
  aggregate(Job,
            root[@first].Job,
            setname = "childJobs",
            key = cfunc f0 = {
                               // Set the key for each parent job,
                               // because we are joining self to ParentJob
                               k.ParentJob    = $loop.Job.Job;

                               k;
                             }
           );

  // Recurse for each child node set
  foreach(root)
  {
    aggregate(JobVolatile, $loop.childJobs[@first].Job);
    call addNextLevel(root = $loop.childJobs);
  }
}

The entry point to the recursion is buildJobTree() and the helper function addNextLevel() then recurses to create the node structure, as shown in the figure below:

[Figure: the job tree node structure]

Starting The Job Timers

The last thing initialisation does is evaluate and start the timers that will schedule the jobs. There is one timer per top-level job in the job tree, assuming that the job or one of its descendants defines a TimerExpr.

  // Evaluate the timers underneath each top-level job.
  call evaluateChildTimers(root = jobTree);
  loginfo($catalog.jobcontrol.logger, "Timers evaluated, starting any timers");

  // Start the timer of the root jobs that have defined (or inherited) one.
  // Even if the job is inactive, we'll fire the timer so that the NextRuns
  // field is always updated.
  foreach(jobTree)
  {
    if ($loop.Job.Active == enum(Active, Y) && !isnull($loop.aux.jobTimer))
      call startTimer($loop.Job, $loop.aux.jobTimer, userInfo = $loop);
  }
  loginfo($catalog.jobcontrol.logger, "Root timers started");

In summary, this involves:

  • traversing the job tree from the bottom up;
  • parsing any timer expressions at the current tree level using the Inq compile() function;
  • on the way out of the recursion, sorting the current level by the timers' nextRuns property and the Jobs' JobOrder field;
  • where a Job does not define a timer, inheriting the earliest one from the (now sorted) children.

This leaves the job tree sorted throughout its depth and any timer available at the top level. These timers are then started as shown.
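
A sketch of the traversal's central step, assuming a depth-first helper named evaluateChildTimers() (the name appears in the startup code above) and assuming the compiled expression is executed to yield the timer; the sorting and timer-inheritance steps are omitted:

// Sketch only: parse and run each TimerExpr on the way out of the
// recursion. Sorting by nextRuns/JobOrder and inheriting a timer from
// the children are omitted.
local function evaluateChildTimers(any root)
{
  foreach(root)
  {
    // Children first, so their timers are available to inherit from
    call evaluateChildTimers(root = $loop.childJobs);

    if (!isnull($loop.Job.TimerExpr))
    {
      // compile() parses the blob's script; executing the result
      // returns the initialised timer (an assumption of this sketch)
      any $loop.aux.jobTimer = xfunc(compile($loop.Job.TimerExpr));
    }
  }
}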