Parallelising with Dask

Overview

Teaching: 50 min
Exercises: 30 min
Questions
  • How do we setup and monitor a Dask cluster?

  • How do we parallelise Python code with Dask?

  • How do we use Dask with Xarray?

Objectives
  • Understand how to setup a Dask cluster

  • Recall how to monitor Dask’s performance

  • Apply Dask to work with Xarray

  • Apply Dask futures and delayed tasks

What is Dask?

Dask is a Distributed processing library for Python. It enables parallel processing of Python code across multiple cores on the same computer or across multiple computers. It can be used behind the scenes by Xarray with minimal modification to code. JASMIN users can make use of a Dask gateway that allows their Dask code submitted from the Jupyter notebook interface to run on the Lotus HPC cluster. Dask has two broad categories of features, high level data structures which behave in a similar way to common Python data structures but with the ability to perform operations in parallel and low level task scheduling to run any Python code in parallel.

Setting up Dask on your computer

Dask should already be installed in your Conda/Mamba environment. Dask refers to the system it runs the computation on as a Dask “cluster”, although the “cluster” can just be running on your local computer (or the JASMIN notebook server). Later on we’ll look at using a remote cluster running on a different computer, but for now let’s create one on our own computer.

from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

The code above will create a local Dask cluster with one worker and 4 threads for each worker and a limit of 2GB of memory. Displaying the client object will tell us all about the cluster.

Using the Dask dashboard

In the information about the Dask cluster is a link to a Dashboard webpage. From the Dashboard we can monitor our Dask cluster and see how busy it is, view a graph of task dependencies, memory usage and the status of the Dask workers. This can be really useful when checking if our Dask cluster is behaving correctly and working out how optimially our code is making use of Dask’s parallelism.

Dask dashboard graph view

Dask dashboard task view

Using the Dask dashboard on JASMIN

Note that if you are using the JASMIN notebook service, the link to the dashboard won’t work as the port it runs on isn’t open to connections from the internet.

ssh-keygen #MAKE SURE YOU SET A PASSPHRASE
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh -R 8787:localhost:8787 login2.jasmin.ac.uk

Port 8787 might not be the port your Dask cluster is using, make sure the first 8787 is the number your Dask cluster is running on. If anybody else is doing this then port 8787 on login2 might be in use, change the second 8787 to something else to match. Now connect an SSH tunnel from your computer to Jasmin login2 and forward port 8787 back to your computer, if you changed 8787 to something else in the previous step then use the same number here in both cases.

ssh -L 8787:localhost:8787 login2.jasmin.ac.uk

Open your web browser to http://127.0.0.1:8787 and you will see your Dask cluster page. Note that you have just exposed this to anybody else with JASMIN access and there is no password on it.

Dask Arrays

Dask has its own type of arrays, these behave much like Numpy (and Xarray) arrays, but they can be split into a number of chunks. Any processing operations can work in parallel across these chunks. Data can also be loaded “lazily” into Dask Arrays, this means it is only loaded from disk when it is accessed. This can give us the illusion of loading a dataset that is larger than our memory.

Creating a Dask Array

Dask arrays can be created from existing from other array formats including NumPy arrays, Python lists and PandasDataFrames. We can also initalise a new array as a Dask Array from the start, Dask copies the zeros, ones and random functions from NumPy which initalise an array as all zeros, ones or as random numbers. For example to create a 10000x10000 array of random numbers we can use:

import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

Dask arrays support common Numpy operations including slicing, arithmetic whole array operations, reduction functions such as mean and sum, transposing and reshaping.

y = da.ones((10000,10000), chunks = (1000, 1000))
z = x + y
z

In the above example we added the x and y arrays together, but when we display the result we just get an array getitem in response instead of an actual value. This is because Dask hasn’t actually computed the result yet. Dask works by building up a dependency graph of all the operations we’re performing, but doesn’t compute anything until we call the compute function on the final result. Let’s go ahead and call compute on the z object, if we monitor the Dask Dashboard we should see some activity.

result = z.compute()
result

The new variable result will now contain the result of the computation and will be of the type numpy.ndarray.

type(result)

Compare Dask and Numpy Performance

Compare the performance of the following code using Numpy and Dask functions. Use the %%time magic in the cells to find out the execution time. Ensure you only time the core computation and not the Dask cluster setup or library imports, this means you’ll have to write this code into multiple cells. Dask version (note you’ll need to do the Dask client setup first):

import dask.array as da
x = da.random.random((20000,20000), chunks=(1000,1000))
x_mean = x.mean()
x_mean.compute()

Numpy version:

import numpy as np
npx = np.random.random((20000,20000))
npx_mean = npx.mean()

Which went faster overall? Why do you think you got the result you did? Try making the dataset a little larger, going much beyond 25000x25000 might use too much memory. Try running the top command in a terminal while your notebook is running, look at the CPU % when running the Numpy and Dask versions and compare them. Try changing the number of Dask threads and see what effect this has on the CPU %.

Troubleshooting Dask

Sometimes Dask can jam up and stop executing tasks. If this happens try the following:

Using Dask with Xarray

We previously used Xarray to load our temperature anomaly dataset from the Goddard Institute for Space Studies and performed some computational operations against it using Xarray. Let’s go and load it again, but this time we’ll give an extra option to open_dataset, the chunk option which allows us to chunk the Xarray data to prepare it for computing with Dask. The chunk option expects a Python dictionary defining the chunk size for each of the dimensions, any dimension we don’t want to chunk should be set to -1. For example:

import xarray as xr
from dask.distributed import Client
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
ds = xr.open_dataset("gistemp1200-21c.nc", chunks={'lat':30, 'lon':30, 'time':-1})
ds
da = ds['tempanomaly']
da

Here we see that the Dask DataArray da is now chunked every 30 degrees of Latitude and Longitude. We can also specify automatic chunking by using chunks={}, but with such a small dataset there won’t be any chunking applied automatically.

Any Xarray operations we now apply to the array will now use Dask. Let’s repeat some of our earlier Xarray examples and compute a correction factor to the dataset, if we watch the Dask dashboard we’ll see some signs of activity.

dataset_corrected = ds['tempanomaly'] * 1.1 - 1.0

If we print dataset_corrected we’ll see that it actually contains a Dask array.

print(dataset_corrected)

Dask is “lazy” and doesn’t compute anything until we tell it to. To get Dask to trigger computing the result we need to call .compute on dataset_corrected.

result = dataset_corrected.compute()
result

Low Level computation with Dask

When higher level Dask functions are not sufficient for our needs we can write our own functions and request Dask executes these in parallel. Dask has two different strategies we can use for this, Delayed functions and Futures. Delayed functions will delay starting until we call .compute at which point all the dependencies of the operation we request are executed. With futures tasks begin as soon as possible and immediately return a future object that is eventually populated with the result when the operation completes.

Delayed Tasks

To execute a function as a delayed task we must tag it with a dask.delayed decorator. Here is a simple example:

import dask
@dask.delayed
def apply_correction(x):
   return x * 1.01 + 0.1

import dask.array as da
x = da.random.random(1_000_000, chunks=1000)
corrected = apply_correction(x)

squared = corrected ** 2

result = squared.compute()

This will call the apply_correction function on each of the 1000 chunks that make up the array x and then square the result. But nothing will execute until we call the compute function on the last line. Both squared and corrected will have the type of dask.delayed.Delayed until they have been computed.

Visualising the Task Graph

We have already seen that we can visualise the Dask task graph in the dashboard as it is executing. But we can also visualise it inside a Jupyter notebook by calling the visualize function on a Dask datastructure. We can render this before we call compute if we want to see what is going to happen. This may not always work with larger datasets, our example above with 1,000,000 elements and 1000 chunks is going to be too big, but if we reduce the size of the array x to 10,000 items instead of 1,000,000 then it will be possible.

@dask.delayed
def apply_correction(x):
   return x * 1.01 + 0.1

import dask.array as da
x = da.random.random(10_000, chunks=1000)
corrected = apply_correction(x)

squared = corrected ** 2

squared.visualize()

result = squared.compute()

Futures

An alternative approach to using any function with Dask is to use Dask Futures. These begin execution immediately, but are non-blocking so execution (appears to) proceeds to the next statement while the processing is done in the background. Any objects which are returned by a function will have a Dask future type until the exectuion has completed.

If we want to block until a result is available then we can call the result function. For example taking the code from the last section we can do the following:

import dask.array as da

def apply_correction(x):
   return x * 1.01 + 0.1

def square(x):
   return x ** 2

x = da.random.random(10_000, chunks=1000)
corrected = client.submit(apply_correction, x)

corrected

squared = client.submit(square, corrected)

squared

squared = squared.result()
squared

If we watch the task activity in the Dask dashboard then we should see activity start as soon as the client.submit calls are made. The squared and corrected variables will be Dask future objects, if we display them we will see their status as to whether they are finished or not.

When to use Futures or Delayed

The Dask documentation does not have much advice on when it is more appropriate to use Futures or Delayed functions. Some general advice from the forums is to use Delayed functions and task graphs first, but to switch to futures for more complicated problems.

Using the JASMIN Dask gateway

JASMIN offers a Dask Gateway service which can submit Dask jobs to a special queue on the Lotus cluster. To use this we need to do a bit of extra setup. We will need to import the dask_gateway library and configure the gateway.

import dask_gateway
import dask
gw = dask_gateway.Gateway("https://dask-gateway.jasmin.ac.uk", auth="jupyterhub")

The gateway can be given a set of options including how many worker cores to use, initially we can set this to one and scale it up later. We also need to allocate at least one core as to the scheduler which will manage our Dask cluster. Finally we need to tell Dask which Conda/Mamba environment to use and this needs to match the one we’re running in our notebook.

options = gw.cluster_options()
options.worker_cores = 1
options.scheduler_cores = 1
options.worker_setup='source /apps/jasmin/jaspy/mambaforge_envs/jaspy3.10/mf-22.11.1-4/bin/activate ~/.conda/envs/esces'

Finally we can check if we already had a cluster running and reuse that if we do and then get a client object from the cluster that will behave the same way as the local Dask client did.

clusters = gw.list_clusters()
if not clusters:
    cluster = gw.new_cluster(options, shutdown_on_close=False)
else:
    cluster = gw.connect(clusters[0].name)

client = cluster.get_client()

Now that we have a running cluster we can allow it to adapt and scale up and down as we demand it. This will translate to Slurm jobs being launched on the JASMIN cluster itself. JASMIN allows users to spawn up to 16 jobs in the Dask queue, but one of these will be taken by the scheduler so the we can only launch a maximum of 15 workers.

cluster.adapt(minimum=1, maximum=15)

If we now connect to one of the JASMIN sci servers (sci1-8) we can see our jobs in the SLURM queue by running the squeue command.

ssh -J <jasminusername>@login2.jasmin.ac.uk sci6
squeue -p dask

Once we are done with Dask we can shutdown the cluster by calling its shutdown function. This should cause the jobs in the SLURM queue to finish.

cluster.shutdown()

JASMIN Dask Dashboard

If you display the contents of the client or cluster variable then you will be given an address beginning https://dask-gateway.jasmin.ac.uk that will take you to a Dask dashboard for your cluster. Unfortunately this server is only accessible within the JASMIN network, to access it you will have to use a web browser running inside an NoMachine session or port forward via the JASMIN login server.

Challenge

Setup Dask a Dask cluster on JASMIN. Load the GIS temperature anomaly dataset with Xarray and run the correction algorithm on it. Time how long the compute operation takes by using the %%time magic. Experiment with:

  • Changing the chunk sizes you use in Xarray
  • Changing the number of worker cores
  • Changing the number of workers (set in cluster.adapt)

Key Points

  • Dask is a parallel computing framework for Python

  • Dask creates a task graph of all the steps in the operation we request

  • Dask can use your local computer, an HPC cluster, Kubernetes cluster or a remote system over SSH

  • We can monitor Dask’s processing with its dashboard

  • Xarray can use Dask to parallelise some of its operations

  • Delayed tasks let us lazily evaluate functions, only causing them to execute when the final result is requested

  • Futures start a task immediately but return a futures object until computation is completed