Workflow definitions are made using XML structures and XPath 2 expressions. This XML is used during the execution of the workflow to keep the current state of tasks and jobs updated. This is the internal representation used by evQueue.
A workflow instance represents a workflow being executed. It contains data related to this particular execution (including input parameters and tasks output). You will have one new instance each time a workflow is launched. This instance is initially created by copying the workflow XML and replacing workflow parameters. Nodes are then modified as tasks are launched or terminated.
EvQueue uses several concepts to define workflows:
Task: The smallest unit. It refers to a program that will be executed and whose output will be integrated into the workflow instance.
Job: A job is a group of tasks. The job is considered executed when all its tasks are executed. This concept allows synchronization of tasks that (generally) run in parallel.
Subjob: A subjob is executed when its parent job is terminated.
Condition: A condition allows a job to be skipped. This can be useful to skip tasks (or groups of tasks) depending on workflow parameters or task output.
Loop: A loop allows jobs or tasks to be replicated using an XPath expression. Task input can depend on the loop iterations.
All tasks take input parameters, which can be either fixed (a static string in the workflow definition) or dynamic. A dynamic parameter uses an XPath expression to refer to workflow instance data; it can refer to any of its parent jobs or tasks, including task output. Using one task's output as input for another task is very useful for executing dependent actions, for example compressing a file created by a previous task whose name depends on the current time.
A task can be declared as “XML” or as “Text”. An XML task produces XML output; a text task produces plain text output. A task whose output will be used in the workflow should be declared “XML” and return well-formed XML. Including a root element is not necessary, as evQueue appends an “output” element to the task definition before appending the task output. Invalid XML output will stop the workflow execution, as the nodes cannot be imported into the workflow instance.
As a first example, we will create a workflow with a single task, which lists the files in a directory passed as a parameter. We will use the existing “ls” command to do this. This command has to be declared with command line arguments and text output.
Here is the workflow definition:
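A minimal definition along the lines described here could look like the following sketch; the job name, the queue name and the exact form of the “value” node are assumptions, only the overall structure follows the text:

```xml
<workflow>
  <parameters>
    <!-- Empty node, filled with the actual value at instantiation -->
    <parameter name="filename" />
  </parameters>
  <subjobs>
    <job name="list-directory">
      <tasks>
        <task name="ls" queue="default">
          <!-- Dynamic parameter: resolved by evaluating the XPath expression -->
          <input name="path">
            <value select="/workflow/parameters/parameter[@name='filename']" />
          </input>
        </task>
      </tasks>
    </job>
  </subjobs>
</workflow>
```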
Analysis of the workflow definition
The first section is used to declare the workflow input parameters. These parameters are passed to the workflow instance when it is launched and are mandatory. The empty “parameter” nodes will be filled with their values at instantiation.
The “subjobs” node marks the beginning of the task definitions. It contains a list of jobs that will be executed in parallel. In this example we have only one job. The job itself does not define actions to perform; it acts as a container for one or more tasks (which execute the commands and do the real work).
So in the job we have the tasks that must be executed for this job to be completed. Again, here we have only one task, “ls”. This task takes one command line parameter, defined with the “input” node. In this case, instead of using a static parameter, we use an XPath expression to read the workflow input parameter “filename”. All input parameters must have a name, even if it is not used in command line mode. This name may be empty, but this is not recommended, as the name helps in understanding the workflow.
Analysis of the workflow instance
So what happens when this workflow is instantiated?
First, the parameters will be filled with their values. Tasks will then be executed as described in the workflow definition. Before a task is executed, all its input parameters are parsed, and XPath expressions are replaced by their value. Here is the workflow instance at the end of the execution:
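The end-of-execution instance could look roughly like this sketch; the exact attribute names carrying the status, return code and execution time are assumptions, as is the directory listing itself:

```xml
<workflow>
  <parameters>
    <!-- Filled at instantiation -->
    <parameter name="filename">/var</parameter>
  </parameters>
  <subjobs>
    <job name="list-directory">
      <tasks>
        <!-- Execution information (assumed attribute names) added by evQueue -->
        <task name="ls" queue="default" status="TERMINATED" retval="0" execution_time="0.02">
          <!-- The XPath expression has been replaced by its value -->
          <input name="path">/var</input>
          <!-- "output" node appended by evQueue with the text output of the command -->
          <output>backups
cache
lib
log
spool
tmp
</output>
        </task>
      </tasks>
    </job>
  </subjobs>
</workflow>
```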
We can see that the value of the filename parameter is “/var”. The input of the “ls” task has also been changed by evaluating the XPath expression. A new “output” node has been added by evQueue and filled with the task output; here we can see the directories under “/var”. We can also see information about the task execution, such as the execution time and return code.
The “parameters” node must be under the workflow root element. It can contain any number of parameters (including none) that must be passed to the workflow for instantiation. It is not possible to instantiate a workflow without specifying all of the input parameters.
Exactly one “subjobs” node must be present under the root element. It is the entry point of the workflow. Subjobs can only contain jobs. Execution will start with all the jobs under this node. All the jobs of a “subjobs” node are executed in parallel.
Each “job” node can optionally contain a “subjobs” node, defining which jobs should be executed when the current job is terminated.
A job is a container for tasks. It contains at least one task but can contain an unlimited number of tasks. All the tasks in a job are executed in parallel; these tasks are generally subdivisions of a larger task, and parallelism is used to speed things up. The job can contain one or more subjobs, defined in the “subjobs” node. The subjobs are executed when the job is terminated, that is, when all of its tasks are terminated. If an error occurs (non-zero return code) in any task, the subjobs are not executed.
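As a sketch of this nesting, a job with two parallel tasks and a subjob that runs only once both have terminated could be structured like this (task, job and queue names are made up):

```xml
<job name="build">
  <tasks>
    <!-- These two tasks start together and run in parallel -->
    <task name="compile" queue="default" />
    <task name="run-tests" queue="default" />
  </tasks>
  <subjobs>
    <!-- Executed only when both tasks above have terminated without error -->
    <job name="package">
      <tasks>
        <task name="make-package" queue="default" />
      </tasks>
    </job>
  </subjobs>
</job>
```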
A job can have three optional attributes:
name: An attribute that can be used in XPath expressions to identify the job. It is not used by evQueue itself.
condition: An XPath expression that will be evaluated. If it evaluates to “false”, the job will be skipped (no task will be executed). All subjobs of a skipped job are also skipped.
loop: An XPath expression which should return two or more nodes. The job will be duplicated once for each node returned by this expression. A loop changes the XPath context to the current node being evaluated (this is important for task inputs).
In this example, we have one job that outputs an XML document containing a list of databases. Its subjob will be duplicated to back up all the databases. We can see that the backup task uses a relative XPath expression to get the name of the current database in the loop.
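That example could be sketched as follows; the task names, the shape of the “list-databases” output and the loop expression are assumptions, only the loop-and-relative-path mechanism follows the description:

```xml
<subjobs>
  <job name="list">
    <tasks>
      <!-- XML task whose output contains one <database> node per database -->
      <task name="list-databases" queue="default" />
    </tasks>
    <subjobs>
      <!-- Duplicated once per <database> node; "." then points at the current node -->
      <job name="backup" loop="//task[@name='list-databases']/output/database">
        <tasks>
          <task name="backup-database" queue="default">
            <!-- Relative expression, evaluated against the current loop node -->
            <input name="dbname">
              <value select="." />
            </input>
          </task>
        </tasks>
      </job>
    </subjobs>
  </job>
</subjobs>
```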
The “tasks” node must be present exactly once under each “job” node. It contains the list of tasks to execute for the current job. This node must contain at least one task (you cannot create a job that does nothing).
A task is the basic unit of work in evQueue. A task is identified by a name which refers to a task declared in evQueue. The execution of a task results in the execution of a program with the specified input parameters. The output of this command is appended to the task in the “output” node. All the tasks of a job are executed in parallel, following the concurrency rules defined by their respective queues.
A task node has the following attributes:
name: The name of the task to be executed. This name must have been declared in evQueue. This attribute is mandatory.
queue: The queue to which the task will be attached. The queue is used to limit the maximum concurrency of tasks (or to define mutual exclusion by using a concurrency of 1). This attribute is mandatory.
retry_schedule: The retry schedule associated with the task. If a retry schedule is associated with a task and an execution fails (non-zero return code), the task will be retried as specified by the retry schedule. If any of the executions is successful, the task is considered successful; if all the executions fail, the task is considered failed. The output corresponding to each execution is appended, so it is possible to have multiple “output” nodes. This attribute is optional.
loop: An XPath expression that should return at least two nodes. The task will be duplicated once for each node returned by the evaluation. This loop also changes the XPath context.
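Putting the attributes together, a task combining a queue, a retry schedule and a loop could be declared like this sketch (the “retry_schedule” attribute name, the queue and schedule names, and the loop expression are all assumptions):

```xml
<tasks>
  <!-- One copy of this task per <file> node returned by the loop expression;
       failed executions are retried following the "network" schedule -->
  <task name="fetch-file" queue="downloads" retry_schedule="network"
        loop="//task[@name='list-files']/output/file">
    <input name="url">
      <value select="." />
    </input>
  </task>
</tasks>
```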
The task node may contain “input” nodes that specify the parameters passed to the task. These nodes are optional. The “value” node can be used in an input to refer to an XPath expression that will be evaluated at runtime. The “copy” node can be used in an input to copy the XML fragment referred to by its XPath expression (unlike “value”, which only selects text nodes). These expressions are affected by loops in jobs or tasks, meaning the relative path “.” is changed by loops.
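As an illustration, an input mixing a literal string, a “value” node and a “copy” node could look like this (the input, parameter and task names are made up; only the node types come from the description):

```xml
<input name="report">
  Report for
  <!-- "value" selects the text content of the matched node -->
  <value select="/workflow/parameters/parameter[@name='date']" />
  <!-- "copy" copies the whole XML fragment, not just its text content -->
  <copy select="//task[@name='collect-stats']/output/stats" />
</input>
```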
The “stdin” node is used to select the data that will be piped to the stdin stream of the task. This node is optional. If absent, no data will be piped to stdin. Like the “input” node, it may contain “value” and “copy” nodes as well as literal strings (or a mix of these nodes). To see how it works, have a look at the ls | wc -l example.
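The ls | wc -l case could be sketched as follows, piping the text output of a previous “ls” task into “wc” (the job, task and input names are assumptions):

```xml
<job name="count">
  <tasks>
    <task name="wc" queue="default">
      <!-- "-l" is passed as a command line argument -->
      <input name="flag">-l</input>
      <!-- The output of the "ls" task is piped to wc's standard input -->
      <stdin>
        <value select="//task[@name='ls']/output" />
      </stdin>
    </task>
  </tasks>
</job>
```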
Here we suppose we are treating zip files uploaded by the user. These zip files contain pictures, which must be resized and converted to JPEG. Once all the pictures are resized, an album must be created in the database. The name of the album to create is passed as a workflow parameter.
We have a task (not shown in the XML) named “unzip” that will unzip the zip file and output an XML document listing each unzipped file. The first job of the example will be skipped by the condition if no files are present in the zip file. Multiple tasks will then be created, one for each file. This allows the resize processes to run in parallel, with a maximum concurrency set by the queue “photo-resize”. This means that if the zip file contains 100 pictures and the concurrency of the “photo-resize” queue is 4, the pictures will be resized 4 at a time. This job will be terminated once all the pictures are resized.
We then have a subjob that will insert the album in the database. This subjob will be executed only when all pictures are resized. If any of the resize tasks fails, the album will not be created and the workflow will return an error status.
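A sketch of this workflow could look as follows; the “unzip” task itself is omitted, as in the original description, and the element names inside the unzip output as well as the condition and loop expressions are assumptions:

```xml
<subjobs>
  <!-- Skipped entirely (subjobs included) if the unzip output has no <file> node -->
  <job name="resize-pictures"
       condition="count(//task[@name='unzip']/output/file) &gt; 0">
    <tasks>
      <!-- One resize task per picture; actual parallelism is capped by
           the concurrency of the "photo-resize" queue -->
      <task name="resize" queue="photo-resize"
            loop="//task[@name='unzip']/output/file">
        <input name="picture">
          <value select="." />
        </input>
      </task>
    </tasks>
    <subjobs>
      <!-- Runs only when every resize task has terminated successfully -->
      <job name="create-album">
        <tasks>
          <task name="insert-album" queue="default">
            <input name="name">
              <value select="/workflow/parameters/parameter[@name='album']" />
            </input>
          </task>
        </tasks>
      </job>
    </subjobs>
  </job>
</subjobs>
```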