Parallel processing with Dask scheduler

Overview

Important: Dask schedulers are experimental in Guild. The functionality described in this document is available as a pre-release in Guild 0.7.3.rc2 and later. To install the latest pre-release, run pip install --upgrade --pre guildai.

Guild 0.7.3 introduces support for running operations in parallel using Dask.distributed. Dask-based scheduling is implemented by the built-in dask:scheduler operation. A Dask scheduler is similar to a Guild queue. Unlike a queue, however, a Dask scheduler can run operations concurrently, up to the specified number of workers.

To start a Dask scheduler, run:

guild run dask:scheduler

This starts a scheduler, which waits for staged runs. When the scheduler sees a staged run, it starts the run provided the run's resource requirements can be met. See Scheduler Resources below for more information.

The scheduler appears like any other run in Guild; for example, it is listed by guild runs. To stop a scheduler, press Ctrl-C in the scheduler terminal or run the stop command from another terminal:

guild stop -Fo dask:scheduler

dask:scheduler supports the following flags for configuring the scheduler.

Flag               Description
workers            Number of workers in the Dask cluster
resources          Resources provided by the scheduler
dashboard-address  Address to listen on for dashboard connections (default is 8787)
loglevel           Log level used for the Dask scheduler (default is warn)
poll-interval      Minimum number of seconds between polls (default is 10)
run-once           Run all staged runs and stop (default is no)
wait-for-running   Wait for other runs to stop before starting staged runs (default is no)
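To make the interplay of workers, poll-interval, and run-once more concrete, here is a minimal sketch of a polling scheduler loop. It is an illustration under stated assumptions, not Guild's implementation: get_staged and run_op are hypothetical callables standing in for Guild's staged-run lookup and run launcher, a thread pool stands in for the Dask workers, and resource checks and wait-for-running are omitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def poll_and_run(get_staged, run_op, workers=2, poll_interval=10.0, run_once=False):
    """Hypothetical polling loop: start newly staged runs on `workers` threads.

    The pool runs at most `workers` runs at a time and queues the rest.
    Without run_once the loop polls forever, like a long-running scheduler.
    """
    started = set()
    futures = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            new_runs = [run for run in get_staged() if run not in started]
            for run in new_runs:
                started.add(run)
                futures.append(pool.submit(run_op, run))
            # run-once: stop polling once a poll finds nothing new to start.
            if run_once and not new_runs:
                break
            time.sleep(poll_interval)  # minimum time between polls
    # Leaving the `with` block waits for all submitted runs to finish.
    return [f.result() for f in futures]


results = poll_and_run(lambda: ["a", "b", "c"], str.upper,
                       workers=2, poll_interval=0.01, run_once=True)
print(sorted(results))
```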

Use --help-op to view help for the operation.

guild run dask:scheduler --help-op

Requirements

To use the Dask scheduler, install these Python packages:

  • dask[distributed]
  • bokeh (optional, needed only for the Dask dashboard; see below for details)

To install the minimal requirements, run:

pip install "dask[distributed]"

To install all requirements, including support for the Dask dashboard, run:

pip install "dask[distributed]" bokeh

Example

To illustrate, we use a demo script that waits for a period of time and then exits, and a Dask scheduler that runs several of these operations concurrently.

Create Demo Project

Create a new directory. This is your demo project.

mkdir guild-dask-demo
cd guild-dask-demo

Next, create a file named demo.py in the project directory that contains this code:

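import os
import time

# Guild detects module-level globals like this as flags (e.g. sleep=3)
sleep = 5.0
# Guild sets RUN_ID in the environment of each run
run_id = os.getenv("RUN_ID")

print("Run %s sleeping for %f seconds" % (run_id, sleep))
time.sleep(sleep)
print("Run %s done" % run_id)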

Sample script demo.py located in project directory

Stage Runs

Stage 10 runs using randomly chosen sleep periods from 1 to 10 seconds.

guild run demo.py sleep=[1:10] --trials 10 --stage-trials

This command stages 10 demo.py operations. You can view the staged runs by running:

guild runs --staged

Start a scheduler with the run-once flag. This causes the scheduler to exit after all staged runs are completed.

guild run dask:scheduler run-once=yes workers=10

This command uses a Dask scheduler to run the staged runs. The scheduler uses 10 workers, so it can run up to 10 operations concurrently. Note that the runs all start at roughly the same time and finish according to their sampled sleep periods.
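Because every run gets its own worker, the total wall time should be close to the longest sleep period rather than the sum of all of them. The sketch below checks that reasoning with a thread pool standing in for the Dask workers and fake_run (a hypothetical stand-in for demo.py) sleeping for ten random periods, scaled down 100x so it finishes quickly.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def fake_run(sleep):
    """Stand-in for demo.py: sleep, then return the sleep period."""
    time.sleep(sleep)
    return sleep


def run_trials(sleeps, workers):
    """Run every trial on a pool of `workers` and return elapsed seconds."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fake_run, sleeps))
    return time.monotonic() - start


# Ten trials with sleeps from 1 to 10 seconds, scaled to 0.01 to 0.10 seconds.
sleeps = [random.uniform(1, 10) / 100 for _ in range(10)]
concurrent = run_trials(sleeps, workers=10)
sequential = run_trials(sleeps, workers=1)
print(f"longest sleep {max(sleeps):.2f}s, total sleep {sum(sleeps):.2f}s")
print(f"10 workers: {concurrent:.2f}s, 1 worker: {sequential:.2f}s")
```

With 10 workers the elapsed time tracks the longest sleep; with a single worker it tracks the total.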

Scheduler Resources

You can start a Dask scheduler with one or more resources. A resource is a named quantity that is available to runs that require it. A run that requires a resource specifies the quantity it needs. If that quantity is not currently available to the scheduler, the run waits until it is. When the run starts, its required quantity is temporarily deducted from the scheduler's available quantity; when the run finishes, the quantity is returned. In this way, resources limit what a scheduler runs to avoid resource exhaustion.
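The deplete-and-replenish accounting described above can be modelled in a few lines of plain Python. This is a hypothetical sketch of the semantics, not Guild's code (Dask.distributed has its own worker-resources feature with similar behavior); ResourcePool, try_start, and finish are illustrative names.

```python
import threading


class ResourcePool:
    """Hypothetical model of scheduler resources.

    Starting a run depletes its required quantities; finishing it
    replenishes them.
    """

    def __init__(self, available):
        self._available = dict(available)
        self._lock = threading.Lock()

    def try_start(self, required):
        """Deplete `required` and return True if every quantity is available.

        Otherwise return False and leave the pool unchanged; a polling
        scheduler would retry the run later. In this model a name the pool
        does not provide counts as 0, so such a run never starts.
        """
        with self._lock:
            if any(self._available.get(n, 0) < q for n, q in required.items()):
                return False
            for name, qty in required.items():
                self._available[name] -= qty
            return True

    def finish(self, required):
        """Replenish the quantities a finished run had depleted."""
        with self._lock:
            for name, qty in required.items():
                self._available[name] += qty

    def available(self):
        with self._lock:
            return dict(self._available)


pool = ResourcePool({"MEM": 100e9})
print(pool.try_start({"MEM": 50e9}))  # True: 50e9 left
print(pool.try_start({"MEM": 50e9}))  # True: nothing left
print(pool.try_start({"MEM": 50e9}))  # False: this run must wait
pool.finish({"MEM": 50e9})            # a run finishes and replenishes 50e9
print(pool.try_start({"MEM": 50e9}))  # True again
```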

Resources can constrain runs by available memory, GPUs, disk space, or any other resource type, including abstract resources such as job size or user quotas.

Resources are specified as name-value pairs in the format NAME=VALUE. Separate multiple name-value pairs with whitespace.
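As an illustration of the NAME=VALUE format, the hypothetical helper below parses a whitespace-separated spec into a dictionary. Parsing every value as a float is an assumption made here so that both 100e9 and 32 are accepted; it is not a description of Guild's parser.

```python
def parse_resources(spec):
    """Parse a 'NAME=VALUE NAME=VALUE' spec into a dict (hypothetical helper)."""
    resources = {}
    for pair in spec.split():
        name, sep, value = pair.partition("=")
        if not sep or not name:
            raise ValueError(f"invalid resource {pair!r}: expected NAME=VALUE")
        resources[name] = float(value)  # accepts 100e9 as well as 32
    return resources


print(parse_resources("MEM=100e9 CPU=32"))  # {'MEM': 100000000000.0, 'CPU': 32.0}
```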

Available Resources

To specify resources that are available to a scheduler, use the scheduler’s resources flag when starting the operation. For example, to make 100 GB of memory available to scheduled runs, you can start a scheduler like this:

guild run dask:scheduler resources=MEM=100e9

The name MEM in this case is arbitrary. You can use any name provided that runs use the same name when defining their required resources. See Required Resources below for more information.

To specify multiple resources, separate the name-value pairs with spaces. In this case you must quote the command-line argument. For example, to also include a CPU resource, run:

guild run dask:scheduler resources='MEM=100e9 CPU=32'
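The quoting matters because the shell splits arguments on whitespace before Guild sees them. Python's shlex module follows POSIX shell splitting rules, so it can show what each form of the command passes along: without quotes, CPU=32 arrives as a separate argument (presumably read as an assignment to a flag named CPU) rather than as part of resources.

```python
import shlex

# Split both command lines the way a POSIX shell would.
unquoted = shlex.split("guild run dask:scheduler resources=MEM=100e9 CPU=32")
quoted = shlex.split("guild run dask:scheduler resources='MEM=100e9 CPU=32'")

print(unquoted[3:])  # ['resources=MEM=100e9', 'CPU=32']
print(quoted[3:])    # ['resources=MEM=100e9 CPU=32']
```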

Required Resources

To constrain a run within available resources, use a tag that starts with dask: and contains a valid resource spec. For example, to stage a run that requires 50 GB, run:

guild run <op> --stage --tag dask:MEM=50e9 

To specify more than one required resource, you can use multiple tags or use space-separated name-value pairs. The following two commands are equivalent:

guild run <op> --stage --tag dask:MEM=50e9 --tag dask:CPU=8
guild run <op> --stage --tag dask:'MEM=50e9 CPU=8'
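To see why the two commands are equivalent, the sketch below splits each one as a shell would and merges every --tag value that starts with dask: into a single set of requirements. required_resources is a hypothetical helper written for this illustration; it assumes tags are given with the long --tag option, as in the examples above.

```python
import shlex


def required_resources(argv):
    """Merge NAME=VALUE pairs from every '--tag dask:...' argument (hypothetical)."""
    required = {}
    for option, value in zip(argv, argv[1:]):
        if option == "--tag" and value.startswith("dask:"):
            for pair in value[len("dask:"):].split():
                name, _, qty = pair.partition("=")
                required[name] = float(qty)
    return required


a = shlex.split("guild run <op> --stage --tag dask:MEM=50e9 --tag dask:CPU=8")
b = shlex.split("guild run <op> --stage --tag dask:'MEM=50e9 CPU=8'")
print(required_resources(a))
print(required_resources(a) == required_resources(b))  # True
```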

Scheduling Runs on Specific GPUs

You can use scheduler resources to constrain runs to specific GPU devices. There are three parts to this scheme:

  • When starting a scheduler, define a resource for each GPU device. To ensure that only one run is started per GPU device, use the resource value 1. Use larger quantities as needed to support multiple runs per GPU device.

  • When staging a run, specify the target GPU devices using the applicable resource names.

  • To limit the GPU devices visible to a run, stage the run using the --gpus option with the applicable device IDs.

To illustrate, consider a system with four GPUs with device IDs 0, 1, 2, and 3. To make all four GPUs available as scheduler resources, each running at most one operation at a time, start a scheduler like this:

guild run dask:scheduler resources='GPU0=1 GPU1=1 GPU2=1 GPU3=1'

In this case, each resource name corresponds to a GPU device ID.

To stage an operation that uses GPU0 (the GPU device with ID 0), run:

guild run <op> --stage --tag dask:GPU0=1 --gpus 0

This command stages a run with a tag indicating that the run requires the GPU0 resource. If the required quantity (in this case 1) is available, the scheduler starts the run. Otherwise the scheduler waits until the required quantity is available. When the operation starts, its CUDA_VISIBLE_DEVICES environment variable is set to 0, as specified by the --gpus option. This ensures that the operation sees only the target GPU device.
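When staging many runs, it can help to generate the matching resources value and staging commands rather than typing them. The sketch below only builds and prints the commands, using the options shown above; "train" is a hypothetical operation name, and scheduler_resources and stage_command are illustrative helpers. Runs are spread across the four GPUs round-robin.

```python
import shlex


def scheduler_resources(num_gpus, runs_per_gpu=1):
    """Build the resources flag value: one named resource per GPU device."""
    return " ".join(f"GPU{i}={runs_per_gpu}" for i in range(num_gpus))


def stage_command(op, gpu):
    """Build a `guild run` command that stages `op` pinned to one GPU."""
    return ["guild", "run", op, "--stage",
            "--tag", f"dask:GPU{gpu}=1", "--gpus", str(gpu)]


print("guild run dask:scheduler resources=" + shlex.quote(scheduler_resources(4)))
for i in range(6):
    # Runs 4 and 5 reuse GPU0 and GPU1, so with a quantity of 1 per GPU
    # the scheduler holds them until the earlier runs on those GPUs finish.
    print(shlex.join(stage_command("train", i % 4)))
```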

Note: The use of --gpus in this example is only one way to control GPU placement for a run. You can assign work to specific GPU devices in other ways, such as with an operation flag and framework-specific code (e.g. TensorFlow’s device context manager) or by setting CUDA_VISIBLE_DEVICES explicitly.
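As a sketch of the flag-based alternative, a script can expose a hypothetical gpu flag and set CUDA_VISIBLE_DEVICES itself before any framework initializes CUDA. The flag name, its -1 default meaning "don't restrict", and select_gpu are assumptions for illustration; you would pair gpu=0 with --tag dask:GPU0=1 in place of --gpus 0.

```python
import os

# Hypothetical flag: Guild treats module-level globals like this as flags,
# so `guild run train.py gpu=2` would set it. -1 means "don't restrict".
gpu = -1


def select_gpu(gpu_id, environ=os.environ):
    """Limit CUDA to one device; call before importing TensorFlow, PyTorch, etc."""
    if gpu_id >= 0:
        environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    return environ.get("CUDA_VISIBLE_DEVICES")


if __name__ == "__main__":
    visible = select_gpu(gpu)
    # Import the deep learning framework only after this point.
    print("CUDA_VISIBLE_DEVICES =", visible)
```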

Dask Dashboard

By default, Guild’s Dask-based scheduler starts the Dask dashboard for the underlying Dask cluster. The dashboard lets you view Dask tasks, system status, and Dask logs.

To view the dashboard, you must install the bokeh Python package. See Requirements above for more information.

Guild shows the dashboard link when starting the scheduler. By default, the dashboard runs on port 8787 on the local system. You can specify a different port or binding address using the scheduler’s dashboard-address flag.

For example, to run the dashboard on port 8888, run:

guild run dask:scheduler dashboard-address=8888

To bind the dashboard to a specific IP address or hostname, include it in the address. For example, to run the dashboard on port 8888 of the loopback address, run:

guild run dask:scheduler dashboard-address=127.0.0.1:8888
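To open the dashboard, point a browser at the configured address. The hypothetical helper below turns a dashboard-address value into a URL for the Dask dashboard's status page, assuming (as in the examples above) that a bare value is a port on the local system and HOST:PORT names both parts.

```python
def dashboard_url(address="8787", default_host="localhost"):
    """Build a browser URL from a dashboard-address value (hypothetical helper)."""
    host, sep, port = address.rpartition(":")
    if not sep or not host:
        host = default_host  # bare port such as "8888" or ":8888"
    if not port.isdigit():
        raise ValueError(f"invalid dashboard address {address!r}")
    return f"http://{host}:{port}/status"


print(dashboard_url())                  # http://localhost:8787/status
print(dashboard_url("8888"))            # http://localhost:8888/status
print(dashboard_url("127.0.0.1:8888"))  # http://127.0.0.1:8888/status
```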