  • InDepth Part 6 - StreamServer Runtime Engine - Job Execution

Wednesday 10 February, 2010
Stefan Cohen (StreamServe Employee, Administrator)

This is part six in a series of articles where I'm going to describe the internal workings of the StreamServer.

This part will focus on the job execution logic of the StreamServer Runtime Engine.

The concepts introduced in this article are essential for understanding big parts of the scripting logic that I will write about in later articles, so read carefully.

Job Execution

Overview

This article will describe in detail what happens inside the StreamServer during the execution of a job. The job concept and the different phases of the job execution will be introduced first and these phases will be described in detail after that.

Note: Job execution in Persuasion is slightly different from previous releases (3.X and 4.X). The change facilitates more flexible job execution and better audit trails of job execution in the future. A new and improved reconnect and recovery model and the move to a flexible choice of repositories also made this inevitable. The main difference is that in StreamServer 5.X the jobs form a tree with one parent and multiple levels of siblings.


The StreamServe Job

This section will describe the different levels of jobs that exist in the StreamServer: Trackers, Input Jobs and Output Jobs. It will also explore the relationship between jobs, job identifiers and queues.
All queued jobs have a unique ID stored in the repository. To be backwards compatible and to provide a more human-friendly notation there is also a sequence number for the job. The sequence number is not guaranteed to be globally unique, but it is unique within one application domain.

Top Jobs or Trackers

A Tracker or a "Top Job" is created by an input connector when it receives an incoming datastream for processing. This can for example be a file found by a directory scan Input Connector.

Processing / Formatting Job

At some point in time, usually when an input connector has stored an incoming stream of data, the StreamServer is notified that there is a queue item for it to process. Many configurations are possible here, but only the simplest scenario is described.

When the StreamServer receives the notification it creates a job and de-queues (reads) the data. The datastream is processed and formatting begins. In the current implementation of the StreamServer there is normally a one-to-one relationship between the incoming stream on the connector and the StreamServer processing job (there are exceptions, but most of them will not be covered here). This is for backwards compatibility with 4.X. One notable exception is a job that is resent on the input queue (from StreamStudio or via a web service): a new processing job is then created, with the same Tracker as its parent. This facilitates individual tracking of all jobs.

The normal case is that one incoming datastream equals one processing job. This means that all Events (i.e. documents) found within a file that was received by a connector will be processed within the same Input Job context.

Each Job is treated separately from other Jobs and there is no direct exchange of information between Jobs. If data from one Job should be used in another Job, the data has to be written to a shared intermediate storage during the execution of the Job where the data originates, and read from this storage in the Job that needs it. This intermediate storage is normally the queues.
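The isolation between Jobs can be illustrated with a minimal Python sketch. It is not StreamServe code: SharedStore, producing_job and consuming_job are hypothetical names, and a temporary directory stands in for whatever shared storage a real Project would use.

```python
import json
import tempfile
from pathlib import Path


class SharedStore:
    """Hypothetical stand-in for a shared intermediate storage (not a StreamServe API)."""

    def __init__(self, directory):
        self.directory = Path(directory)

    def write(self, key, value):
        (self.directory / f"{key}.json").write_text(json.dumps(value))

    def read(self, key):
        return json.loads((self.directory / f"{key}.json").read_text())


def producing_job(store):
    # Job A: its variables are private, so anything to share must be written out.
    local_variables = {"device": "mail"}
    store.write("device", local_variables["device"])


def consuming_job(store):
    # Job B: starts with no knowledge of Job A and reads what Job A left behind.
    return store.read("device")


with tempfile.TemporaryDirectory() as tmp:
    store = SharedStore(tmp)
    producing_job(store)
    shared_value = consuming_job(store)
```

The point of the sketch is only that the second job never sees the first job's variables directly; the value travels through the storage.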

 

Output Jobs

An Output Job is one output entity (think file) that the StreamServer has created and that should be delivered to some destination. One processing (input) Job can result in many Output Jobs. Each Output Job can contain several documents. Since an Output Job is always created by an Input Job, the Output Job depends on the Input Job.

Queues

Queuing in 5.X differs from 4.X and 3.X in that persisting and queuing are separated. Queuing is essentially the sending of an event to a queue (normally an enqueue or dequeue event, but it could also be a delete event, etc.); the data itself has already been persisted at an earlier stage. One reason for this is that it separates the writing to the repository (which can be, and often is, several gigabytes of data) from the actual queuing, which must be instant. It also enables multiple subscribers to an event and any number of persistence stages during the execution.
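As a rough illustration of this separation, the following Python sketch persists a large payload once and then sends only a small event to the queue and its subscribers. All names (repository, persist, send_event, subscribers) are invented for the example and do not correspond to StreamServe internals.

```python
from collections import deque

repository = {}    # hypothetical persistent store: key -> payload
queue = deque()    # holds only small references, never the payload itself
subscribers = []   # callbacks that want to hear about queue events


def persist(key, payload):
    # The slow part: writing (potentially gigabytes of) data.
    repository[key] = payload


def send_event(event_type, key):
    # The fast part: a small event that refers to already persisted data.
    event = {"type": event_type, "key": key}
    if event_type == "enqueue":
        queue.append(key)
    elif event_type == "dequeue":
        queue.remove(key)
    for notify in subscribers:
        notify(event)


audit_log = []
subscribers.append(audit_log.append)   # first subscriber: an audit trail
subscribers.append(lambda event: None)  # second subscriber: a no-op placeholder

persist("job-1", b"x" * 1_000_000)
send_event("enqueue", "job-1")
```

Because the event carries only a key, any number of subscribers can react to it without the payload being copied or rewritten.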

 

Job Identifiers

Each Job (Tracker, input and output) must have a unique identifier. Each server assigns a unique identifier to every Job when it creates the job.
When an Output Job (or a sibling job) is created, its parent job ID is set to the ID of its parent. This means it is always possible to follow the link upwards to the top. In addition, the Tracker (the top job) is also stored for performance reasons. Both a sequential number and a globally unique ID are created.

The unique job identifier is an internal JobID, but it is also possible (optionally) to set an external JobID. The external JobID can be used to track data sent to and from StreamServe by third-party applications. These IDs can be read and set using scripts (see the GetExtJobId(), GetIntJobId(), GetTopJobId() and SetExtJobId() functions).
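The job tree and its identifiers can be modelled in a few lines of Python. This is only a sketch under assumptions: the Job class, its field names and the use of uuid4 are illustrative and say nothing about how the repository actually stores jobs.

```python
import itertools
import uuid
from dataclasses import dataclass, field
from typing import Optional

# Sequence numbers here are unique only within this process,
# mirroring "unique within one application domain".
_sequence = itertools.count(1)


@dataclass(eq=False)
class Job:
    kind: str                                  # "tracker", "input" or "output"
    parent: Optional["Job"] = None
    job_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sequence_number: int = field(default_factory=lambda: next(_sequence))
    tracker: Optional["Job"] = None            # cached top job

    def __post_init__(self):
        # A job without a parent is its own top job; otherwise reuse the parent's.
        self.tracker = self if self.parent is None else self.parent.tracker

    def path_to_top(self):
        # Follow the parent links upwards until the top job is reached.
        node, path = self, []
        while node is not None:
            path.append(node.kind)
            node = node.parent
        return path


tracker = Job("tracker")
input_job = Job("input", parent=tracker)
output_job = Job("output", parent=input_job)
```

Storing the tracker on every job is redundant with the parent links, but it avoids walking the tree each time the top job is needed, which matches the performance reason given above.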

 

Execution Phases

The processing of an Input Job is divided into three execution phases: first the input phase, then the formatting phase and finally the output phase. These three phases correspond to the highest level of job status, so it is important for client applications to understand what is being done in each of them. The following sections will describe each of the three phases in more detail.

 


 

Input phase

The input phase starts as soon as an Input Connector finds data and ends when the complete Input Job has been stored in the Input Queue. A top job or Tracker is created in the repository to keep track of this incoming data and all output that might come from this input item. No parsing or formatting of the data takes place in the input phase. The data is seen as a binary object during this phase.

The input phase deals with receiving the complete input data stream and storing it in the Input Queue as fast as possible. When it has been stored in the Queue, the Input Job has been created and has been assigned a job identifier. For a client application this means that the Job has been successfully posted into the StreamServe system and, even if the system should go down, StreamServe will be able to process the Job later. The Job now has the StreamServe status "received" in the eyes of the client application.

Formatting phase

During the formatting phase all transformations and formatting are done. Simply put, this is where the StreamServer does the actual work and processes the Job. The formatting phase starts when a Job is picked from an Input Queue for processing. The Job has been stored in the Queue during the input phase and has until this point just been a binary object. From the moment the formatting phase starts, this binary object is viewed and treated as a batch of structured business documents.
The primary tasks that must be performed during the formatting phase are to detect documents (Events) and extract information from the documents into messages, run processes that will create new output and store this output in an Output Queue. The incoming data is also prepared in the input pipeline in this phase. Business logic and conditional formatting in the shape of scripts can also be applied. The scripts also make it possible to interact with external applications and data sources.


The Input Job in the Input Queue will be transformed into a number of Output Jobs in the Output Queue during the execution of the formatting phase. The formatting phase ends when the complete incoming Job has been processed and all Output Jobs created during the job execution have successfully been stored in the Output Queue. For a client application this means that the job has been successfully transformed and formatted and is ready to be delivered; even if the system goes down, StreamServe will be able to deliver the Output Jobs later. The job now has the status "formatted" in the eyes of the client.


The Input Job will be marked as aborted if an error occurs during the execution and it will be persisted in the Input Queue until an administrator either resends or deletes the job. For a client application this means that the job was "aborted" and the administrator of the system should be notified.


The rest of this article will describe in detail what happens during the execution of the formatting phase. The formatting phase is divided into three steps: Collect, Pre-Process and Execute.


 

Job Engine

The Job Engine controls the execution of the Job. It makes sure that the three execution steps are executed in the right order. It also launches the different processes and scripts that need to be executed. The Job Engine makes all its decisions by applying the rules defined in the Execution Model to the data in the Job.
In the description below all other components in the StreamServer will be referred to as processing units. This keeps the focus on the important parts of the Job execution without going too deep into the details.

Execution steps

The StreamServer processes jobs synchronously. Basically, data is sent through a number of processing units resulting in a transformation of the incoming data stream to the desired output stream. Each of the three execution steps is executed in whole before the next step is started, which means that all data in the Job is processed by each step before the next step starts its processing and that the data is processed three times. Each step will transform the data to a new format and after the last step it will have the desired structure and format.

The Collect step is executed first. The incoming data is sent through a number of processing units. The stream is processed synchronously, resulting in an intermediate format that is stored in a temporary storage. Next the Pre-Process step starts, where the intermediate file created by the Collect step is read and synchronously processed. The result is a new intermediate format in the temporary storage. Finally the Execute step reads this file, processes it synchronously and creates the final output stream.


The execution scheme described above means that the job being formatted always has a specific state. The different processing units get notified about the state by receiving signals. These are the signals that tell the processing units that we are entering or leaving a specific execution step.

Collect

The Collect step is responsible for analyzing the incoming data, detecting business documents in the form of StreamServe Events and creating a StreamServe Message for each Event. Sorting of incoming documents (Events) within a job is also done during the Collect step. The description below assumes that the input analyzer has already decided which Agent to use (this was covered in article number 3 in this series).

The units responsible for detecting the Events are the Agents. When the Agent analyses the input stream and recognizes it as something that should trigger an Event, the Job begins internally in the communication server. This means that the Job Engine signals JobBegin and CollectBegin to all processing units. The Agent will now start to scan the input stream for fields and new Events. All fields found are stored in a Message structure that is associated with the current Event. If, in the Execution Model, the field is designated to create a script variable, this is done at this stage. If a new Event is found, it is added to a list of found Events and any fields found after this are associated with the Message for this Event. This procedure continues until a list of all Events, with all fields, in the input job has been created. The Messages created are stored in sequence in an intermediate temporary storage. As the last step in the Collect step the created (and stored) Messages can be sorted.


Information about whether sorting should be carried out is stored in the Execution Model and the Job Engine will sort the Messages if sorting has been switched on. The sorting is based on the values of the script variables. To add more logic to sorting, a script is run after each Event has been collected. This is called the Event Retrieved Script (since the fields and variables have been retrieved). Here, the variables for the Event have already been created and the variables can be conditionally updated in the script, which in turn affects the result of sorting. The Skip() script function can also be called to delete this Event from the Job.
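The interplay between the Event Retrieved Script, Skip() and sorting can be sketched in Python. The function names and the variable names "priority" and "amount" are made up for this example, and returning False is only a stand-in for calling Skip(); real scripts are written in the StreamServe scripting language.

```python
def event_retrieved(variables):
    """Hypothetical Event Retrieved Script.

    Returns False to drop the Event (a stand-in for calling Skip()).
    """
    if variables["amount"] == 0:
        return False
    # Conditionally update a variable that is later used as the sort key.
    if variables["priority"] == "":
        variables["priority"] = "Z"
    return True


def collect(events, sort_variable):
    messages = []
    for variables in events:
        variables = dict(variables)      # each Event gets its own variable set
        if event_retrieved(variables):   # the script runs once per collected Event
            messages.append(variables)
    # Sorting happens last and is based on the (possibly updated) variables.
    return sorted(messages, key=lambda message: message[sort_variable])


events = [
    {"id": 1, "priority": "B", "amount": 10},
    {"id": 2, "priority": "", "amount": 5},
    {"id": 3, "priority": "A", "amount": 7},
    {"id": 4, "priority": "A", "amount": 0},
]
ordered_ids = [message["id"] for message in collect(events, "priority")]
```

Event 4 is dropped by the script, and event 2 ends up last because the script changed its sort variable before sorting took place.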


The Job Engine now signals CollectEnd to the rest of the StreamServer.
It is important to note that the size of the Message affects performance, since the active Message is kept in memory. Each Message is written to the temporary storage before the next Message is created, and it is read and recreated during the Pre-Process and Execute steps. This means that it is very important to extract only the information that is actually going to be used when creating the desired output.

Pre-process

When the Collect step is finished the Job Engine executes the Pre-Process step. The purpose of the Pre-Process step is to find out how all data-driven dynamic processing will be evaluated. This is necessary since the device drivers need to know which resources (e.g. fonts, overlays) are going to be used before they start to actually create the output. We also need to know the total number of pages for page-oriented output before we start to actually produce the output. This means that we have to find out how many output jobs will be created from the current input job and exactly where the boundaries, in terms of processes, between these output jobs will be located. It is also possible to decide in a script that a particular process execution should be skipped, i.e. not executed. This information is also needed before starting to actually create output.

In the Pre-Process step the intermediate storage created during the Collect step is read one Message at a time. For each Message read, the Job Engine recreates the Message in memory using the information in the Collect storage. All Processes and scripts belonging to that Event are then executed using the recreated Message. Nothing is, however, sent to the output pipeline from the Pre-Process step. Each Message is then written to an intermediate storage again, together with the information that was gathered during the Pre-Process step.
Note that the size of the message again affects performance.
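Why a dry run is needed before any output is produced can be seen in a small two-pass Python sketch for "page X of Y" numbering. The function names, the "lines" field and the figure of 40 lines per page are assumptions made for the example.

```python
def pages_for(message):
    # Assumption for the example: 40 lines fit on one page (ceiling division).
    return -(-message["lines"] // 40)


def pre_process(messages):
    # Dry run: nothing is emitted, only the total page count is gathered.
    return sum(pages_for(message) for message in messages)


def execute(messages, total_pages):
    # Real run: the total is already known when the first page is written.
    output, page = [], 0
    for message in messages:
        for _ in range(pages_for(message)):
            page += 1
            output.append(f"{message['name']}: page {page} of {total_pages}")
    return output


messages = [{"name": "invoice", "lines": 45}, {"name": "reminder", "lines": 10}]
total = pre_process(messages)
rendered = execute(messages, total)
```

Without the first pass, the first page could not state the total, since the number of pages produced by later Messages would not yet be known.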

Note: All scripts associated with a Process are executed during the Pre-Process step. All calculated values and data written to transactional data sources are rolled back after the Pre-Process step, but external connections and data committed to external non-transactional data sources (for example, files) cannot be rolled back, so it becomes the Project developer's responsibility to manage this in the scripts. See the preprocess() function for more info.
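The consequence of scripts running twice can be shown with a short Python sketch. Everything here is hypothetical: format_job imitates the snapshot and rollback of script variables, a plain list stands in for a non-transactional target such as a file, and the pre_process flag is an invented parameter, not the signature of the preprocess() function.

```python
import copy


def unguarded_script(variables, side_effects, pre_process):
    variables["invoice_count"] += 1
    side_effects.append("row")      # written during both runs


def guarded_script(variables, side_effects, pre_process):
    variables["invoice_count"] += 1
    if not pre_process:             # only touch the external target in the real run
        side_effects.append("row")


def format_job(script):
    variables = {"invoice_count": 0}
    side_effects = []               # stand-in for a file: cannot be rolled back
    snapshot = copy.deepcopy(variables)
    script(variables, side_effects, True)    # Pre-Process run
    variables = snapshot                     # variables are rolled back
    script(variables, side_effects, False)   # Execute run
    return variables, side_effects


unguarded_result = format_job(unguarded_script)
guarded_result = format_job(guarded_script)
```

In both cases the variable ends at 1 because of the rollback, but the unguarded script leaves two rows in the external target while the guarded one leaves a single row.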

Execute

Now all information needed to actually create the desired output is available. In the Execute step the Processes and scripts are executed as in the Pre-Process step, but now output can be created, since the information gathered during the Pre-Process step lets us look ahead in the processing.
Since all the scripts have already been executed in the Pre-Process step, all variables of the scripting language have to be rolled back to the values they had before the Pre-Process step. All external transactional data sources that the scripts might have updated also have to be rolled back, e.g. databases updated with the ODBC scripting functions.


Before starting to create any output the signal OutputBegin is sent to all processing units from the Job Engine. During the Execute step the intermediate storage created in the Pre-Process step is read one Message at a time. For each Message read, the internal message structure is recreated and then the Processes and scripts defined in the Execution Model are executed. Normally this execution is much faster in the Execute step than in the Pre-Process step, since a lot of information about the processing was stored during the Pre-Process step and does not have to be computed again. Each process sends the output it creates to an output pipeline. The output pipeline handles the split of the output into output jobs and stores the output jobs in an output queue. If needed, the output pipeline will apply a device driver.


When all output jobs have been written down to the output queue the signal OutputEnd is sent, followed by the signal JobEnd. The JobEnd signal tells all processing units that the input job has been formatted. This marks the end of the formatting phase and we now have transformed the Input Job in the Input Queue to a number of Output Jobs, with desired structure and format, in the Output Queue.
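The order of the three steps, the intermediate storage between them and the signals named in this article can be summarised in a Python sketch. The function, the file names and the JSON-lines format are invented for illustration; only the signal names come from the text, and no signal is shown around the Pre-Process step because the article does not name one.

```python
import json
import tempfile
from pathlib import Path


def run_formatting_phase(raw, workdir):
    signals = []
    collect_store = Path(workdir) / "collect.jsonl"    # hypothetical file names
    preproc_store = Path(workdir) / "preproc.jsonl"

    # Collect: turn the raw stream into Messages, one per line of input.
    signals += ["JobBegin", "CollectBegin"]
    with collect_store.open("w") as out:
        for line in raw.splitlines():
            out.write(json.dumps({"fields": line.split(";")}) + "\n")
    signals.append("CollectEnd")

    # Pre-Process: read one Message at a time, add gathered info, store again.
    with collect_store.open() as src, preproc_store.open("w") as out:
        for row in src:
            message = json.loads(row)
            message["field_count"] = len(message["fields"])
            out.write(json.dumps(message) + "\n")

    # Execute: read the Pre-Process storage and create the actual output.
    signals.append("OutputBegin")
    output = []
    with preproc_store.open() as src:
        for row in src:
            message = json.loads(row)
            output.append(" | ".join(message["fields"]))
    signals += ["OutputEnd", "JobEnd"]
    return signals, output


with tempfile.TemporaryDirectory() as tmp:
    signals, output = run_formatting_phase("a;b\nc;d;e", tmp)
```

Each step finishes completely before the next one starts, and only one Message is held in memory at a time, which is why the size of a single Message matters more than the size of the whole Job.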

Output Phase

The Output Phase starts as soon as Output Jobs have been written to the Output Queue and ends when all Output Jobs belonging to the Input Job have been delivered. No parsing or manipulation of the data takes place in the output phase. The data is seen as a binary object during this phase.
The only task that should be fulfilled during the output phase is to deliver all output jobs created by the input job in a fast, safe and controlled way.
If the output connector fails to deliver an Output Job it will wait and retry later. If the maximum number of retries (configurable) has been reached, the Job is marked as "aborted" and can later be resent.
When all Output Jobs have been delivered this means, for a client application, that the Input Job has been successfully delivered and that the StreamServer has finished processing the Job. The Job now has the StreamServe status "successfully processed" in the eyes of the client application.
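The retry behaviour of an output connector can be approximated with the following Python sketch. The function names, the use of ConnectionError and the "delivered" result are assumptions for the example; the wait between attempts is left out so the sketch runs instantly.

```python
def deliver_with_retries(send, output_job, max_retries):
    """Try to deliver one Output Job; give up after max_retries retries."""
    attempts = 0
    while True:
        attempts += 1
        try:
            send(output_job)
            return "delivered", attempts
        except ConnectionError:
            if attempts > max_retries:
                # Retries are used up: the job is kept and can be resent later.
                return "aborted", attempts
            # A real connector would wait here before the next attempt.


def make_flaky_sender(failures):
    # Hypothetical destination that fails a fixed number of times, then works.
    state = {"left": failures}

    def send(output_job):
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("destination unavailable")

    return send


recovered = deliver_with_retries(make_flaky_sender(2), "output-1", max_retries=3)
gave_up = deliver_with_retries(make_flaky_sender(10), "output-2", max_retries=2)
```

Here max_retries counts the attempts after the first one, so a limit of 2 allows three attempts in total before the job is marked as aborted.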

InDepth Part 7 - StreamServer Runtime Engine - Threads and Scalability

Comments (1)

  • Hi Stefan,

    Nice article!

    Could you explain something to me? I want to share a variable between two jobs. What do you think is the simplest way of doing that? You mentioned that it can be done by using the intermediate storage of the queues; could you explain how? I need to share one variable across all the Jobs.

     

    Also, I have a strange situation with StreamServe "Jobs".

    We have one test environment running version 4.1, where we have a different project for each document type, and of course the Job is also different, but we are able to access all the variables in all the Jobs.

    But in the new version we are unable to do so.

    here is the log from old version

    0429-155553: (2391) Start:dir=../input/*.*,2,0

    0429-155553: (2391) Start:dir=../input/record_schefenacker/*.*,0,0

    0429-162241: (2440) Found: ..\input/d1400853042676.dsi

    0429-162241: (4290) StrsID completed successfully (StrsID):6157

    0429-162241: (1719) JobBegin:dir=../input/*.*,2,0

    0429-162241: (1726) Collect Event:DEVICE_PROCESS_alveole < dir=../input/*.*,2,0

    0429-162241: (1726) Collect Event:OA_MFG_EVENT < dir=../input/*.*,2,0

    0429-162241: (1155) ******VN DEVICEmail

     

    DEVICE_PROCESS_alveole and OA_MFG_EVENT are in different Design Center projects, but I am able to access the variable DEVICE set in DEVICE_PROCESS_alveole from OA_MFG_EVENT.

    But in the new version I get a different log; it is more like what you explained in your article.

    0429 162022 (1719) 2 JobBegin:directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:DEVICE_PROCESS_alveole < directoryscanner=input_page

    0429 162022 (5026) 3 Sender of job set to:

    0429 162022 (1730) 3 preproc message:DEVICE_PROCESS_alveole < directoryscanner=input_page

    0429 162022 (1731) 2 Doing message:DEVICE_PROCESS_alveole < directoryscanner=input_page

    0429 162022 (1723) 2 JobEnd:directoryscanner=input_page

     

    0429 162022 (1719) 2 JobBegin:directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    0429 162022 (1726) 2 Collect Event:OA_MFG_EVENT < directoryscanner=input_page

    We are migrating to the new version and we want it to behave like the old version, with access to all the variables.

    Is there any way to make the Job Engine first collect all the Events from all the Jobs and then process them?

     

    Please let me know if you have any idea on this.

    Wednesday 30 April, 2014 by vishvash Nigam

   

