Contents

  1. Introduction
  2. First example
  3. Parameters
  4. Subjobs
  5. Job
  6. Tasks
  7. Task

Introduction

Workflow definitions are written as XML structures with XPath 2 expressions. This XML is used during the execution of the workflow to keep the current state of jobs and tasks up to date: it is the internal representation used by evQueue.

A workflow instance represents a workflow being executed. It contains data related to this particular execution (including input parameters and tasks output). You will have one new instance each time a workflow is launched. This instance is initially created by copying the workflow XML and replacing workflow parameters. Nodes are then modified as tasks are launched or terminated.

evQueue uses several concepts to define workflows, described in the paragraphs below.

All tasks take input parameters, which can be either static (a fixed string in the workflow definition) or dynamic. A dynamic parameter uses an XPath expression to refer to workflow instance data: it can refer to any of the task's parent jobs or tasks, including task output. Using one task's output as the input of another task is very useful for chaining dependent actions, for example compressing a file created by a previous task whose name depends on the current time.
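
For instance, here is a minimal sketch of a task mixing both kinds of parameters (the names “compress”, “create_file”, “level” and “filename” are hypothetical, not declared anywhere in this document):

<task name="compress" queue="default">
  <!-- static parameter: a fixed string in the workflow definition -->
  <input name="level">9</input>
  <!-- dynamic parameter: evaluated against the workflow instance at runtime -->
  <input name="filename">
    <value select="tasks/task[@name='create_file']/output/filename"></value>
  </input>
</task>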

A task can be declared as “XML” or as “Text”. An XML task is a task with XML output; a text task is a task with standard text output. A task whose output will be used elsewhere in the workflow should be declared “XML” and return well-formed XML. Including a root element is not necessary, as evQueue appends an “output” element to the task node before importing the task output under it. Invalid XML output will stop the workflow execution, as the nodes cannot be imported into the workflow instance.
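
As a hedged illustration (the task name is hypothetical and the attributes on the “output” node are modeled on the instance shown later in this document), an XML task whose program prints two “database” elements on its standard output would appear in the workflow instance as:

<task name="list_db" queue="default" retval="0" status="TERMINATED">
  <!-- the "output" node is added by evQueue; the program itself printed only the "database" elements -->
  <output retval="0">
    <database>mysql</database>
    <database>app</database>
  </output>
</task>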

First example

As a first example, we will create a workflow with a single task, which lists the files in a directory passed as a parameter. We will use the existing “ls” command to do this. This command has to be declared in evQueue with command-line arguments and text output.

Here is the workflow definition:

<workflow>
  <parameters>
    <parameter name="filename"></parameter>
  </parameters>
  <subjobs>
    <job>
      <tasks>
        <task name="ls" queue="default">
          <input name="filename">
            <value select="/workflow/parameters/parameter[@name='filename']"></value>
          </input>
        </task>
      </tasks>
    </job>
  </subjobs>
</workflow>

Analysis of the workflow definition

The first section declares the workflow input parameters. These parameters are passed to the workflow instance when it is launched and are mandatory. The empty “parameter” nodes will be filled with their values at instantiation.

The “subjobs” node is the beginning of the task definitions. It contains a list of jobs which will be executed in parallel; in this example we have only one job. The job itself does not define any action to perform: it acts as a container for one or more tasks, which execute the commands and do the real work.

So in the job we have the tasks that must be executed for this job to be completed. Again, here we have only one task, “ls”. This task takes one command-line parameter, defined with the “input” node. In this case, instead of using a static parameter, we use an XPath expression to read the workflow input parameter “filename”. All input parameters must have a name, even if it is not used in command-line mode. The name may be empty, but this is not recommended, as meaningful names make the workflow easier to understand.

Analysis of the workflow instance

So what happens when this workflow is instantiated?

First, the parameters will be filled with their values. Tasks will then be executed as described in the workflow definition. Before a task is executed, all its input parameters are parsed, and XPath expressions are replaced by their value. Here is the workflow instance at the end of the execution:

<workflow end_time="2015-02-26 13:48:29" errors="0" id="143" name="test_ls" start_time="2015-02-26 13:48:29" status="TERMINATED">
  <parameters>
    <parameter name="filename">/var</parameter>
  </parameters>
  <subjobs>
    <job>
      <tasks>
        <task execution_time="2015-02-26 13:48:29" name="ls" queue="default" retval="0" status="TERMINATED">
          <input name="filename">/var</input>
          <output execution_time="2015-02-26 13:48:29" exit_time="2015-02-26 13:48:29" method="text" retval="0">agentx backups cache chef lib local lock log lost+found mail opt run spool tmp www </output>
        </task>
      </tasks>
    </job>
  </subjobs>
</workflow>

We can see that the value of the “filename” parameter is “/var”. The input of the “ls” task has also been changed by evaluating the XPath expression. A new “output” node has been added by evQueue and filled with the task output; here we can see the directories under “/var”. We can also see information about the task execution, such as the execution time and return code.

Parameters

The “parameters” node must be located directly under the workflow root element. It can contain any number of parameters (including none), all of which must be passed to the workflow at instantiation. It is not possible to instantiate a workflow without specifying all of its input parameters.

Example:

<parameters>
  <parameter name="filename"></parameter>
  <parameter name="extension"></parameter>
</parameters>

Subjobs

One “subjobs” node (and exactly one) must be present under the root element. It is the entry point of the workflow. A “subjobs” node can only contain jobs. Execution starts with all the jobs under this node; all the jobs of a “subjobs” node are executed in parallel.

Each “job” node can optionally contain a “subjobs” node, defining which jobs should be executed when the current job is terminated.

Example:

<job>
  <tasks>
    <task name="dump_database" queue="default"></task>
  </tasks>
  <subjobs>
    <job>
      <tasks>
        <task name="bzip" queue="default">
          <input name="filename">
            <value select="tasks/task[@name='dump_database']/output/dump_filename"></value>
          </input>
        </task>
      </tasks>
    </job>
  </subjobs>
</job>

Job

A job is a container for tasks. It contains at least one task, but can contain an unlimited number of them. All the tasks in a job are executed in parallel; these tasks are generally subdivisions of one logical piece of work, and parallelism is used to speed things up. A job can also contain one or more subjobs, defined in its “subjobs” node. The subjobs are executed when the job is terminated, that is, when all of its tasks are terminated. If an error occurs in any task (non-zero return code), the subjobs are not executed.

A job can have three optional attributes; two of them, “loop” and “condition”, can be seen in the examples of this document.

Example:

<job>
  <tasks>
    <task name="list_db_mysql" queue="default"></task>
  </tasks>
  <subjobs>
    <job loop="tasks/task[@name='list_db_mysql']/output/databases/database">
      <tasks>
        <task name="dump_db_mysql" queue="local-dump">
          <input name="dbname">
            <value select="."></value>
          </input>
        </task>
      </tasks>
    </job>
  </subjobs>
</job>

In this example, we have one job whose task outputs an XML document containing a list of databases. Its subjob will be duplicated, once per “database” node, to back up all the databases. We can see that the backup task uses a relative XPath value to get the name of the current database in the loop.

Tasks

The “tasks” node must be present exactly once under each “job” node. It contains the list of tasks to execute for the current job. This node must contain at least one task (you cannot create a job that does nothing).

Task

A task is the base unit of work in evQueue. A task is identified by a name, which refers to a task declared in evQueue. The execution of a task results in the execution of a program with the specified input parameters. The output of this program is appended to the task in the “output” node. All the tasks of a job are executed in parallel, following the concurrency rules defined by their respective queues.

A task node carries several attributes; “name” and “queue” appear on every task in this document, and the “loop” attribute is illustrated in the example below.

The task node may contain “input” nodes, used to specify the parameters passed to the task. These nodes are optional. The “value” node can be used in an input to hold an XPath expression that will be evaluated at runtime. The “copy” node can be used in an input to copy the whole XML fragment referred to by its XPath expression (unlike “value”, which only selects text nodes). These expressions are affected by the loops in jobs or tasks, meaning that their relative path “.” is changed by loops.
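
As a hedged sketch (the task names “generate_report” and “send_report”, and the input names, are hypothetical), an input can use either node depending on whether the text or the whole fragment is needed:

<task name="send_report" queue="default">
  <!-- "value" selects only the text content of the matched node -->
  <input name="recipient">
    <value select="/workflow/parameters/parameter[@name='email']"></value>
  </input>
  <!-- "copy" imports the whole XML fragment, tags included -->
  <input name="report">
    <copy select="tasks/task[@name='generate_report']/output/report"></copy>
  </input>
</task>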

The “stdin” node is used to select the data that will be piped to the stdin stream of the task. This node is optional; if it is absent, no data is piped to stdin. Like the “input” node, it may contain “value” and “copy” nodes as well as literal strings (or a mix of these nodes). To see how it works, have a look at the “ls | wc -l” example.
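
As a hedged sketch of what such a piping definition might look like (assuming tasks named “ls” and “wc” are declared in evQueue; this is an approximation of the example referred to above, not a verbatim copy):

<job>
  <tasks>
    <task name="ls" queue="default"></task>
  </tasks>
  <subjobs>
    <job>
      <tasks>
        <task name="wc" queue="default">
          <!-- literal command-line parameter -->
          <input name="flag">-l</input>
          <!-- the output of the "ls" task is piped to the stdin of "wc" -->
          <stdin>
            <value select="tasks/task[@name='ls']/output"></value>
          </stdin>
        </task>
      </tasks>
    </job>
  </subjobs>
</job>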

Example:

<job condition="count(tasks/task[@name='unzip']/output/images/image) > 0">
  <tasks>
    <task name="resize" queue="photo-resize" loop="tasks/task[@name='unzip']/output/images/image">
      <input name="filename">
        <value select="."></value>
      </input>
      <input name="format">jpg</input>
    </task>
  </tasks>
  <subjobs>
    <job>
      <tasks>
        <task name="insert_db" queue="default">
          <input name="album">
            <value select="/workflow/parameters/parameter[@name='album']"></value>
          </input>
        </task>
      </tasks>
    </job>
  </subjobs>
</job>

Here we suppose we are processing zip files uploaded by a user. These zip files contain pictures, which must be resized and converted to jpg. Once all the pictures are resized, an album must be created in the database. The name of the album to create is passed as a workflow parameter.

We have a task (not shown in the XML) named “unzip” that unzips the file and outputs an XML document with one node per unzipped file. The first job of the example will be skipped by the condition if no image is present in the zip file. Multiple tasks will then be created, one for each file. This allows the resize processes to run in parallel, with a maximum concurrency set by the queue “photo-resize”: if the zip file contains 100 pictures and the concurrency of the “photo-resize” queue is 4, the pictures will be resized 4 at a time. This job will be terminated once all the pictures are resized.

We then have a subjob that will insert the album into the database. This subjob will be executed only when all the pictures are resized. If any of the resize tasks fails, the album will not be created and the workflow will return an error status.