Archive for December, 2013

taskrun – An easy-to-use python package for running tasks with dependencies and process management

Lately I’ve been running lots of network simulations. I’m always running the simulator over and over while varying the simulation parameters. I’ve also written some programs that parse the simulator output file and generate some CSV files and graphs. Each block of simulations is created in a new directory. Running the simulations by hand, and even by shell scripts, has gotten to be VERY tedious.

All my simulations have the same basic style: create a directory to hold a block of simulations, create sub-directories for each simulation, after the simulation completes run the first parsing program, after all simulations in a block have run and the corresponding first parsing program, run a second parsing program to generate graphs from the aggregate data from all simulations in the block. This process is often also parallelized many times across a second-level simulation parameter. As you can see, there is a great deal of parallelism, however, there are also a lot of dependencies. The dependencies create a simple directed acyclic graph (DAG).

In attempt at making the process of running simulations easier and faster, I created a Python package called taskrun. Taskrun has the following features:

  • Task dependency chaining: Each created task can list other tasks as its dependencies and can itself be a dependency for other tasks.
  • Parallelism throttling: A task manager is used to wait until a processor is available before starting a new task. Although the number of ready tasks might be large, it is more efficient to only run as many tasks at one time as there is processors on the machine. This reduces unnecessary cache thrashing and context switching. This can also be used to nicely share a community machine.
  • Simple task declaration: Tasks are easily declared and dependencies are easily chained. The syntax is easy to use and integrates very easily into for loops.
  • Easy to read output: The output is configurable to optionally show progress status, task commands, and task output. Each task also has the option of redirected the stdout and stderr streams to a file rather than the console.

As an example, I’ll present a sample task dependency graph and corresponding taskrun usage code. For the example, I’ll be running a network simulator and varying two input parameters: network topology and buffer size. I’ll hold the network size constant at 1000 endpoints. The simulator generates a lot of output debugging information so I want to redirect the stdout and stderr streams to a file. Here is the network simulator syntax:

netsim -s num_endpoints -t topology -b buffer_size -o output_file

The simulator outputs a large data file that needs to be parsed based on the statistics of interest. I’ve created a parsing program that extracts packet latencies and writes a CSV file. It has the following syntax:

parsesim -i input_file -o output_file

I have 3 topologies I’d like to test: “fat_tree”, “mesh”, and “torus”. For each topology I want to try 4 buffer sizes: 1k, 2k, 4k, and 8k. After these 4 simulations have ended for a particular topology, the results must be summarized and a graph needs to be generated. I’ve created a parsing program that extracts the data from 4 parsesim outputs, summarizes the results, and generates a graph. It has the following syntax:

graphsim -o output_file [input directory]

Before simulating anything, I like to create a new directory for the entire simulation run. I also create a directory for each topology and each buffer size within each topology. Along with all the simulations and parsing programs, creating the necessary directories are also tasks. I have created a dependency graph for this process as follows:

Process 1 creates a directory called “sims” holding all outputs, processes 2-4 create topology specific directories beneath “sims”, and processes 5-16 create directories beneath the corresponding topology directory for the corresponding buffer size. Processes 17-28 are the actual network simulations (./netsim). Processes 29-40 extract packet latencies from the simulation outputs and write CSV files (./parsesim). Processes 41-43 summarize the data of their corresponding topology and generate graphs.

The following code shows how to use the taskrun package to generate and run the process dependency graph described above:

#!/usr/bin/env python

import os
import taskrun

# instantiate a Task Manager by which all processes will be controlled
manager = taskrun.Task.Manager(
    numProcs = 8,        # this defaults to the number of processors on the machine
    showCommands = True, # print each command as it is run
    runTasks = True,     # actually run the command (False is good for testing)
    showProgress = True) # show progress as a percentage

# these will guide the for loops
topologies     = [ 'fat_tree', 'mesh', 'torus' ]
buffer_sizes   = [ '1024', '2048', '4096', '8192' ]
root_dir_name  = 'sims'

# create a task that will create a root directory for all the simulation data
root_dir = manager.task_new('make root', 'mkdir ' + root_dir_name)

for topology in topologies:

    # create a task that will create a topology directory
    topo_dir = manager.task_new(topology + ' dir', 'mkdir ' + os.path.join(root_dir_name, topology))

    # create a task for generating topology summary graphs
    cmd = 'graphsim -o ' + os.path.join(root_dir_name, topology, 'graph.png') + \
        ' ' + os.path.join(root_dir_name, topology)
    out = os.path.join(root_dir_name, topology, 'graph.out')
    topo_graph = manager.task_new(topology + ' summary', cmd, out)

    for buffer_size in buffer_sizes:

        # create a task that will create a buffer size directory
        size_dir = manager.task_new(topology + '-' + buffer_size + ' dir',
                                    'mkdir ' + os.path.join(root_dir_name, topology, buffer_size))

        # create a task for a simulation
        cmd = 'netsim -s 1000 -t ' + topology + ' -b ' + buffer_size + ' -o ' + \
            os.path.join(root_dir_name, topology, buffer_size, 'sim.dat')
        out = os.path.join(root_dir_name, topology, buffer_size, 'sim.out')
        simulation = manager.task_new(topology + '-' + buffer_size + ' sim', cmd, out)

        # create a task for
        cmd = 'parsesim -i ' + os.path.join(root_dir_name, topology, buffer_size, 'sim.dat') + \
            ' -o ' + os.path.join(root_dir_name, topology, buffer_size, 'latency.csv')
        out = os.path.join(root_dir_name, topology, buffer_size, 'latency.out')
        parse = manager.task_new(topology + '-' + buffer_size + ' parse', cmd, out)

        # link the 'topo_graph' task to all 'parse' tasks of this topology

# run all processes from the task manager in dependency order

There are a few interesting things to note in this code sample. First, I’ve set the parallelization parameter ‘numProcs’ to 8, so there will be at most 8 processes running at a time. If this parameter is None or not given, the default value is set to the number of processors on the machine, which is generally what is wanted anyway. The second thing to notice is that taskrun works very well with for loops, which is very common for simulation runs where simulation parameters are being swept.

The progress status and error codes that are generated by taskrun print to the console in color. The colored output is utilized by a package called termcolor. Taskrun will run without termcolor, but the output will not be colored. Termcolor can be found at:

Taskrun is still very new, but I have found it to be extremely useful. I recently used it as part of a simulation sequence that had dependency chains up to 8 deep and ran a total of over 500 simulations. The total simulation run took days to complete, and taskrun held up.

I can’t quite decide what the next features of taskrun will be. I’ve thought about adding a feature that saves the Task Manager state to a file when a process dies prematurely, then after fixing the problem the user can resume processing from where it left off. There is nothing worse than simulating for 20 hours before finding a problem! My only concern is the numerous corner cases that would have to be covered by this approach.

If any of you have any suggestions for future features, please let me know.