Order Processor

Order Processor

Booting the petstore server is achieved by asking the server to parse the file psBoot.inq. The command to do this was shown in the blueprint summary. There is not much to say about psBoot.inq other than the fact that it acts as a single script to load all of petstore. One of those files however is psProcessOrders.inq and this is interesting because it starts a process in the server, run by a timer, to process new orders.

An Order has a number of associated OrderStatus instances that record the status of each Item in the Order. This status starts as Open, progresses to Allocated and ends up as Shipped. The process wakes every so often and does the following:

  • alternately look for OrderStatus instances that are Open or Allocated
  • selects one item from each the order and moves it to the next state
  • if all OrderStatus within a given Order are Allocated then move them and the Order to Shipped.
  • when an Order reaches the Shipped state report it to the Pet International Exchange

The processing involved demonstrates the following Inq language and envionment features

  • starting a detached process and using lock/wait/notify to coordinate its initialisation
  • timers, showing how a timer carries a piece of user information acting as a parameter to the timer event
  • the use of groupby as a means of processing the OrderStatus instances according to the Order they belong to
  • placing script in a mutator so that specific actions can be taken when managed instances are updated in a certain way
  • using JMS to integrate with external processes via a message broker
  • creating XML as the broker message payload
  • generating random numbers
  • dealing with exceptions

The Order Processor simulates events likely to originate external to petstore, so we call it the spoof processor.

Startup

The spoof processor is started by the following script at the end of psProcessOrders.inq:

// Start the spoof processor.
// Notes:
//   1. Wait for the new process to start - it will notify on the
//      monitor "spoof". We give it 5 seconds to start before logging
//      a severe message. 
lock("spoof");
if (!$catalog.ps.spoofProcessor)
{
  any spoofProcessor = spawn("SpoofProcessor",
                             type  = PROCESS_DETACHED,
                             start = call spoofStart(),
                             syncext = true);
  
  if (wait("spoof", 5000))  // If all ok - lodge process id in $catalog
    any $catalog.ps.spoofProcessor = spoofProcessor.id;
  else
    logsevere($catalog.ps.logger, "Spoof order processor not started after 5 seconds, may be");
  
}

This code appears outside of a function or service definition, so runs as it is parsed.

  1. Lock the string "spoof" to act as a guard. Locks are released when the enclosing transaction terminates, so while possible, it is often not necessary to unlock such a monitor.
  2. Start the detached process, called "SpoofProcessor". The function spoofStart() runs immediately in the current process but with $this as the root of the new process's node space. Any initialisation required before the new process begins can be performed in the start function.
  3. The syncext argument is set to true because as well as its timer, the new process will also be processing price feed messages from the exchange simulator. For more about this see the JMS Mini Guide.
  4. Use the wait function to await a corresponding notify from the new process - returns true if a notification arrives in the specified time or false otherwise.
  5. Put the new process's id at a well known location.

The spoofStart() function does not need to do anything to initialise the new process before it runs, so it just sends it a service request:

local function spoofStart(any process)
{
  // Just send the service request to the new process.
  send doSpoof(@channel = process.ichannel);
}

For its part, the doSpoof() service (running in the new process, not the one that started it) is shown below and does the following

  1. Establish an exception handler for the process. Anything uncaught will be logged.
  2. Initialise the messaging environment to send completed orders and receive prices (see psRegulatory.inq for initRegulatoryReporting() and initSendPrices())
  3. Initialise the timer
  4. Notify the starting process we are done.
service doSpoof()
{
  // Log uncaught exceptions
  setexceptionhandler(cfunc f = {
                                  // Just log the whole stack
                                  logsevere($catalog.ps.logger, "{0}", .);
                                }
                     );

  // Set up $root.i18n in case anyone references it. Assume en
  any $root.i18n = "en";
  
  // Initialise the Regulatory Reporting system. See psRegulatory.inq
  try
  {
    call initRegulatoryReporting();
    call initSendPrices();
  }
  catch
  {
    logsevere($catalog.ps.logger, @exception);
    logsevere($catalog.ps.logger, @stackTrace);
  }
  
  // Create the timer.
  // Notes:
  //   1. The function processOrders() will be called after 10 seconds.
  //   2. This timer does not specify a period so it will not restart automatically.
  //      It is restarted in the handler function.
  //   3. The optional 'start' argument defaults to true. We don't want to start the
  //      timer before we have set its userInfo property, which is used to pass
  //      the state of the items to be processed and their desired new state.
  //   4. Start the timer - it will fire after a 10 second delay 
  any $this.timer = createtimer(cfunc f = call processOrders(fromTimer), 10000, start=false);
  any userInfo.oldState = new(OStatus, enum(OStatus, O));
  any userInfo.newState = new(OStatus, enum(OStatus, A));
  $this.timer.properties.userInfo = userInfo;
  starttimer($this.timer, 10000);

  // Tell the invoking process we are up and running. 
  notify("spoof");
  
  loginfo($catalog.ps.logger, "Spoof order processor started");
}

Notice that initRegulatoryReporting() and initSendPrices() are protected with a try...catch block, because it is important that the starting process is always notified.

The Timer

The timer runs after an initial delay of 10 seconds. Its handler function (which does not have to be a call statement) sees $stack.fromTimer, which is the timer that has fired.

The timer has a userInfo property which can be initialised to any data item. In the handler function it can then be accessed at fromTimer.properties.userInfo.

Handler Function

The timer's handler function is to call processOrders(any fromTimer). When a timer fires it places itself on the stack as fromTimer. The job of a handler function is obviously to perform the desired tasks for the timer and, if the timer is one shot, restart it if required.

The timer's userInfo property is a map containing the current and new state to move the OrderStatus instances to. The key OrderStatus.ByStatus is used to retrieve the instances as a node set. If the set is not empty it is processed by progressOrders(any items, any newState).

Grouping the OrderStatus Instances

Because we want to process one OrderStatus instance per Order, having read all those with the required status progressOrders() uses groupby to transform the structure as depicted here:

groupby

Here is the usage of groupby that achieves this:

smap grouped;
groupby(items,
        cfunc distinctF = {
                            // Return the value that defines the grouping
                            // criteria. This is available as @name in startF
                            // and foreachF. In this example this is the Order
                            // field.
                            $loop.OrderStatus.Order;
                          },
        cfunc startF    = {
                            // Create a child container for the items that will
                            // reside in this group, defined by @name. Later, when
                            // accessing an individual item we require vector access
                            // so use an omap.
                            omap grouped.{@name};
                          },
        foreach = cfunc foreachF  =
                          {
                            // Place the current child in the group. Managed
                            // typedef instances carry their primary key, which
                            // is used as a path element in the grouped structure.
                            // For any iteration, if the Order field is 12345 then
                            // the result is a structure like
                            //     grouped.12345.{k}.OrderStatus
                            any k = getprimarykey($loop.OrderStatus);
                            any grouped.{@name}.{k} = $loop;
                          }
       );

The output structure (called grouped in psProcessOrders.inq) is a two-level structure where the second level contains only OrderStatius instances relating to the same Order.

Random Number Generation

When moving from Open to Allocated, progressOrders() calls allocateOne() for each Order:

  switch
  {
    when(newState == enum(OStatus, A))
    {
      foreach(grouped)
        call allocateOne(items = $loop);
    }
    .
    .

This function uses Inq's random function to pick one of the OrderStatus instances from the current list being processed:

random(int i, count(items));

any OrderStatus = items[i].OrderStatus;

In turn, this function calls the support function psOrders.inq:modOrderStatus().

Mutators

OrderStatus.<mutate>

The function psOrders.inq:modOrderStatus() is a state machine to manage the allowable transitions of OrderStatus.Status. It takes out a lock on the instance so that it can safely test and then possibly modify the field.

In OrderStatus.inq the typedef's mutate statement is:

  if ($this.new.Status == enum(OStatus, A))
  {
    if (!call takeFromInventory(OrderStatus = $this.new))
      $this.new.Status = $this.old.Status;
  }
  else if ($this.old.Status == enum(OStatus, A) && $this.new.Status != enum(OStatus, S))
    call returnToInventory(OrderStatus = $this.new);

  $this.new.Timestamp = getdate();

It looks at the instance's new state in order to manage petstore's item inventory.

Conventional script and the mutator statement are alternatives when coding application logic. What are the reasons for choosing one over the other?

  • The mutate statement is executed during a transaction's commit phase. While it can throw an exception, the only place such an exception can be handled is in the process's default exception handler. In any practical sense the transaction can only be aborted.
  • A function offers more flexibility and can deal with any problems specifically and earlier on in the transaction.
  • The mutate statement is more suited to adding further processing to the transaction, such as inventory processing in this example, or vetoing the change. Exceptions should be considered terminal.
  • When the mutate statement runs the instance is locked within the committing transaction, unlike functions that must manage any required lock themselves.

Order.<mutate>

The spoof order processor sets an Order's status to Shipped when all items have been allocated. When this happens the Order type's mutate statement runs. It is a convenient place to report the order to the exchange, calling reportOrder():

  // We can't change certain fields. If we try to, throw
  if ($this.new.Account != $this.old.Account ||
      $this.new.OrderDate != $this.old.OrderDate)
    throw("Illegal Order mutation\nOld: " + $this.old + "\nNew: " + $this.new);

  // If the Status of the order is changed to Shipped then
  // we need to report it to the Pets International Exchange
  if ($this.old.Status != $this.new.Status &&
      $this.new.Status == enum(OStatus, S))
    call reportOrder(Order = $this.new); 

Looking at reportOrder() (see psRegulatory.inq), this has no exception handling of its own, so if the reporting fails the Order will not be shipped.

Order Reporting

Part of order processing is to report shipped orders to an exchange. The Pet International Exchange is simulated in petstore and scripted in exchangeSimulator.inq. This runs separately from the petstore server and instuctions for starting it are given in the blueprint summary.

The spoof order processor is also responsible for receiving item prices made by the exchange. Orders are reported by sending a broker message (whose payload is XML text) to the queue FILL.PIE and prices received by subscribing to the topic FEED.PIE, where they are posted by the exchange. Price messages are also XML so the order processor has to parse these in order to handle them.

exchange

Inq's support for operating with message brokers is covered in the the JMS Mini Guide and fully exercised with the examples included in the distribution. In this section we focus on the production and consumption of the message's XML payload. This further illustrates XML handling, in a different way to that shown when generating printed reports.

Generating an Order Report

Here is an example report sent to PIE:

orderreport

It contains limited fields from the Order and LineItem instances comprising the order, anonymised, if you will. It can be seen from the expanded XML that the Inq structure is a one-to-many aggregation from Order to LineItem:

function reportOrder(any Order)
{
  // Create the Inq structure that will make the XML message
  any xml.Order = Order;
  
  // Aggregate this Order's LineItem instances and double-check
  // there are some.
  aggregate(LineItem, xml.Order, setname="items", keyname="ByOrder");

The ioXMLX Inq stream type is covered in the discussion of printed reports in My Orders. For exchange reporting our XML production is quite a lot simpler. We start by setting up the Inq structure to look like the desired XML production by creating root.reportorder and using ioXMLX's includes property to prune the output:

  if (count(xml.items) > 0)
  {
    // Prepend root element. Rename root child
    any root.reportorder = xml;
    
    // Prepare the stream to create the XML.
    
    // Set up those paths we wish to be included in the XML production.
    // Other paths that would have been generated in the traversal to
    // produce the XML structure will not be output.
    set includes = (path($this*Order.Order),
                    path($this*LineItem.Item),
                    path($this*LineItem.Qty),
                    path($this*LineItem.UnitPrice));
    
    // Declare the stream
    ioXMLX xmlStrm;
    
    // Apply the includes created above
    xmlStrm.properties.includes = includes;

Using the lazy evaluation paths like $this*LineItem.Qty means that the path will match with anything that ends with LineItem.Qty at the current node. In this way all LineItem instances in the structure will be subject to the required pruning.

The XML produced is going to a foreign system, so all metadata generation is switched off (in fact this the default so just for emphasis):

    // Other properties
    xmlStrm.properties.writeMeta     = false;
    xmlStrm.properties.inqAttributes = false;
    xmlStrm.properties.childName     = "item";

Lastly the tag name used for node set children is set to item. Since there is only one node set in the structure this is a suitable way to get what we want. If the structure was more complex then the names of generated tags can be controlled using either the tagNames or tagFuncs properties.

To generate the XML as a broker message payload the stream is opened for write access, specifying its sink as a string variable. Inq's string:// URL protocol takes the trailing characters as a node path, defalting to the $stack node space, as for normal references:

    string msg;
    open(xmlStrm, "string://msg", OPEN_WRITE);
    writestream(xmlStrm, root);
    close(xmlStrm);
    
    // Send to the Exchange
    $process.ps.txtMsg.properties.text= msg;
    mqsend($process.ps.toExchange, $process.ps.txtMsg);
    
    loginfo($catalog.ps.logger, "Reporting Order as {0}", msg);
  }

It is then simply a matter of assigning the variable, now containing the produced XML, to the broker message's text property and sending it. The destination and reused message were created in initSendPrices().

Processing a Reported Order

The exchange simulator is coded in exchangeSimulator.inq. It establishes a message listener on the queue FILL.PIE to await filled orders from petstores.

As we have seen, an order report contains one or more items. For each item, Exchange Simulator makes a new price (by randomly adjusting it +/- 10%) and publishes that price to the topic FEED.PIE. We look at this is a little detail, again not worrying too much about the message broker aspects.

Configuring ioXMLX

The tagNames and tagFuncs properties are honoured by ioXMLX when it reads XML text. Indeed, when considering how XML and DOM differ, ioXMLX's default action when encountering repeated elements at a given level is to ignore second and subsequent ones. Given that an order report message may contain several <item> tags we need to intervene during parsing to handle this.

During initialisation (see the exchangeSimulator.inq:doSimulator() service) Exchange Simulator sets up the stream it uses to parse the payload like this:

// Set up a stream for parsing received orders
ioXMLX $this.xmlStrm;
object p = path($this*items.item);
any tagFuncs.{p} = cfunc f = call itemTag();
$this.xmlStrm.properties.tagFuncs = tagFuncs;

The path match is on *items.item. While it might appear that *item would also be suitable, the stream is reused to produce the price feed message. This has the form itemsold.item which would also match the path, calling the tag function unexpectedly.

In this example the tag function (whose arguments are the same as those for XML production) simply has to make the element name unique to satisfy Inq's structure requirements. It does this by suffixing the ordinal value to the given name ("item") and returning the resulting string:

local function itemTag(any node,
                       any parent,
                       any nodeName,
                       any ordinal,
                       any content,
                       any last,    // always false - SAX parser does not know
                       any descend,
                       any attributes)
{
  nodeName + ordinal;
}

Parsing the XML

Exchange Simulator sets up the function fillReceived() as its message listener. The first task in processing an order report is to parse the XML payload using the pre-configured ioXMLX stream:

local function fillReceived(any message)
{
  if (any xmlMsg = message.properties.text)
  {
    // writeln($catalog.system.out, "Reading message: " + message.properties.text);
    
    // Parse the message
    open($this.xmlStrm, "string://xmlMsg", OPEN_READ);
    any fill = readstream($this.xmlStrm);
    close($this.xmlStrm);
Note
Only broker messages that are text messages support the text property, so we can use this fact to ignore messages of any other type sent to the queue.

When parsing XML text, ioXMLX builds a structure for the entire document in a single call to readstream(). Note that, at present, properties such as includes and excludesBelow are ignored during reading.

Generating a Price Feed Message

The exchange produces one message for every item it receives, so it loops over the batch within the order:

    foreach(fill.reportorder.items)
    {
      // Spoof a new price
      decimal:2 price = $loop.LineItem.UnitPrice;
      call adjustment(price);
      
      // Replace the UnitPrice in the parsed XML (which is a string)
      // with the new price (which is decimal:2)
      any $loop.LineItem.UnitPrice = price;
      
      // Create the XML content
      any root.itemsold.item = $loop.LineItem;
      open($this.xmlStrm, "string://xmlMsg", OPEN_WRITE);
      writestream($this.xmlStrm, root);
      close($this.xmlStrm);
      
      // Reuse the received message and publish
      mqclearmessage(message);
      message.properties.text = xmlMsg;
      mqsend($this.priceFeed, message);
    }
  }
}

Here is what the resulting message looks like:

pricefeed

Handling Price Feed Messages

In psRegulatory.inq:initSendPrices() petstore sets up a message listener of its own, in this case attached to the topic FEED.PIE. The function established is priceReceived(). It parses the message, retrieves the Item and updates its LastPrice and LastPriceMove fields:

local function priceReceived(any message)
{
  // Extract the xml text
  any xmlMsg = message.properties.text;
  
  // Parse it
  ioXMLX xmlStrm;
  open(xmlStrm, "string://xmlMsg", OPEN_READ);
  any priceMsg = readstream(xmlStrm);
  close(xmlStrm);
  
  loginfo($catalog.ps.logger, "PriceFeed {0}", priceMsg);
  
  // Update the LastPrice of the given Item
  // Read the server's instance
  read(Item, priceMsg.itemsold.item);
  
  // The received price in the XML is a string so convert it
  $process.ps.price = priceMsg.itemsold.item.UnitPrice;
  
  // Check the price change and update the LastPriceMove field too.
  // We cannot do this in Item.<mutate> because the mutator is
  // not run if no fields have actually changed.
  if (isnull(Item.LastPrice))
  {
    Item.LastPriceMove = enum(LastPriceMove, SAME);
    Item.LastPrice = $process.ps.price;
  }
  else
  {
    if (Item.LastPrice != $process.ps.price)
    {
      if (Item.LastPrice > $process.ps.price)
        Item.LastPriceMove = enum(LastPriceMove, DOWN);
      else
        Item.LastPriceMove = enum(LastPriceMove, UP);

      Item.LastPrice = $process.ps.price;
    }
    else
      Item.LastPriceMove = enum(LastPriceMove, SAME);
  }
}