Posts Tagged ‘ python ’

ParaMgmt: Interacting with thousands of servers over SSH (part 1)

Google-datacenter_2

While working at Google in the Platforms Networking research group, I was tasked with running network performance benchmarks on large clusters of servers. Google has their own internal application scheduling system, but for unmentionable reasons, I couldn’t use this for my tests. I needed 100% control of the servers. I resulted to SSH and SCP.

A common benchmark assigns some servers as senders and others as receivers. A typical test sequence would go something like this:

  1. Build the benchmark binaries on my local workstation.
  2. Copy the sender binary and receiver binary to the sender and receiver servers, respectively.
  3. Copy the configuration files to the servers.
  4. Run the test.
  5. Copy the results files from the servers back to my workstation.
  6. Parse and analyze the results.

This process became VERY tedious. As a result, I wrote a software package to do this more efficiently and productively. It is called ParaMgmt, and it is now open-source on GitHub (https://github.com/google/paramgmt). ParaMgmt is a python package designed to ease the burden of interacting with many remote machines via SSH. The primary focus is on parallelism, good error handling, automatic connection retries, and nice viewable output. The abilities of ParaMgmt include running local commands, running remote commands, transferring files to and from remote machines, and executing local scripts on remote machines. This package includes command-line executables that wrap the functionality provided by the Python package.

The GitHub page describes how to install the software. The easiest method is to use pip and the GitHub link:

nic@myworkstation$ pip3 install --user \
> git+https://github.com/google/paramgmt.git

All you need to use the software is a list of remote hosts you want to interact with. I’ll be focusing on the command-line executables in this post, so let’s start by making a file containing our hosts:

nic@myworkstation$ cat << EOF >> hosts.txt
> 10.0.0.100
> 10.0.0.101
> 10.0.0.102
> EOF

Now that we have our hosts file, let’s run some remote commands. There are 6 command line executables:

  • rhosts = Remote hosts – just prints each remote host.
  • lcmd = Local command – runs commands locally for each remote host.
  • rcmd = Remote command – runs commands remotely on each remote host.
  • rpush = Remote push – pushes files to each remote host.
  • rpull = Remote pull – pulls files from each remote host.
  • rscript = Remote script – runs local scripts on each remote host.

First make sure you’ve setup key-based authentication with all servers (tutorial). Now let’s use the ‘rhosts’ executable to verify our hosts file, and also try adding more hosts on the command line.

nic@myworkstation$ rhosts -f hosts.txt
10.0.0.100
10.0.0.101
10.0.0.102
nic@myworkstation$ rhosts -f hosts.txt -m abc.com 123.com
abc.com
123.com
10.0.0.100
10.0.0.101
10.0.0.102

Let’s verify that SSH works using the ‘rcmd’ executable:

nic@myworkstation$ rcmd -f hosts.txt -- whoami
rcmd [10.0.0.100]: whoami
stdout:
nic
rcmd [10.0.0.101]: whoami
stdout:
nic
rcmd [10.0.0.102]: whoami
stdout:
nic
3 succeeded, 0 failed, 3 total

You can see that we remotely logged in and successfully executed the ‘whoami’ command on each host. All 3 connections executed in parallel. ParaMgmt uses coloring as a better way to view the output. In our example, the execution was successful, so the output is green. If the command output text to stderr, ParaMgmt will color the output yellow if the command still exited successfully, and red if it exited with error status. Upon an error, ParaMgmt also states how many attempts were made, the return code, and reports the hosts that failed.

nic@myworkstation$ rcmd -f hosts.txt -- 'echo some text 1>&2'
rcmd [10.0.0.100]: echo some text 1>&2
stderr:
some text
rcmd [10.0.0.101]: echo some text 1>&2
stderr:
some text
rcmd [10.0.0.102]: echo some text 1>&2
stderr:
some text
3 succeeded, 0 failed, 3 total
nic@myworkstation$ rcmd -f hosts.txt -- \
> 'echo some text 1>&2; false'
rcmd [10.0.0.100]: echo some text 1>&2; false
stderr:
some text
return code: 1
attempts: 1
rcmd [10.0.0.101]: echo some text 1>&2; false
stderr:
some text
return code: 1
attempts: 1
rcmd [10.0.0.102]: echo some text 1>&2; false
stderr:
some text
return code: 1
attempts: 1
0 succeeded, 3 failed, 3 total

Failed hosts:
10.0.0.100
10.0.0.101
10.0.0.102

ParaMgmt has a great feature that makes it extremely useful, namely automatic retries. Commands in ParaMgmt will automatically retry when an SSH connection fails. This hardly ever occurs when you are communicating with only 3 servers, but when you use ParaMgmt to connect to thousands of servers potentially scattered across the planet, all hell breaks loose. The automatic retry feature of ParaMgmt hides all the annoying network issues. It defaults to a maximum of 3 attempts, but this is configurable on the command line with the “-a” option.

Now that we can run remote commands, let’s try copying files to and from the remote machines:

nic@myworkstation$ rpush -f hosts.txt -d /tmp -- f1.txt
rpush [10.0.0.100]: f1.txt => 10.0.0.100:/tmp
rpush [10.0.0.101]: f1.txt => 10.0.0.101:/tmp
rpush [10.0.0.102]: f1.txt => 10.0.0.102:/tmp
3 succeeded, 0 failed, 3 total

nic@myworkstation$ rpull -f hosts.txt -d /tmp -- \
> /tmp/f2.txt /tmp/f3.txt
rpull [10.0.0.100]: 10.0.0.100:{/tmp/f2.txt,/tmp/f3.txt} => /tmp
rpull [10.0.0.101]: 10.0.0.101:{/tmp/f2.txt,/tmp/f3.txt} => /tmp
rpull [10.0.0.102]: 10.0.0.102:{/tmp/f2.txt,/tmp/f3.txt} => /tmp
3 succeeded, 0 failed, 3 total

As shown in this examples, ParaMgmt is able to push and pull many files simultaneously. ParaMgmt is also able to run a local script on a remote machine. You could do this by doing an rpush then an rcmd, but it is faster and cleaner to use ‘rscript’, as follows:

nic@myworkstation$ cat << EOF >> s1.sh
> #!/bin/bash
> echo -n "hello "
> echo -n `whoami`
> echo ", how are you?"
> EOF
nic@myworkstation$ rscript -f hosts.txt -- s1.sh
rscript [10.0.0.100]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic, how are you?
rscript [10.0.0.101]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic, how are you?
rscript [10.0.0.102]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic, how are you?
3 succeeded, 0 failed, 3 total

There is one more really cool feature of ParaMgmt I should cover. Often times, the remote hostname should be used in a command. For instance, after a benchmark has been run on all servers and you want to collect the data from the servers using the ‘rpull’ command, it would be nice if there was a corresponding local directory for each remote host. For this, we can use the ‘lcmd’ executable, with ParaMgmt’s hostname replacement feature. Any instance of “?HOST” in the command will be translated to the corresponding hostname. This works with all executables and is even applied on text within scripts used in the ‘rscript’ executable.

nic@myworkstation$ lcmd -f hosts.txt -- mkdir /tmp/res?HOST
lcmd [10.0.0.100]: mkdir /tmp/res10.0.0.100
lcmd [10.0.0.101]: mkdir /tmp/res10.0.0.101
lcmd [10.0.0.102]: mkdir /tmp/res10.0.0.102
3 succeeded, 0 failed, 3 total
nic@myworkstation$ rpull -f hosts.txt -d /tmp/res?HOST -- res.txt
rpull [10.0.0.100]: 10.0.0.100:res.txt => /tmp/res10.0.0.100
rpull [10.0.0.101]: 10.0.0.101:res.txt => /tmp/res10.0.0.101
rpull [10.0.0.102]: 10.0.0.102:res.txt => /tmp/res10.0.0.102
3 succeeded, 0 failed, 3 total

Here is an example of using the hostname auto-replacement in a script. I’ve just added the “?HOST” to the previous script example:

nic@myworkstation$ cat << EOF >> s1.sh
> #!/bin/bash
> echo -n "hello "
> echo -n `whoami`
> echo "@?HOST, how are you?"
> EOF
nic@myworkstation$ rscript -f hosts.txt -- s1.sh
rscript [10.0.0.100]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic@10.0.0.100, how are you?
rscript [10.0.0.101]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic@10.0.0.101, how are you?
rscript [10.0.0.102]: running s1.sh
stdout:
Welcome to Ubuntu 14.10 (GNU/Linux 3.16.0-39-generic x86_64)
hello nic@10.0.0.102, how are you?
3 succeeded, 0 failed, 3 total

ParaMgmt is fast and efficient. It handles all SSH connections in parallel freeing you from wasting your time on less-capable scripts. ParaMgmt’s command line executables are great resources to be used in all sorts of scripting environments. To really get the full usefulness of ParaMgmt, import the Python package into your Python program and unleash concurrent SSH connections to remote machines.

Unix Domain Sockets vs Loopback TCP Sockets

Two communicating processes on a single machine have a few options. They can use regular TCP sockets, UDP sockets, unix domain sockets, or shared memory. A recent project I was working on used Node.js with two communicating processes on the same machine. I wanted to know how to reduce the CPU utilization of the machine, so I ran a few experiments to compare the efficiency between unix domain sockets and TCP sockets using the loopback interface. This post covers my experiments and test results.

First off, is a disclaimer. This test is not exhaustive. Both client and server are written in Node.js and can only be as efficient as the Node.js runtime.

All code in this post is available at: github.com/nicmcd/uds_vs_tcp

Server Application

I created a simple Node.js server application that could be connected to via TCP socket or Unix domain socket. It simply echos all received messages. Here is the code:

var assert = require('assert');
assert(process.argv.length == 4, 'node server.js <tcp port> <domain socket path>');

var net = require('net');

var tcpPort = parseInt(process.argv[2]);
assert(!isNaN(tcpPort), 'bad TCP port');
console.log('TCP port: ' + tcpPort);

var udsPath = process.argv[3];
console.log('UDS path: ' + udsPath);

function createServer(name, portPath) {
    var server = net.createServer(function(socket) {
        console.log(name + ' server connected');
        socket.on('end', function() {
            console.log(name + ' server disconnected');
        });
        socket.write('start sending now!');
        socket.pipe(socket);
    });
    server.listen(portPath, function() {
        console.log(name + ' server listening on ' + portPath);
    });
}

var tcpServer = createServer('TCP', tcpPort);
var udsServer = createServer('UDS', udsPath);

Client Application

The client application complements the server application. It connects to the server via TCP or Unix domain sockets. It sends a bunch of randomly generated packets and measures the time it takes to finish. When complete, it prints the time and exits. Here is the code:

var assert = require('assert');
assert(process.argv.length == 5, 'node client.js <port or path> <packet size> <packet count>');

var net = require('net');
var crypto = require('crypto');

if (isNaN(parseInt(process.argv[2])) == false)
    var options = {port: parseInt(process.argv[2])};
else
    var options = {path: process.argv[2]};
console.log('options: ' + JSON.stringify(options));

var packetSize = parseInt(process.argv[3]);
assert(!isNaN(packetSize), 'bad packet size');
console.log('packet size: ' + packetSize);

var packetCount = parseInt(process.argv[4]);
assert(!isNaN(packetCount), 'bad packet count');
console.log('packet count: ' + packetCount);

var client = net.connect(options, function() {
    console.log('client connected');
});

var printedFirst = false;
var packet = crypto.randomBytes(packetSize).toString('base64').substring(0,packetSize);
var currPacketCount = 0;
var startTime;
var endTime;
var delta;
client.on('data', function(data) {
    if (printedFirst == false) {
        console.log('client received: ' + data);
        printedFirst = true;
    }
    else {
        currPacketCount += 1;
        if (data.length != packetSize)
            console.log('weird packet size: ' + data.length);
        //console.log('client received a packet: ' + currPacketCount);
    }

    if (currPacketCount < packetCount) {
        if (currPacketCount == 0) {
            startTime = process.hrtime();
        }
        client.write(packet);
    } else {
        client.end();
        endTime = process.hrtime(startTime);
        delta = (endTime[0] * 1e9 + endTime[1]) / 1e6;
        console.log('millis: ' + delta);
    }
});

Running a Single Test

First start the server application with:

node server.js 5555 /tmp/uds

This starts the server using TCP port 5555 and Unix domain socket /tmp/uds.

Now we can run the client application to get some statistics. Let’s first try the TCP socket. Run the client with:


node client.js 5555 1000 100000

This runs the client application using TCP port 5555 and sends 100,000 packets all sized 1000 bytes. This tooks 8006 milliseconds on my machine. We can now try running with the Unix domain socket with:


node client.js /tmp/uds 1000 100000

This runs the client the same as before except it uses the /tmp/uds Unix domain socket instead of the TCP socket. On my machine this took 3570 milliseconds to run. These two runs show that for 1k byte packets, Unix domain sockets are about 2-3x more efficient than TCP sockets.
At this point you might be completely convinced that Unix domain sockets are better and you’ll use them whenever you can. That’s too easy. Let’s run the client application a whole bunch of times and graph the results.
I recently posted about a python package I created for running many tasks and aggregating the data. I thought this socket comparison would make a good example.

Running the Full Test

As mentioned, running the full test uses the Taskrun Python package (available at github.com/nicmcd/taskrun). The script I quickly hacked together to run the client application and parse the results is as follows:


import taskrun
import os

POWER = 15
RUNS = 10
PACKETS_PER_RUN = 100000

manager = taskrun.Task.Manager(
    numProcs = 1,
    showCommands = True,
    runTasks = True,
    showProgress = True)

DIR = "sims"
mkdir = manager.task_new('dir', 'rm -rI ' + DIR + '; mkdir ' + DIR)

def makeName(stype, size, run):
    return stype + '_size' + str(size) + '_run' + str(run)

def makeCommand(port_or_path, size, name):
    return 'node client.js ' + port_or_path + ' ' + str(size) + ' ' + str(PACKETS_PER_RUN) + \
        ' | grep millis | awk \'{printf "%s, ", $2}\' > ' + os.path.join(DIR, name)

barrier1 = manager.task_new('barrier1', 'sleep 0')
for exp in range(0, POWER):
    size = pow(2, exp)
    for run in range(0, RUNS):
        # Unix domain socket test
        name = makeName('uds', size, run)
        task = manager.task_new(name, makeCommand('/tmp/uds', size, name))
        task.dependency_is(mkdir)
        barrier1.dependency_is(task)

        # TCP socket test
        name = makeName('tcp', size, run)
        task = manager.task_new(name, makeCommand('5555', size, name))
        task.dependency_is(mkdir)
        barrier1.dependency_is(task)

# create CSV header
filename = os.path.join(DIR, 'uds_vs_tcp.csv')
header = 'NAME, '
for run in range(0, RUNS):
    header += 'RUN ' + str(run) + ', '
hdr_task = manager.task_new('CSV header', 'echo \'' + header + '\' > ' + filename)
hdr_task.dependency_is(barrier1)

# UDS to CSV
cmd = ''
for exp in range(0,POWER):
    size = pow(2, exp)
    cmd += 'echo -n \'UDS Size ' + str(size) + ', \' >> ' + filename + '; '
    for run in range(0, RUNS):
        name = makeName('uds', size, run)
        cmd += 'cat ' + os.path.join(DIR, name) + ' >> ' + filename + '; '
    cmd += 'echo \'\' >> ' + filename + '; '
uds_task = manager.task_new('UDS to CSV', cmd)
uds_task.dependency_is(hdr_task)

# TCP to CSV
cmd = ''
for exp in range(0,POWER):
    size = pow(2, exp)
    cmd += 'echo -n \'TCP Size ' + str(size) + ', \' >> ' + filename + '; '
    for run in range(0, RUNS):
        name = makeName('tcp', size, run)
        cmd += 'cat ' + os.path.join(DIR, name) + ' >> ' + filename + '; '
    cmd += 'echo \'\' >> ' + filename + '; '
tcp_task = manager.task_new('TCP to CSV', cmd)
tcp_task.dependency_is(uds_task)

manager.run_request_is()

Admittedly, this isn’t the prettiest code to look at, but it gets the job done. For both Unix domain socket and TCP socket, it runs the client application for all packet sizes that are a power of 2 from 1 to 16384. Each setup is run 10 times. Each test result is written to its own file. After all the tests have been run, the taskrun script creates a CSV file using all the test results. The CSV file can then be imported into a spreadsheet application for analysis.

Results

I ran this on an Intel E5-2620 v2 processor with 16GB of RAM. I imported the CSV into Excel, averaged the 10 results of each setup, then graphed the results. This first graph shows the execution time compared to packet size on a logarithmic graph.

Execution Time vs. Packet Size

The results shown here are fairly predicable. The Unix domain sockets are always more efficient and the efficiency benefit is in the 2-3x range. After noticing some weird ups and down in the graph, I decided to generate a graph with the execution times normalized to the TCP execution time.

Relative Execution Time vs Packet Size

I’m not exactly sure why the efficiency of Unix domain sockets varies as it does compared to TCP sockets, but it is always better. This is simply because Unix domain sockets don’t traverse the operating system’s network stack. The kernel simply copies the data from the client’s application into the file buffer in the server’s application.

taskrun – An easy-to-use python package for running tasks with dependencies and process management

Lately I’ve been running lots of network simulations. I’m always running the simulator over and over while varying the simulation parameters. I’ve also written some programs that parse the simulator output file and generate some CSV files and graphs. Each block of simulations is created in a new directory. Running the simulations by hand, and even by shell scripts, has gotten to be VERY tedious.

All my simulations have the same basic style: create a directory to hold a block of simulations, create sub-directories for each simulation, after the simulation completes run the first parsing program, after all simulations in a block have run and the corresponding first parsing program, run a second parsing program to generate graphs from the aggregate data from all simulations in the block. This process is often also parallelized many times across a second-level simulation parameter. As you can see, there is a great deal of parallelism, however, there are also a lot of dependencies. The dependencies create a simple directed acyclic graph (DAG).

In attempt at making the process of running simulations easier and faster, I created a Python package called taskrun. Taskrun has the following features:

  • Task dependency chaining: Each created task can list other tasks as its dependencies and can itself be a dependency for other tasks.
  • Parallelism throttling: A task manager is used to wait until a processor is available before starting a new task. Although the number of ready tasks might be large, it is more efficient to only run as many tasks at one time as there is processors on the machine. This reduces unnecessary cache thrashing and context switching. This can also be used to nicely share a community machine.
  • Simple task declaration: Tasks are easily declared and dependencies are easily chained. The syntax is easy to use and integrates very easily into for loops.
  • Easy to read output: The output is configurable to optionally show progress status, task commands, and task output. Each task also has the option of redirected the stdout and stderr streams to a file rather than the console.

As an example, I’ll present a sample task dependency graph and corresponding taskrun usage code. For the example, I’ll be running a network simulator and varying two input parameters: network topology and buffer size. I’ll hold the network size constant at 1000 endpoints. The simulator generates a lot of output debugging information so I want to redirect the stdout and stderr streams to a file. Here is the network simulator syntax:

netsim -s num_endpoints -t topology -b buffer_size -o output_file

The simulator outputs a large data file that needs to be parsed based on the statistics of interest. I’ve created a parsing program that extracts packet latencies and writes a CSV file. It has the following syntax:

parsesim -i input_file -o output_file

I have 3 topologies I’d like to test: “fat_tree”, “mesh”, and “torus”. For each topology I want to try 4 buffer sizes: 1k, 2k, 4k, and 8k. After these 4 simulations have ended for a particular topology, the results must be summarized and a graph needs to be generated. I’ve created a parsing program that extracts the data from 4 parsesim outputs, summarizes the results, and generates a graph. It has the following syntax:

graphsim -o output_file [input directory]

Before simulating anything, I like to create a new directory for the entire simulation run. I also create a directory for each topology and each buffer size within each topology. Along with all the simulations and parsing programs, creating the necessary directories are also tasks. I have created a dependency graph for this process as follows:

Process 1 creates a directory called “sims” holding all outputs, processes 2-4 create topology specific directories beneath “sims”, and processes 5-16 create directories beneath the corresponding topology directory for the corresponding buffer size. Processes 17-28 are the actual network simulations (./netsim). Processes 29-40 extract packet latencies from the simulation outputs and write CSV files (./parsesim). Processes 41-43 summarize the data of their corresponding topology and generate graphs.

The following code shows how to use the taskrun package to generate and run the process dependency graph described above:

#!/usr/bin/env python

import os
import taskrun

# instantiate a Task Manager by which all processes will be controlled
manager = taskrun.Task.Manager(
    numProcs = 8,        # this defaults to the number of processors on the machine
    showCommands = True, # print each command as it is run
    runTasks = True,     # actually run the command (False is good for testing)
    showProgress = True) # show progress as a percentage

# these will guide the for loops
topologies     = [ 'fat_tree', 'mesh', 'torus' ]
buffer_sizes   = [ '1024', '2048', '4096', '8192' ]
root_dir_name  = 'sims'

# create a task that will create a root directory for all the simulation data
root_dir = manager.task_new('make root', 'mkdir ' + root_dir_name)

for topology in topologies:

    # create a task that will create a topology directory
    topo_dir = manager.task_new(topology + ' dir', 'mkdir ' + os.path.join(root_dir_name, topology))
    topo_dir.dependency_is(root_dir)

    # create a task for generating topology summary graphs
    cmd = 'graphsim -o ' + os.path.join(root_dir_name, topology, 'graph.png') + \
        ' ' + os.path.join(root_dir_name, topology)
    out = os.path.join(root_dir_name, topology, 'graph.out')
    topo_graph = manager.task_new(topology + ' summary', cmd, out)

    for buffer_size in buffer_sizes:

        # create a task that will create a buffer size directory
        size_dir = manager.task_new(topology + '-' + buffer_size + ' dir',
                                    'mkdir ' + os.path.join(root_dir_name, topology, buffer_size))
        size_dir.dependency_is(topo_dir)

        # create a task for a simulation
        cmd = 'netsim -s 1000 -t ' + topology + ' -b ' + buffer_size + ' -o ' + \
            os.path.join(root_dir_name, topology, buffer_size, 'sim.dat')
        out = os.path.join(root_dir_name, topology, buffer_size, 'sim.out')
        simulation = manager.task_new(topology + '-' + buffer_size + ' sim', cmd, out)
        simulation.dependency_is(size_dir)

        # create a task for
        cmd = 'parsesim -i ' + os.path.join(root_dir_name, topology, buffer_size, 'sim.dat') + \
            ' -o ' + os.path.join(root_dir_name, topology, buffer_size, 'latency.csv')
        out = os.path.join(root_dir_name, topology, buffer_size, 'latency.out')
        parse = manager.task_new(topology + '-' + buffer_size + ' parse', cmd, out)
        parse.dependency_is(simulation)

        # link the 'topo_graph' task to all 'parse' tasks of this topology
        topo_graph.dependency_is(parse)

# run all processes from the task manager in dependency order
manager.run_request_is()

There are a few interesting things to note in this code sample. First, I’ve set the parallelization parameter ‘numProcs’ to 8, so there will be at most 8 processes running at a time. If this parameter is None or not given, the default value is set to the number of processors on the machine, which is generally what is wanted anyway. The second thing to notice is that taskrun works very well with for loops, which is very common for simulation runs where simulation parameters are being swept.

The progress status and error codes that are generated by taskrun print to the console in color. The colored output is utilized by a package called termcolor. Taskrun will run without termcolor, but the output will not be colored. Termcolor can be found at: https://pypi.python.org/pypi/termcolor

Taskrun is still very new, but I have found it to be extremely useful. I recently used it as part of a simulation sequence that had dependency chains up to 8 deep and ran a total of over 500 simulations. The total simulation run took days to complete, and taskrun held up.

I can’t quite decide what the next features of taskrun will be. I’ve thought about adding a feature that saves the Task Manager state to a file when a process dies prematurely, then after fixing the problem the user can resume processing from where it left off. There is nothing worse than simulating for 20 hours before finding a problem! My only concern is the numerous corner cases that would have to be covered by this approach.

If any of you have any suggestions for future features, please let me know.