Concurrency and Events

Introduction

In earlier sections we have briefly mentioned Job Control's concurrency requirements. In many cases Inq's transaction and locking model is adequate to ensure data integrity and cooperation. However, the Job Control application has some specific requirements that demonstrate how Inq's monitors and locks can be used to effect complex interactions between processes.

The jobdispatcher process is what runs Job Control. Like any process, it responds to events. jobdispatcher uses all three types as follows:

  1. Timer Events - timers are used to schedule jobs.
  2. Service Requests - jobdispatcher can receive requests to shut it down or to notify it that a job has completed.
  3. Listeners - as we saw in Establishing Listeners, listeners are used to monitor changes taking place in the prevailing set of Job instances.

During initialisation Job Control builds the job tree. Thereafter jobdispatcher waits for a timer to fire; however, other processes in the Inq environment may want to modify the Job set, perhaps creating a new sub-tree, deleting jobs or setting them inactive. While Job Control is running, such actions may require jobdispatcher to re-evaluate the prevailing timers or manage changes to its in-memory job tree.

The Inq event model allows processes to interact without knowing about one another. Inq raises events when a transaction commits or a node structure is modified. The listen() function allows a process to await transaction events in the environment or structure events arising within its node space. The petstore example listens for new orders being created and its client filters particular update events according to the field(s) changed.
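As a reminder of the shape of such a listener, here is a minimal sketch in the style of the petstore example. The typedef Order and the handler orderCreated() are illustrative names only:

// Listen for Order instances being created anywhere in the environment.
// Order and orderCreated() are hypothetical - substitute the application's
// own typedef and handler.
any $this.listeners.orderListener = listen($catalog,
                                           func f = { call orderCreated(Order = @eventData); },
                                           event   = (create),
                                           typedef = Order);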

Analysing the Event Model

So that it can react to other processes altering the Job set, jobdispatcher solicits the necessary events by establishing suitable listeners during startup. However, transaction and node structure events are delivered discretely - a single transaction by some client process could give rise to several events arriving at jobdispatcher. With any number of such processes able to manipulate disparate sets of Job instances simultaneously, we must analyse the event flow to consider how many events are raised, when they are raised and how jobdispatcher should process them.

The number of Jobs in an application is small. A Job is not an entity like, say, a bank statement entry, whose instances will run into the millions. When types have a large number of instances it is unlikely that overlapping sets of them would be viewed or manipulated by different processes at the same time. Atomicity is the main concern and Inq provides this automatically.

Note
In Inq, a statement like foo.bar += account.balance where foo is a managed instance is atomic because Inq locks foo before it reads the value of foo.bar.

Types like bank accounts do not generally have an impact on some other part of a system that requires events to drive it. The join or mutate statements would likely be a better place to handle specific cases, such as going overdrawn.

With Jobs it would be reasonable to modify all existing instances at once, for example setting them inactive (with the statement Job.Active = enum(Active, N) looping over all instances, as sketched below). Life-cycle analysis may determine that an entity instance cannot be deleted, but this is not a restriction that need apply to Jobs. Lastly, whether Job Control is currently running, and any specific processing it should perform, are not things that should clutter even API functions. Events are a better choice in this case.
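As a sketch of such a bulk modification, assuming a node set allJobs containing every instance has already been built:

// Set every Job inactive. allJobs is an assumed, previously built
// node set whose children each contain a Job instance.
foreach(allJobs)
  $loop.Job.Active = enum(Active, N);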

When jobdispatcher receives an event it should be able to work on a stable set of Job instances. Unlike bank accounts, with a small number of instances in existence it is quite likely that several processes are acting on the same (or, since Jobs can have children, related) instances. Overall, it makes sense to constrain any single client process and the jobdispatcher to run in sequence, with execution across these two processes completing before another client can start. The following diagram depicts such a flow:

[Diagram: sync - the synchronised flow between client processes and jobdispatcher]

The diagram describes updating a set of Jobs but the same set of states is used when creating and deleting as well. We look at each of these scenarios below. The general flow is as follows:

  1. Some process modifies a set of Job instances; it must wait for the job tree state to be IDLE.
  2. When the tree state is IDLE the process can set it to USER_START, meaning it has commenced operating on the Job set.
  3. On completion, the process sets the tree state to USER_END and raises a notification. This notification takes place on a monitor mutually agreed between such client processes and jobdispatcher.
  4. At the same time Inq will raise an event that jobdispatcher is listening for. When it awakes to process this event it checks the job tree state, if necessary waiting for the notification that it has the value USER_END.
  5. When jobdispatcher sees the tree state is USER_END it sets it to DISPATCHER. This state means any other client process cannot embark on step 1 above, because the state is not IDLE.
  6. Lastly, when jobdispatcher has completed its processing it sets the tree state to IDLE and raises a notification. If a client process is waiting to commence some other action it will be expecting this state or the notification of it.

Communicating Sequential Processes

This pattern is one form of communicating sequential processes. Inq provides the events passed between processes as their outputs and inputs, and the variable $catalog.jobcontrol.jobTreeState represents the current state of the system. How are the synchronisation and the transitions between states provided?

Locks and Notifications

Within jobControl.inq there are two related functions: awaitTreeState() and notifyTreeState():

/**
 * Obtain a lock on the mutex when (if not already) the jobtree state variable
 * attains requiredState. Then set it to newState.
 * @param requiredState the desired state
 * @param newState the new state
 * @param timeout the length of time in milliseconds the process is
 * willing to wait for the state to prevail
 */
local function awaitTreeState(JobTreeState requiredState,
                              JobTreeState newState,
                              long         timeout = -1)
{
  if (call isRunning())
  {
    if (lock("__jobtree",   // the mutex we are syncing on
             timeout,       // default timeout is indefinite
             func f = $catalog.jobcontrol.jobTreeState == requiredState))
    {
      // The lock() returns true if the lock is acquired and the expression
      // evaluates to true within the specified timeout. If the process is
      // willing to wait indefinitely then lock() will never return false.
      // It may be aborted with a system exception if deadlock is detected
      // and this process is chosen as the deadlock victim.
  
      $catalog.jobcontrol.jobTreeState = newState;
  
      // Once the new state is set unlock the mutex
      unlock("__jobtree");
    }
    else
      throw("Timeout while waiting for job tree state",
          enumext(JobTreeState, $stack.requiredState));
  }
}

The Inq lock function accepts up to three arguments:

"lock" "(" <mutex> [ "," <timeout> [ "," <condition> ] ] ")"
mutex = <expression>
timeout = <expression>
condition = <func expression>

mutex is the variable on which to take out the lock.

Note
Inq identifies lock variables by equality so if string s has the value "__jobtree" the statement lock(s) is equivalent to lock("__jobtree").
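A trivial sketch of this equivalence:

string s = "__jobtree";
lock(s);              // locks the same mutex as lock("__jobtree")...
unlock("__jobtree");  // ...so this releases it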

timeout is the period, in milliseconds, that the caller is willing to wait to acquire the lock. If absent or -1 the wait is indefinite.

condition is any condition that must be true for the lock to be acquired.

The return value of lock() is true if the lock is obtained, false if it is not because the timeout expired.

When a condition is present Inq evaluates the expression while holding the lock. If the expression evaluates to false the lock is released and a notification is awaited on mutex. Each time a notification is received the lock is re-acquired and the condition tested again. If it evaluates to true the lock is retained and lock() returns true.

In awaitTreeState() the mutex lock is not retained. It is only required to safely test and set $catalog.jobcontrol.jobTreeState, after which it is released using unlock().

The complementary function is notifyTreeState():

/**
 * Obtain a lock on the mutex and set the jobtree state variable
 * to a new state value.
 * @param newState the new state
 * @param timeout the length of time in milliseconds the process is
 * willing to wait to acquire the lock
 */
local function notifyTreeState(JobTreeState newState, long timeout = -1)
{
  if (call isRunning())
  {
    if (lock("__jobtree",   // the mutex we are syncing on
             timeout))      // default timeout is indefinite
    {
      // Notify any process waiting on the jobtree mutex. The expression
      // is executed before the notification.
      notify("__jobtree", func f = $catalog.jobcontrol.jobTreeState = newState);
  
      // Having performed a notify() any waiting process performing a wait()
      // or a lock() with a condition as above is awakened. For conditional
      // locks the condition is rechecked then.
      unlock("__jobtree");
    }
    else
      throw("Timeout while waiting to lock job tree",
            enumext(JobTreeState, $stack.newState));
  }
}

Again, a lock is acquired on the mutex. This time there is no condition and the lock is being taken out just so that notify() can be called:

("notify" | "notifyall") "(" <mutex> [ "," <expression> ] ")"
mutex = <expression>

If there is an expression then this is executed before the notification is performed. It could equally well be a separate statement - placing the expression as an argument to notify() merely formalises it and provides symmetry with the three-argument version of lock().

If there are processes waiting on the mutex then notify() will wake one of them; notifyall() will wake them all.
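Putting lock(), a condition and notify() together, here is a minimal sketch of the wait/notify pattern that the two functions above implement. The mutex name and the flag $catalog.example.ready are invented, and the flag is assumed to have been declared during initialisation:

// Waiting process: blocks until the flag is true, then holds the lock
if (lock("myMutex",
         -1,        // wait indefinitely
         func f = $catalog.example.ready))
{
  // ...act while the state is known to be stable...
  unlock("myMutex");
}

// Notifying process: sets the flag and wakes one waiting process
lock("myMutex");
notify("myMutex", func f = $catalog.example.ready = true);
unlock("myMutex");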

Job Tree States

The following diagram shows how $catalog.jobcontrol.jobTreeState progresses through its various states and the process that executes in each case.

[Diagram: sync - the job tree states and the process that executes in each]

In fact, we are not quite done in considering the event model. The diagram makes clear what we have only alluded to so far: a single event must be dispatched to jobdispatcher's listeners for a given transaction committed by a client process.

A successful state transition requires that the process due to execute in that state receives some impetus to do so. In Job Control these are:

  • jobdispatcher - an event (plus a notification if the state is not already USER_END)
  • client process - a notification (if the state is not already IDLE)

Batching Events

A single event can be achieved either by limiting the amount of work contained in the transaction (that is, manipulating only one Job instance) or by batching the events into a single delivery bundle. The first is straightforward enough, albeit restricting the atomicity to a single instance. How does batching work?

In Inq a function can arrange for an event to be raised when its enclosing transaction commits. This is called a complete event and its payload is the bundle of create, update or delete events that arise in the transaction. The function must be called at least once for the event to be raised.

Functions raise the complete event when their declaration includes the raises clause. The events it carries are any combination of create, update and delete. If a function wishes to raise all event types it can specify any. Here are some examples:

local function foo(any arg) raises (update)
 .
 .

function bar() raises (create, delete)
 .
 .

local function foobar() raises (any)

Listening to node structures for update and delete events, or to $catalog for create events, dispatches those events to any listeners discretely, waking the process for each event. Using raises on a function allows a particular flow of execution to trigger the dispatch of that transaction's events as a bundle. In general, which is most appropriate will depend on the application's requirements.

Tying It All Together

Looking back at the code that establishes the listeners, what decisions have we made in respect of the foregoing? We look at how the events are processed in the next section. Here is the script that will raise them.

Create

When creating Jobs we decide to support only one instance at a time within its own transaction:

local function createJob(any Job, any parent)
{
  if (parent && call isTask(Job=parent))
    throw("parent must be a box", parent);
    
  if (parent)
    Job.ParentJob = parent.Job;

  transaction
  {
    // Acquire the appropriate job tree state - user activity starting
    call awaitTreeState(requiredState = enum(JobTreeState, IDLE),
                        newState      = enum(JobTreeState, USER_START));

    // Submit the instance to the transaction for creation
    create(Job);
  }
  catch
  {
    // If something went wrong then the job dispatcher will not be
    // woken by a create event.  Return the tree state to idle....
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
    throw();
  }

  // ...On the other hand if creation is successful tell the job
  // dispatcher we have finished and allow it to re-evaluate the
  // job tree.
  call notifyTreeState(newState = enum(JobTreeState, USER_END));
  
  // Return the (managed instance) created Job
  read(Job, Job);
}

This is not much of a compromise for a type whose instance set is small and not volatile. The function is otherwise self-explanatory, performing some precondition checks and using simple exception handling to ensure the tree state protocol is adhered to.

Update

For updates we can support modifying any number of instances by using the raises clause. There are two functions - modifyJob for updating a field of a single instance and modifyJobs for updating multiple instances.

/**
 * Update a Job instance. This function updates a Job
 * instance from the supplied argument. It is illegal
 * for a server process other than jobdispatcher to update
 * instances directly and this is enforced by Job.<join>.
 * Instead this function must be used so that an event is
 * raised to hand off to jobdispatcher, where any necessary
 * rescheduling is performed.
 * 
 * If called from a connected client the instance will not
 * be the server managed one. This is retrieved and assigned
 * in its entirety.
 * 
 * If called passing a specific field just that field
 * is updated.
 * 
 */
local function modifyJob(any Job, any field, any value) raises (update)
{
  logfine($catalog.jobcontrol.logger, "Updating with {0}, field {1}, value {2}", Job, field, value);
  transaction
  {

    // Lock the job tree in the child transaction
    call awaitTreeState(requiredState = enum(JobTreeState, IDLE),
                        newState      = enum(JobTreeState, USER_START));
                        
    if (field)
    {
      if (field == "FunctionExpr" ||
          field == "TimerExpr")
        setblob(Job.{field}, value);
      else
        Job.{field} = value;
    }
    else
    {
  
      // Alias the argument as we are about to replace it with the
      // managed instance. 
      any job = Job;
  
      if (read(Job, job))
        Job = job;
    }

    // Check if the Job was modified at all. If not there will be
    // no update events and no event raised on the execution of
    // this service. This means there will be no hand-off to the
    // job dispatcher.
    any modified = ismodifying(Job);
  }
  catch
  {
    // If something went wrong then the job dispatcher will not be
    // woken by the raises(update) event.  Return the tree state to idle....
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
    throw();
  }

  // Modification was successful. If anything was actually changed tell
  // the job dispatcher we have finished and allow it to re-evaluate the
  // job tree.
  if (modified == true)
    call notifyTreeState(newState = enum(JobTreeState, USER_END));
  else
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
}

Like createJob, this function uses simple exception handling to correctly manage the tree state. In addition, however, we need to know whether anything is actually being modified at all. The Inq ismodifying(<managed_instance>) function returns true if a managed instance is joined in the transaction and has at least one modified field; false otherwise.
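A trivial sketch, where myJob is an assumed reference to a managed instance:

myJob.JobOrder = 3;   // locks myJob and joins it in the current transaction
if (ismodifying(myJob))
{
  // at least one field actually changed - per the comments in modifyJobs,
  // assigning values equal to the existing ones does not count
}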

modifyJobs() is intended to be called from a GUI client and is similar. It accepts a list of unmanaged Jobs and sets the server-side ones to them:

local function modifyJobs(any jobs) raises (update)
{
  logfine($catalog.jobcontrol.logger, "Updating with {0}", jobs);
  transaction
  {
    // The client could choose to send us jobs that are not, in fact,
    // modified at all. In this case there will be no update events
    // and no event raised on the execution of this function. This means
    // there will be no hand-off to the job dispatcher
    boolean modified;

    // Lock the job tree in the child transaction
    call awaitTreeState(requiredState = enum(JobTreeState, IDLE),
                        newState      = enum(JobTreeState, USER_START));
    foreach(jobs)
    {
      if (read(Job, $loop.Job))
      {
        Job = $loop.Job;
        modified ||= ismodifying(Job);
      }
    }
  }
  catch
  {
    // If something went wrong then the job dispatcher will not be
    // woken by the raises(update) event.  Return the tree state to idle
    // before rethrowing....
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
    throw();
  }

  // Modification was successful. If anything was actually changed tell
  // the job dispatcher we have finished and allow it to re-evaluate the
  // job tree.
  if (modified)
    call notifyTreeState(newState = enum(JobTreeState, USER_END));
  else
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
}

It also uses simple exception handling to ensure proper state transition management.

Note
transaction{...} catch {...} finally {...}

is the same as

try{...} catch {...} finally {...}

but with a nested transaction. The transaction is committed when the transaction{...} code block closes.
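A minimal skeleton of the construct:

transaction
{
  // mutations here join the nested transaction; it is committed
  // as this block closes
}
catch
{
  // tidy up; a bare throw() rethrows the original exception
  throw();
}
finally
{
  // runs whether or not an exception occurred
}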

Delete

Deleting Jobs is more interesting. For create and update it's not important how many instances are affected or in what order they are involved in the transaction.

When listening for deletions, rather than listening for the delete event raised on the instance we chose to listen for remove events. Here is the code again:

  // Listen to the root of the job tree for deletion events from within.
  // By specifying the typedef we will only get events that relate to
  // automatic pruning of the node-set structure when Job instances are
  // deleted.
  any $this.listeners.deleteJobListener = listen(jobTree,
                                                 func f = { call jobDeleted(@eventId, @eventData); },
                                                 event = (remove),
                                                 typedef = Job);

The remove event occurs when a node is removed from within an event-live structure. Inq prunes node-set children when their primary typedef instance is deleted, so in this case remove and delete seem to serve the same purpose. Why choose remove over the delete event?

When a Job is deleted, if it is a box we must delete all its children too. Remembering that a client transaction must raise just one event for whatever it does, provided the child jobs are deleted after their ancestor any remove events that arise from them will not propagate beyond the point in the tree where the ancestor resided, because its container has already been removed.

Here is the deleteJobs() function and its associated helper:

/**
 * Delete the given set of jobs. If a job is a box then its children are
 * deleted also.
 */
local function deleteJobs(any jobs)
{

  // Deleting a job causes any child jobs to be deleted also. In
  // case any of the set are descendants of one another take some trouble
  // to sort them by their depth.

  omap m;  // map must be orderable for sort, below.
  foreach(jobs)
  {
    any k = getprimarykey($loop.Job);
    any m.{k}.Job = $loop.Job;
    int m.{k}.aux.depth = call findDepth($loop.Job);
  }

  sort(m, $loop.aux.depth);

  foreach(m)
  {
    // If the current job is a descendant of one deleted earlier it will
    // already have been deleted within the transaction. This means read()
    // will not return it.
    if (read(Job, $loop.Job))
      call deleteJobTree(Job);
  }
}
/**
 * Delete the specified job and all its descendants. Deletion occurs from the
 * specified job downwards, and events are raised in the order instances
 * are deleted. This means any structures that are automatically pruned by Inq
 * as deletion events are propagated from the node will only result in a single
 * event emanating from the root.
 */
local function deleteJobTree(any Job)
{
  transaction
  {
    // Lock the job tree in the child transaction
    call awaitTreeState(requiredState = enum(JobTreeState, IDLE),
                        newState      = enum(JobTreeState, USER_START));

    // Delete the given root Job
    delete(Job);

    // Delete its children
    call deleteChildJobs(Job);
  }
  catch
  {
    // If something went wrong then the job dispatcher will not be
    // woken by the remove event emerging from the job tree.
    // Return the tree state to idle....
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
    throw();
  }

  // ...On the other hand if deletion was successful tell the job
  // dispatcher we have finished and allow it to re-evaluate the
  // job tree.
  call notifyTreeState(newState = enum(JobTreeState, USER_END));
}

deleteJobs(any jobs) accepts a list of jobs to be deleted. There is no restriction on how the jobs in the list are related - in particular we must determine whether any job could be a descendant of another in the list.

To do that the function places each job in an ordered map in preparation for sorting and associates with each its depth in the job tree [the findDepth() function is not reproduced here - see app/examples/jobcontrol/jobControl.inq]. The map is then sorted in depth order. Each sub-tree is then deleted in its own transaction, taking into account that should any job be a descendant of one deleted earlier it will already have been deleted.
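For illustration only, the depth calculation might look something like the sketch below. This is not the real findDepth() (see jobControl.inq for that); it assumes Job.ParentJob is null for a top-level job and otherwise holds a value that read() can use as the parent's key:

// Hypothetical sketch - the real findDepth() is in jobControl.inq
local function findDepth(any Job)
{
  int depth = 0;
  while (!isnull(Job.ParentJob))
  {
    // Replace Job on the stack with its parent for the next iteration
    read(Job, Job.ParentJob);
    depth += 1;
  }
  depth;  // the last expression is the return value
}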

With the correct choice of event and some careful processing it is straightforward enough to support the arbitrary deletion of Jobs and achieve the desired event flow.

Handling the Events

We have seen how to listen for events and discussed how they are raised. How are events dispatched to a listener processed?

When Inq dispatches a listen event it places some items on the stack. These are:

  • @eventId - the ID of the event. This is a map containing the basic event type (that is create, update, exec and others). The ID is how Inq filters the event, so it also contains an instance's modified field names and the path through which it has travelled prior to the dispatch point.

  • @eventData - the payload the event is carrying. What this is depends on the event being dispatched. For instance events (create, update or delete) the payload is the instance. For exec events it is the event bundle.

We can see these items being referenced in the dispatch function of our various listeners. In those examples they are passed as arguments to other functions, ascribing names that identify them for clarity.

Create

To process a create event jobdispatcher has to identify where beneath $this.jobTree the new Job instance should go, that is, what its parent Job is (if it is not at the top level). If the new instance is its parent's first child then a new tree level is created. This is done by the helper function findTreeLevel() (see jobControl.inq).

Here is the newJobCreated function that the listener calls:

local function newJobCreated(any Job)
{
  // Lock the job tree in the default transaction
  call awaitTreeState(requiredState = enum(JobTreeState, USER_END),
                      newState      = enum(JobTreeState, DISPATCHER));

  try
  {
    // A reference parameter to findTreeLevel. The index of the
    // subtree the Job resides in is returned in this variable.
    int subtreeIndex = null;

    any treeLevel = call findTreeLevel(root = $this.jobTree,
                                       Job,
                                       ParentJob = null,
                                       subtreeIndex);

    // A node-set child for the new Job:
    hmap m;

    // Place the new Job instance in it
    any m.Job = Job;
    
    // Include its volatile data
    aggregate(JobVolatile, m.Job);
    
    // Put the new job into the treeLevel node. Note we use
    // the primary key of the Job as the node set map key in the same way as
    // the Inq built-in functions like read() and aggregate() do.
    any k = getprimarykey(Job);
    any treeLevel.{k} = m;


    if (call isTopLevel(Job))
    {
      // A top-level job.

      // findTreeLevel will not have set subtreeIndex.
      subtreeIndex = indexof(treeLevel, k);
    }

    // If the new job has a timer expression then
    // re-evaluate the timer for its subtree.
    if (!isnull(Job.TimerExpr))
      call restartSubtreeTimer(subtreeIndex);
  }
  finally
  {
    // Ensure the tree state is returned to idle even if we incur a
    // system exception.
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
  }
}

Update

When a Job is updated there are only certain fields whose alteration can have any effect as far as jobdispatcher is concerned. These are a Job's active state, its timer expression and its order amongst its siblings.

Job Control starts one timer for each sub-tree at the job tree root. As we discussed above, any number of instances can be modified in the client transaction and these are delivered as a bundle. Processing these updates involves determining which sub-tree the job resides in (in order to determine the distinct set) and then re-evaluating the timer for the affected sub-tree(s).

The event bundle is passed (from the listener function) as jobEvents. This is a collection of update events so the @eventData contained within each event is the Job instance. Here is the function:

/**
 * Maintain the job tree after mutation events arising from job modification.
 * This function is a handler for the "complete" event raised when functions
 * that modify Jobs are executed and their transaction commits.
 *
 * @param jobEvents the event bundle - there will be one event
 * per instance modified.
 */
local function jobsModified(any jobEvents)
{
  // Await hand-off from the process doing the modification
  call awaitTreeState(requiredState = enum(JobTreeState, USER_END),
                      newState      = enum(JobTreeState, DISPATCHER));

  try
  {
    // Jobs may have been modified in different subtrees. Make a note
    // of them as we only re-evaluate at the end
    set treeLevels;

    foreach(jobEvents)
    {
      // A new variable each time through the loop. Important as we will be
      // placing it in the treeLevels set if not previously seen
      int subtreeIndex = null;

      // Get the Job instance out of the event...
      any Job = $loop.@eventData;

      // ...find the sub tree index it resides in
      call findTreeLevel(root = $this.jobTree,
                         Job,
                         ParentJob = null,
                         subtreeIndex);

      // Check if the TimerExpr, JobOrder or Active fields have changed. If so,
      // re-evaluate the timer for this tree.
      if (contains($loop.@eventId.fields, "TimerExpr") ||
          contains($loop.@eventId.fields, "JobOrder")  ||
          contains($loop.@eventId.fields, "Active"))
      {
        // Note down the subtree. The "set" type remains distinct and does
        // not raise an exception if a duplicate is added.
        treeLevels += subtreeIndex;
      }
    }

    // Re-evaluate all affected subtrees
    foreach(treeLevels)
    {
      call restartSubtreeTimer(subtreeIndex = $loop);
    }
  }
  finally
  {
    // Ensure the tree state is returned to idle even if we incur a
    // system exception.
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
  }
}

Within each event's ID the fields that were changed by the client transaction are at @eventId.fields. This is a set and we can check if the field names it contains are relevant to the listener's operation.

The alternative way to discriminate an event by the fields changed is to listen for update events and specify the fields argument; the petstore example does this. Using the exec event raises the possibility that a listener could be woken unnecessarily; however, ensuring the function only updates relevant fields in the first place means this can be avoided with the appropriate code flow.
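A sketch of that alternative, assuming the fields argument accepts the field names of interest; jobChanged() is a hypothetical handler:

// Wake only for updates that change one of the named fields
any $this.listeners.updateJobListener = listen(jobTree,
                                               func f = { call jobChanged(Job = @eventData); },
                                               event   = (update),
                                               fields  = ("TimerExpr", "JobOrder", "Active"),
                                               typedef = Job);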

Delete

Again, the event processing for deleting jobs is more interesting. From the foregoing discussion the event we will receive is a remove event. The payload for this event type is the node that was removed - the node-set child. One event is received for each distinct sub-tree that was deleted.

The timer that schedules a top-level job is either defined by that job or one of its descendants. As jobdispatcher builds the job tree it creates the timers and sorts each level according to its start time and job order.

Hence, to process this event: if the Job was a top-level one then its associated timer must be cancelled.

If it was somewhere within the job tree then that sub-tree's timer is re-evaluated. In fact, this is only necessary if the job was the first at its level - it may have propagated a timer to the top level.

By the time the event is delivered the node-set child has been removed from jobdispatcher's job tree. This is done by the executing client process and is thread-safe.

Note
Inq structures are not, in general, thread safe. That an event-live structure built in one process can have elements within it removed via the events raised from another is part of Inq's internal concurrency implementation.

If processes pass structures between them, for example via a timer's userInfo or a service request argument, the sender should not retain a reference.

Processing create events involves adding something to the job tree structure that was not there before. We can find the appropriate level at which to place the node because the Job type includes the ID of the parent, which must already exist.

Processing update events may reorder the tree (because jobs are sorted by their run time and ordinal position) but otherwise the structure does not change.

When processing a remove event we may need to cancel or re-evaluate a timer, but how do we know which sub-tree the removal was from? We cannot traverse the tree looking for the job because it has already been removed. How can the timer be cancelled?

The payload carried by a remove event is the node that was removed. When the removal is a consequence of a managed instance deletion this is always a node-set child. In the job tree, such nodes look like this:

[Diagram: deletepayload - the removed node-set child, containing the Job instance and its sibling containers]

For a remove event the @eventId includes the following:

  • path - the path through which the event travelled from its origin to its dispatch point (the node being listened to).
  • parent - the path to the node where the removal took place (that is, the parent of the node removed).
  • vector - when the removed node was the child of a node set and the node set supports vector access, the index of the removed node.

All this information can be put to good use, telling us all we need to know to process the event:

local function jobDeleted(any @eventId, any @eventData)
{
  call awaitTreeState(requiredState = enum(JobTreeState, USER_END),
                      newState      = enum(JobTreeState, DISPATCHER));
  try
  {
    // If a top-level job was deleted the only thing we have to do is cancel
    // any timer.
    if (call isTopLevel(@eventData.Job))
    {
      logfine($catalog.jobcontrol.logger, "Checking timer: {0}", @eventData.aux.jobTimer);
      if (@eventData.aux.jobTimer)
      {
        logfine($catalog.jobcontrol.logger, "Canceling timer: {0}", @eventData.aux.jobTimer);
        canceltimer(@eventData.aux.jobTimer);
      }
    }
    else
    {
      // For child jobs we only need to re-evaluate the timer driving the
      // subtree the job is in if (at whatever level) the deleted job
      // was at vector position zero. Otherwise there is nothing to do.
      if (@eventId.vector == 0)
      {
        // We don't know what subtree it is in. We cannot use
        // call findTreeLevel() because the job has been removed from
        // the structure. Instead, use the path to the parent available
        // in the event and convert it to indices.
        any indices = indicesof($this.jobTree, @eventId.parent);

        // re-evaluate and start the timer for this subtree
        call restartSubtreeTimer(subtreeIndex = indices[0]);
      }
    }
  }
  finally
  {
    // Ensure the tree state is returned to idle even if we incur a
    // system exception.
    call notifyTreeState(newState = enum(JobTreeState, IDLE));
  }
}

What does indicesof() do?

indicesof(<expression>, <expression>)
Returns an array containing the indices of each node yielded as successive elements of path are applied to some root. All the nodes in the path must support vector access or an exception is thrown.

The second argument must evaluate to a path which is applied to the node given by the first. The path's elements are applied to yield successive nodes whose index in their parent is placed in the array returned.

That code branch only applies if the deleted Job was not a top-level one (and only then if it was at position zero in its own level). The parent path supplied in the @eventId makes it possible to find out which sub-tree the Job was in. [Inq also has nodesof which takes the same arguments and returns an array of the nodes found along the path].
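To make the semantics concrete, a small sketch (the tree shape and indices are invented):

// Suppose the removed node's parent is the first child of the second
// top-level subtree. @eventId.parent is the path to that parent, so
any indices = indicesof($this.jobTree, @eventId.parent);
// yields {1, 0}: index 1 at the top level, index 0 within the subtree.
// indices[0] then identifies the affected subtree's timer.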

If the timer needs to be cancelled then, because it was stored in a container that is a sibling of the Job, it is still available in the node sent back as the remove event's payload.

System Integrity

Inq does not have any kind of data hiding like object oriented languages do. If script has a reference to a managed instance it can simply assign to its fields. When this happens Inq locks the instance and places it into the invoking process's current transaction.

In the Job Control application though, it is important that this is done with the appropriate state transition. In particular, any client process (that is any process other than jobdispatcher) must arbitrate for and set the job tree state to USER_START.

Although the various API functions in inq.jobcontrol do this, what is to stop any process from simply modifying Jobs and introducing concurrency errors, or deadlocks waiting for a state transition that never comes? The various life-cycle (or, put another way, transaction phase) statements that a typedef has are the places to trap such eventualities.

Job.<construct>

The <construct> statement in the Job typedef calls the scripted function isTreeState():

construct (
{
  // Validate the tree state to ensure the integrity of jobDispatcher
  call isTreeState(requiredState = enum(JobTreeState, USER_START));
    .
    .    

The isTreeState() function checks the job tree state and throws an exception if it is not as expected:

/**
 * Obtain a lock on the mutex and then check the jobtree state variable
 * is requiredState.
 * @return Undefined. Returns if the state is requiredState, otherwise
 * throws. 
 */
function isTreeState(JobTreeState requiredState,
                     long         timeout = -1)
{
  if (call isRunning())
  {
    if (lock("__jobtree",    // the mutex we are syncing on
             timeout))       // default timeout is indefinite
    {
      any ret = $catalog.jobcontrol.jobTreeState == requiredState;
      unlock("__jobtree");
      if (!ret)
      {
        throw("Illegal tree state " +
              enumext(JobTreeState, $catalog.jobcontrol.jobTreeState) +
              " wanted " +
              enumext(JobTreeState, $stack.requiredState));
      }
    }
    else
      throw("Timeout while waiting for job tree lock");
  }
}

The <join>, <mutate> and <destroy> statements do the same thing, ensuring that the system operates correctly.
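For example, based on the construct statement above, the equivalent guard in the mutate statement might be sketched as follows. This is illustrative only - per the comment in modifyJob, the real Job typedef must also permit jobdispatcher's own updates:

mutate (
{
  // Validate the tree state as <construct> does. A sketch only - the
  // real script may accept other states, such as DISPATCHER, for
  // jobdispatcher itself.
  call isTreeState(requiredState = enum(JobTreeState, USER_START));
    .
    .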