User Guide

Installation

Install the clustermq package in R from CRAN. This will automatically detect if ZeroMQ is installed and otherwise use the bundled library:

# Recommended:
#   If your system has `libzmq` installed but you want to enable the worker
#   crash monitor, set the environment variable below to use the bundled
#   `libzmq` library with the required feature (`-DZMQ_BUILD_DRAFT_API=1`):

# Sys.setenv(CLUSTERMQ_USE_SYSTEM_LIBZMQ=0)
install.packages("clustermq")

Alternatively you can use the remotes package to install directly from Github. Note that this version needs autoconf/automake and CMake for compilation:

# Sys.setenv(CLUSTERMQ_USE_SYSTEM_LIBZMQ=0)
# install.packages('remotes')
remotes::install_github("mschubert/clustermq")
# remotes::install_github("mschubert/clustermq@develop") # dev version

In the develop branch, we will introduce code changes and new features. These may contain bugs, poor documentation, or other inconveniences. This branch may not install at times. However, feedback is very welcome.

For any installation issues please see the FAQ.

Configuration

An HPC cluster’s scheduler ensures that computing jobs are distributed to available worker nodes. Hence, this is what clustermq interfaces with in order to do computations.

By default, we will take whichever scheduler we find and fall back on local processing. This will work in most, but not all cases. You may need to configure your scheduler.

Setting up the scheduler

To set up a scheduler explicitly, see the following links:

You may in addition need to activate compute environments or containers if your shell (e.g. ~/.bashrc) does not do this automatically.

Check the FAQ if your job submission/call to Q errors or gets stuck.

Local parallelization

While this is not the main focus of the package, you can use it to parallelize function calls locally on multiple cores or processes. This can also be useful to test your code on a subset of the data before submitting it to a scheduler.

SSH connector

There are reasons why you might prefer to not to work on the computing cluster directly but rather on your local machine instead. RStudio is an excellent local IDE, it’s more responsive than and feature-rich than browser-based solutions (RStudio server, Project Jupyter), and it avoids X forwarding issues when you want to look at plots you just made.

Using this setup, however, you lost access to the computing cluster. Instead, you had to copy your data there, and then submit individual scripts as jobs, aggregating the data in the end again. clustermq is trying to solve this by providing a transparent SSH interface.

In order to use clustermq from your local machine, the package needs to be installed on both there and on the computing cluster. On the computing cluster, set up your scheduler and make sure clustermq runs there without problems. Note that the remote scheduler can not be LOCAL (default if no HPC scheduler found) or SSH for this to work.

# If this is set to 'LOCAL' or 'SSH' you will get the following error:
#  Expected PROXY_READY, received ‘PROXY_ERROR: Remote SSH QSys is not allowed’
options(
    clustermq.scheduler = "multiprocess" # or multicore, LSF, SGE, Slurm etc.
)

On your local machine, add the following options:

options(
    clustermq.scheduler = "ssh",
    clustermq.ssh.host = "user@host", # use your user and host, obviously
    clustermq.ssh.log = "~/cmq_ssh.log" # log for easier debugging
)

We recommend that you set up SSH keys for password-less login.

Usage

The Q function

The following arguments are supported by Q:

Behavior can further be fine-tuned using the options below:

The full documentation is available by typing ?Q.

Examples

The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe as the R session running on a worker does not share your local memory.

The simplest example is to a function call that is completely self-sufficient, and there is one argument (x) that we iterate through:

fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 2
#> 
#> [[2]]
#> [1] 4
#> 
#> [[3]]
#> [1] 6

Non-iterated arguments are supported by the const argument:

fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#> 
#> [[2]]
#> [1] 14
#> 
#> [[3]]
#> [1] 16

If a function relies on objects in its environment that are not passed as arguments (including other functions), they can be exported using the export argument:

fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#> 
#> [[2]]
#> [1] 14
#> 
#> [[3]]
#> [1] 16

If we want to use a package function we need to load it on the worker using the pkgs parameter, or referencing it with package_name:::

f1 = function(x) splitIndices(x, 3)
Q(f1, x=3, n_jobs=1, pkgs="parallel")
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [[1]][[1]]
#> [1] 1
#> 
#> [[1]][[2]]
#> [1] 2
#> 
#> [[1]][[3]]
#> [1] 3

f2 = function(x) parallel::splitIndices(x, 3)
Q(f2, x=8, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [[1]][[1]]
#> [1] 1 2 3
#> 
#> [[1]][[2]]
#> [1] 4 5
#> 
#> [[1]][[3]]
#> [1] 6 7 8

# Q(f1, x=5, n_jobs=1)
# (Error #1) could not find function "splitIndices"

As parallel foreach backend

The foreach package provides an interface to perform repeated tasks on different backends. While it can perform the function of simple loops using %do%:

library(foreach)
foreach(i=1:3) %do% sqrt(i)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051

it can also perform these operations in parallel using %dopar%:

foreach(i=1:3) %dopar% sqrt(i)
#> Warning: executing %dopar% sequentially: no parallel backend registered
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051

The latter allows registering different handlers for parallel execution, where we can use clustermq:

# set up the scheduler first, otherwise this will run sequentially

# this accepts same arguments as `Q`
# the number of jobs is ignored here since we're using the LOCAL scheduler
clustermq::register_dopar_cmq(n_jobs=2, memory=1024)

# this will be executed as jobs
foreach(i=1:3) %dopar% sqrt(i)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051

As BiocParallel supports foreach too, this means we can run all packages that use BiocParallel on the cluster as well via DoparParam.

library(BiocParallel)

# the number of jobs is ignored here since we're using the LOCAL scheduler
clustermq::register_dopar_cmq(n_jobs=2, memory=1024)
register(DoparParam())
bplapply(1:3, sqrt)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 1.414214
#> 
#> [[3]]
#> [1] 1.732051

With targets

The targets package enables users to define a dependency structure of different function calls, and only evaluate them if the underlying data changed.

The targets package is a Make-like pipeline tool for statistics and data science in R. The package skips costly runtime for tasks that are already up to date, orchestrates the necessary computation with implicit parallel computing, and abstracts files as R objects. If all the current output matches the current upstream code and data, then the whole pipeline is up to date, and the results are more trustworthy than otherwise.

It can use clustermq to perform calculations as jobs.

Options

The various configurable options are mentioned throughout the documentation, where applicable, however, we list all of the options here for reference.

Options can be set by including a call to options(<key> = <value>) in your current session or added as a line to your ~/.Rprofile. The former will only be available in your active session, while the latter will be available any time after you restart R.

Debugging workers

Function calls evaluated by workers are wrapped in event handlers, which means that even if a call evaluation throws an error, this should be reported back to the main R session.

However, there are reasons why workers might crash, and in which case they can not report back. These include:

In this case, it is useful to have the worker(s) create a log file that will also include events that are not reported back. It can be requested using:

Q(..., log_worker=TRUE)

This will create a file called <cmq_id>-<array_index>.log in your current working directory, irrespective of which scheduler you use.

You can customize the file name using

Q(..., template=list(log_file = <yourlog>))

Note that in this case log_file is a template field of your scheduler script, and hence needs to be present there in order for this to work. The default templates all have this field included.

In order to log each worker separately, some schedulers support wildcards in their log file names. For instance:

Your scheduler documentation will have more details about the available options.

When reporting a bug that includes worker crashes, please always include a log file.

Environments

In some cases, it may be necessary to activate a specific computing environment on the scheduler jobs prior to starting up the worker. This can be, for instance, because R was only installed in a specific environment or container.

Examples for such environments or containers are:

It should be possible to activate them in the job submission script (i.e., the template file). This is widely untested, but would look the following for the LSF scheduler (analogous for others):

#BSUB-J {{ job_name }}[1-{{ n_jobs }}]  # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }}      # stdout + stderr
#BSUB-M {{ memory | 4096 }}             # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default                        # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }}          # walltime (uncomment)

module load {{ bashenv | default_bash_env }}
# or: source activate {{ conda | default_conda_env_name }}
# or: your environment activation command
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

This template still needs to be filled, so in the above example you need to pass either

Q(..., template=list(bashenv="my environment name"))

or set it via an option:

options(
    clustermq.defaults = list(bashenv="my default env")
)

Scheduler templates

The package provides its own default scheduler templates, similar to the ones listed below. Which template is used is decided based on which scheduler submission executable is present in the user’s $PATH, e.g. sbatch for SLURM or bsub for LSF. qsub is ambiguous between SGE and PBS/Torque, so in this case options(clustermq.scheduler = "<opt>") should be set to the correct one.

A user can provide their own template file via options(clustermq.template = "<file>"), containing arbitrary template values {{ value | default }}. These values will be filled upon job submission in the following order of priority:

  1. The argument provided to Q(..., template=list(key = value)) or workers(... template=list(key = value))
  2. The value of getOption("clustermq.defaults")
  3. The default value inside the template

LSF

Set the following options in your R session that will submit jobs:

options(
    clustermq.scheduler = "lsf",
    clustermq.template = "/path/to/file/below" # if using your own template
)

To supply your own template, save the contents below with any desired changes to a file and have clustermq.template point to it.

#BSUB-J {{ job_name }}[1-{{ n_jobs }}]  # name of the job / array jobs
#BSUB-n {{ cores | 1 }}                 # number of cores to use per job
#BSUB-o {{ log_file | /dev/null }}      # stdout + stderr; %I for array index
#BSUB-M {{ memory | 4096 }}             # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default                        # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }}          # walltime (uncomment)

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

In this file, #BSUB-* defines command-line arguments to the bsub program.

Once this is done, the package will use your settings and no longer warn you of the missing options.

SGE

Set the following options in your R session that will submit jobs:

options(
    clustermq.scheduler = "sge",
    clustermq.template = "/path/to/file/below" # if using your own template
)

To supply your own template, save the contents below with any desired changes to a file and have clustermq.template point to it.

#$ -N {{ job_name }}               # job name
##$ -q default                      # submit to queue named "default"
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variable
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -pe smp {{ cores | 1 }}         # number of cores to use per job
#$ -l m_mem_free={{ memory | 1073741824 }} # 1 Gb in bytes

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

In this file, #$-* defines command-line arguments to the qsub program.

Once this is done, the package will use your settings and no longer warn you of the missing options.

SLURM

Set the following options in your R session that will submit jobs:

options(
    clustermq.scheduler = "slurm",
    clustermq.template = "/path/to/file/below" # if using your own template
)

To supply your own template, save the contents below with any desired changes to a file and have clustermq.template point to it.

#!/bin/sh
#SBATCH --job-name={{ job_name }}
##SBATCH --partition=default
#SBATCH --output={{ log_file | /dev/null }}
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 4096 }}
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

In this file, #SBATCH defines command-line arguments to the sbatch program.

Once this is done, the package will use your settings and no longer warn you of the missing options.

PBS

Set the following options in your R session that will submit jobs:

options(
    clustermq.scheduler = "pbs",
    clustermq.template = "/path/to/file/below" # if using your own template
)

To supply your own template, save the contents below with any desired changes to a file and have clustermq.template point to it.

#PBS -N {{ job_name }}
#PBS -J 1-{{ n_jobs }}
#PBS -l select=1:ncpus={{ cores | 1 }}:mpiprocs={{ cores | 1 }}:mem={{ memory | 4096 }}MB
#PBS -l walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
##PBS -q default

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

In this file, #PBS-* defines command-line arguments to the qsub program.

Once this is done, the package will use your settings and no longer warn you of the missing options.

Torque

Set the following options in your R session that will submit jobs:

options(
    clustermq.scheduler = "Torque",
    clustermq.template = "/path/to/file/below" # if using your own template
)

To supply your own template, save the contents below with any desired changes to a file and have clustermq.template point to it.

#PBS -N {{ job_name }}
#PBS -l nodes={{ n_jobs }}:ppn={{ cores | 1 }},walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
##PBS -q default

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

In this file, #PBS-* defines command-line arguments to the qsub program.

Once this is done, the package will use your settings and no longer warn you of the missing options.

SSH

While SSH is not a scheduler, we can access remote schedulers via SSH. If you want to use it, first make sure that clustermq works on your server with the real scheduler. Only then move on to setting up SSH.

options(
    clustermq.scheduler = "ssh",
    clustermq.ssh.host = "myhost", # set this up in your local ~/.ssh/config
    clustermq.ssh.log = "~/ssh_proxy.log",     # log file on your HPC
    clustermq.ssh.timeout = 30,    # if changing default connection timeout
    clustermq.template = "/path/to/file/below" # if using your own template
)

The default template is shown below. If R is not in your HPC $PATH, you may need to specify its path or load the required bash modules/conda environments.

To supply your own template, save its contents with any desired changes to a file on your local machine and have clustermq.template point to it.

ssh -o "ExitOnForwardFailure yes" -f
    -R {{ ctl_port }}:localhost:{{ local_port }}
    -R {{ job_port }}:localhost:{{ fwd_port }}
    {{ ssh_host }}
    "R --no-save --no-restore -e
        'clustermq:::ssh_proxy(ctl={{ ctl_port }}, job={{ job_port }})'
        > {{ ssh_log | /dev/null }} 2>&1"