12. Dask and Large Dataset Computations#

Some people think that large datasets, or “big data”, mostly belong to the fields of machine learning and artificial intelligence, but they are just as common in atmospheric science. As a general definition:

Big data refers to data sets that are so voluminous and complex that traditional data processing application software is inadequate to deal with them.

Modern reanalysis datasets and model outputs span long periods at fine grid resolutions, so loading them carelessly can easily overwhelm the system. You may see the following error message:

MemoryError: Unable to allocate 52.2 GiB for an array with shape (365, 37, 721, 1440) and data type float32

This error message appears because the requested array is larger than the available RAM. How can we avoid this situation?
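
The number in the message is simply the array size: the product of the shape times 4 bytes per float32 value. A quick sanity check (a minimal sketch using only the standard library):

import math

shape = (365, 37, 721, 1440)          # time, level, lat, lon
nbytes = math.prod(shape) * 4         # float32 = 4 bytes per element
print(f"{nbytes / 2**30:.1f} GiB")    # -> 52.2 GiB, matching the error above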

Dask#

`dask` is a flexible library for parallel computing in Python. It scales to datasets and computations that cannot fit into memory by breaking large computations into smaller tasks, which are then executed in parallel. This avoids consuming a large amount of RAM at once.

To understand how dask works, we first demonstrate with a 1000 × 4000 array.

1. Numpy Array:

import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

2. Dask Array:

import dask.array as da

ones = da.ones(shape)
ones
            Array                      Chunk
Bytes       30.52 MiB                  30.52 MiB
Shape       (1000, 4000)               (1000, 4000)
Dask graph  1 chunks in 1 graph layer
Data type   float64 numpy.ndarray

Dask divides the entire array into sub-arrays called “chunks”. In dask, we can specify the size of a chunk.

chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
            Array                      Chunk
Bytes       30.52 MiB                  7.63 MiB
Shape       (1000, 4000)               (1000, 1000)
Dask graph  4 chunks in 1 graph layer
Data type   float64 numpy.ndarray

We can do some arithmetic calculations, such as multiplication and averaging.

ones_mean = (ones * ones[::-1, ::-1]).mean()
ones_mean
            Array                      Chunk
Bytes       8 B                        8 B
Shape       ()                         ()
Dask graph  1 chunks in 6 graph layers
Data type   float64 numpy.ndarray

The calculation proceeds as follows: dask computes each chunk on a separate CPU core, then combines the per-chunk results into the final answer. For more advanced usage of dask, see the official dask tutorial.
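
We can inspect this procedure ourselves: .visualize() draws the task graph (it requires the optional graphviz dependency), and .compute() triggers the actual evaluation. A minimal sketch:

# Draw the task graph to a file (requires graphviz; the filename is arbitrary).
ones_mean.visualize(filename='ones_mean_graph.png')

# Nothing has been evaluated up to this point; .compute() runs the tasks
# chunk by chunk and returns a plain numpy result.
ones_mean.compute()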

Dask integrates with commonly used functions in numpy and xarray, which makes it well suited to processing climate data. So how does dask actually help with large datasets?
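
For example, numpy ufuncs applied to a dask array dispatch to dask and stay lazy. A small sketch, reusing the ones array and the numpy import from above:

# numpy ufuncs recognize dask arrays and return lazy dask results.
lazy = np.sin(ones) + 1
print(type(lazy))              # <class 'dask.array.core.Array'>
print(lazy.mean().compute())   # evaluated only when .compute() is called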

Large Climate Dataset Processing#

In Unit 2, we introduced the parallel=True option of xarray.open_mfdataset. With this option, xarray opens each file as a dask array and reads the files in parallel on multiple cores, which speeds up the reading process. Now we read the NCEP R2 850-hPa wind fields, chunk them along the time dimension with chunks={'time': 183} (chunk keys must match the dataset's dimension names, here time, level, lat, and lon), and then calculate the climatology and anomaly and plot the results.

import xarray as xr
import dask

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    uds = xr.open_mfdataset('./data/ncep_r2_uv850/u850.*.nc',
                            combine='by_coords',
                            parallel=True,
                            chunks={'time': 183})
    vds = xr.open_mfdataset('./data/ncep_r2_uv850/v850.*.nc',
                            combine='by_coords',
                            parallel=True,
                            chunks={'time': 183})
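
We can verify the resulting chunk layout: on a dask-backed DataArray, .chunks returns one tuple of block sizes per dimension (a quick check).

# time is split into blocks of up to 183 steps; level, lat, and lon
# each remain a single block.
uds.uwnd.chunks
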
%time u = uds.sel(lat=slice(90, 0)).uwnd
CPU times: user 10 μs, sys: 2 μs, total: 12 μs
Wall time: 24.1 μs
v = vds.sel(lat=slice(90, 0)).vwnd
%time u
CPU times: user 10 μs, sys: 2 μs, total: 12 μs
Wall time: 22.6 μs
<xarray.DataArray 'uwnd' (time: 8766, level: 1, lat: 37, lon: 144)> Size: 187MB
dask.array<getitem, shape=(8766, 1, 37, 144), dtype=float32, chunksize=(183, 1, 37, 144), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 70kB 1998-01-01 1998-01-02 ... 2021-12-31
  * lon      (lon) float32 576B 0.0 2.5 5.0 7.5 10.0 ... 350.0 352.5 355.0 357.5
  * lat      (lat) float32 148B 90.0 87.5 85.0 82.5 80.0 ... 7.5 5.0 2.5 0.0
  * level    (level) float32 4B 850.0
Attributes: (12/14)
    standard_name:         eastward_wind
    long_name:             Daily U-wind on Pressure Levels
    units:                 m/s
    unpacked_valid_range:  [-140.  175.]
    actual_range:          [-78.96 110.35]
    precision:             2
    ...                    ...
    var_desc:              u-wind
    dataset:               NCEP/DOE AMIP-II Reanalysis (Reanalysis-2) Daily A...
    level_desc:            Pressure Levels
    statistic:             Mean
    parent_stat:           Individual Obs
    cell_methods:          time: mean (of 4 6-hourly values in one day)

Notice that the repr shows only chunk information, not actual values. Because of dask’s lazy evaluation, nothing has been computed yet; at this moment the data is not loaded into the system’s RAM.
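
We can confirm this: the DataArray is still backed by a dask array, and .nbytes reports the logical size of the data rather than actual RAM usage (a quick check):

print(type(u.data))        # <class 'dask.array.core.Array'>, still lazy
print(u.nbytes / 2**20)    # logical size in MiB; nothing is in RAM yet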

Now, we are going to calculate the climatological mean. However, the built-in xarray functions groupby() and resample() do not perform well on large dask-backed data. Therefore, we will compute with the flox library, which provides significantly faster groupby calculations. The flox groupby method is flox.xarray.xarray_reduce.

from flox.xarray import xarray_reduce

uDayClm = xarray_reduce(u, u.time.dt.dayofyear, func='mean', dim='time')
vDayClm = xarray_reduce(v, v.time.dt.dayofyear, func='mean', dim='time')

uDayClm 
<xarray.DataArray 'uwnd' (dayofyear: 366, level: 1, lat: 37, lon: 144)> Size: 8MB
dask.array<transpose, shape=(366, 1, 37, 144), dtype=float32, chunksize=(366, 1, 37, 144), chunktype=numpy.ndarray>
Coordinates:
  * lon        (lon) float32 576B 0.0 2.5 5.0 7.5 ... 350.0 352.5 355.0 357.5
  * lat        (lat) float32 148B 90.0 87.5 85.0 82.5 80.0 ... 7.5 5.0 2.5 0.0
  * level      (level) float32 4B 850.0
  * dayofyear  (dayofyear) int64 3kB 1 2 3 4 5 6 7 ... 361 362 363 364 365 366
Attributes: (12/14)
    standard_name:         eastward_wind
    long_name:             Daily U-wind on Pressure Levels
    units:                 m/s
    unpacked_valid_range:  [-140.  175.]
    actual_range:          [-78.96 110.35]
    precision:             2
    ...                    ...
    var_desc:              u-wind
    dataset:               NCEP/DOE AMIP-II Reanalysis (Reanalysis-2) Daily A...
    level_desc:            Pressure Levels
    statistic:             Mean
    parent_stat:           Individual Obs
    cell_methods:          time: mean (of 4 6-hourly values in one day)

Everything so far has been lazy. We can now actually trigger the computation and return the final result with .compute().

uDayClm_fn = uDayClm.compute()
vDayClm_fn = vDayClm.compute()

uDayClm_fn
<xarray.DataArray 'uwnd' (dayofyear: 366, level: 1, lat: 37, lon: 144)> Size: 8MB
array([[[[-1.5247940e+00, -1.4514602e+00, -1.3754174e+00, ...,
          -1.7235428e+00, -1.6562518e+00, -1.5960444e+00],
         [-5.6333429e-01, -4.1833475e-01, -2.7458504e-01, ...,
          -9.6874976e-01, -8.4125155e-01, -6.9416922e-01],
         [ 7.5583190e-01,  9.2770594e-01,  1.0993727e+00, ...,
           3.0166462e-01,  4.4770643e-01,  6.0145718e-01],
         ...,
         [-1.4350017e+00, -1.3072938e+00, -1.2437533e+00, ...,
          -3.2308366e+00, -2.5950010e+00, -1.8743768e+00],
         [-7.3541850e-01, -6.0729337e-01, -4.0958592e-01, ...,
          -2.8920867e+00, -2.0904186e+00, -1.2187523e+00],
         [-6.3625211e-01, -6.9604486e-01, -6.7854387e-01, ...,
          -2.7762527e+00, -1.9468769e+00, -1.0535429e+00]]],


       [[[ 1.3624781e-01,  1.8458217e-01,  2.3916422e-01, ...,
          -1.3961315e-02,  3.6873043e-02,  8.3957411e-02],
         [ 2.3895724e-01,  3.3770636e-01,  4.3708053e-01, ...,
          -5.7292778e-02,  4.5207102e-02,  1.4082973e-01],
         [ 2.3937146e-01,  3.5499719e-01,  4.8520657e-01, ...,
...
         [-1.4406258e+00, -1.0618763e+00, -4.6687624e-01, ...,
          -3.2627106e+00, -2.4233358e+00, -1.8195858e+00],
         [-8.7166828e-01, -7.2979182e-01, -4.7916898e-01, ...,
          -2.8472936e+00, -1.9472933e+00, -1.2368768e+00]]],


       [[[-4.1091676e+00, -3.9441681e+00, -3.7783356e+00, ...,
          -4.5566688e+00, -4.4050002e+00, -4.2674980e+00],
         [-3.2116699e+00, -3.1075017e+00, -2.9991691e+00, ...,
          -3.5141671e+00, -3.4233353e+00, -3.3258324e+00],
         [-3.0250015e+00, -3.1033335e+00, -3.1783364e+00, ...,
          -2.5975020e+00, -2.7550004e+00, -2.8883379e+00],
         ...,
         [ 6.7749852e-01,  9.3666548e-01,  8.9166456e-01, ...,
          -2.1883342e+00, -1.1716664e+00, -9.6667372e-02],
         [ 3.4666610e-01,  7.5583148e-01,  1.2258316e+00, ...,
          -2.0166693e+00, -1.1691660e+00, -3.0000091e-01],
         [ 2.3749900e-01,  4.7500062e-01,  9.8333144e-01, ...,
          -1.7283325e+00, -9.7916764e-01, -2.2666772e-01]]]],
      dtype=float32)
Coordinates:
  * lon        (lon) float32 576B 0.0 2.5 5.0 7.5 ... 350.0 352.5 355.0 357.5
  * lat        (lat) float32 148B 90.0 87.5 85.0 82.5 80.0 ... 7.5 5.0 2.5 0.0
  * level      (level) float32 4B 850.0
  * dayofyear  (dayofyear) int64 3kB 1 2 3 4 5 6 7 ... 361 362 363 364 365 366
Attributes: (12/14)
    standard_name:         eastward_wind
    long_name:             Daily U-wind on Pressure Levels
    units:                 m/s
    unpacked_valid_range:  [-140.  175.]
    actual_range:          [-78.96 110.35]
    precision:             2
    ...                    ...
    var_desc:              u-wind
    dataset:               NCEP/DOE AMIP-II Reanalysis (Reanalysis-2) Daily A...
    level_desc:            Pressure Levels
    statistic:             Mean
    parent_stat:           Individual Obs
    cell_methods:          time: mean (of 4 6-hourly values in one day)

The computed result is now held in memory as an ordinary numpy-backed DataArray.
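
As a sketch of the anomaly step mentioned earlier, using the standard xarray groupby-arithmetic idiom, we can subtract the daily climatology from each day:

# Group the daily data by day of year and subtract the matching climatology;
# the result stays lazy until .compute() is called.
uAnom = (u.groupby('time.dayofyear') - uDayClm_fn).compute()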

Best Practices for Working with dask#

  1. Save intermediate results to disk as netCDF files (using to_netcdf()) and then load them again with open_dataset() for further computations (see the sketch after this list).

  2. Specify smaller chunks across space when using open_mfdataset() (e.g., chunks={'latitude': 10, 'longitude': 10}).

  3. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the chunks={} argument to open_mfdataset() to avoid redundant file reads.

  4. groupby() is a costly operation and performs much better if the flox package is installed. See the flox documentation for more. By default, xarray will use flox if it is installed.
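
A minimal sketch of the first practice, assuming a hypothetical output path:

# Persist the computed climatology to disk...
uDayClm_fn.to_netcdf('./data/u850_dayclm.nc')        # hypothetical filename

# ...and reload it later without redoing the groupby computation.
uDayClm_back = xr.open_dataset('./data/u850_dayclm.nc')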