# 12. `dask` and Large Datasets Computations

Some people think that large datasets or "big data" are mostly applied to machine learning and artificial intelligence fields. In atmospheric science, 

> Big data refers to data sets that are so voluminous and complex that traditional data processing application software is inadequate to deal with them.

Due to the long periods and finer grid resolutions of modern reanalysis data or model outputs, it often leads to system overload if not processed properly. You may see the following error message:

`MemoryError: Unable to allocate 52.2 GiB for an array with shape (365, 37, 721, 1440) and data type float32`

This error message appears because the data size has exceeded the RAM capacity. How should we avoid this situation?

## Dask 

`dask`` is a flexible library for parallel computing in Python. It can scale up to operate on large datasets and perform computations that cannot fit into memory. dask achieves this by breaking down large computations into smaller tasks, which are then executed in parallel. This can avoid consuming large amount of RAM. 

To understand the usage of dask, with demonstrate first with a 1000 × 4000 array size. 

**1. Numpy Array:**

In [1]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

**2. Dask Array:**

In [2]:
import dask.array as da

ones = da.ones(shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 30.52 MiB 30.52 MiB Shape (1000, 4000) (1000, 4000) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


![](https://docs.dask.org/en/latest/_images/dask-array.svg)

Dask devides the entire array into sub-arrays named "chunk". In `dask`, we can specify the size of a chunk.

In [3]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Dask graph,4 chunks in 1 graph layer,4 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 30.52 MiB 7.63 MiB Shape (1000, 4000) (1000, 1000) Dask graph 4 chunks in 1 graph layer Data type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Dask graph,4 chunks in 1 graph layer,4 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


We can do some arithmetic calculations, such as multiplication and averaging.

In [4]:
ones_mean = (ones * ones[::-1, ::-1]).mean()
ones_mean

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 6 graph layers,1 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 6 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 6 graph layers,1 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Following is the calculation procedure:

![](https://earth-env-data-science.github.io/_images/dask_arrays_16_0.png)

Dask allows computation of each chunk in each memory core, and finally combines all the computation of each chunk to a final result. For more advanced usages of dask, see [`dask` official tutorial](https://www.dask.org/).

Dask integrates commonly-used functions in `numpy` and `xarray`, which is beneficial to processing climate data. Then how will `dask` help with large datasets?


## Large Climate Dataset Processing

In Unit 2, we introduced the `parallel=True` option in `xarray.open_mfdataset`. This option allows `xarray` to read the file using `dask` array. Therefore, the system will read data using multi-core computation to speed up the reading process. Now, we read the NCEP R2 850-hPa wind field and specify the size of chunks by `chunks={'time': 183, 'level': 1, 'longitude': 93*2, 'latitude': 91*2}` then calculate climatology, anomaly, and plot.

In [5]:
import xarray as xr
import dask

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    uds = xr.open_mfdataset('./data/ncep_r2_uv850/u850.*.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':36,'latitude':24}
                             )
    vds = xr.open_mfdataset('data/ncep_r2_uv850/v850.*.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':36,'latitude':24}
                             )
%time

CPU times: user 12 μs, sys: 2 μs, total: 14 μs
Wall time: 26.5 μs


In [6]:
u = uds.sel(lat=slice(90,0)).uwnd
v = vds.sel(lat=slice(90,0)).vwnd
%time
u

CPU times: user 9 μs, sys: 2 μs, total: 11 μs
Wall time: 23.1 μs


Unnamed: 0,Array,Chunk
Bytes,178.17 MiB,3.72 MiB
Shape,"(8766, 1, 37, 144)","(183, 1, 37, 144)"
Dask graph,48 chunks in 50 graph layers,48 chunks in 50 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 178.17 MiB 3.72 MiB Shape (8766, 1, 37, 144) (183, 1, 37, 144) Dask graph 48 chunks in 50 graph layers Data type float32 numpy.ndarray",8766  1  144  37  1,

Unnamed: 0,Array,Chunk
Bytes,178.17 MiB,3.72 MiB
Shape,"(8766, 1, 37, 144)","(183, 1, 37, 144)"
Dask graph,48 chunks in 50 graph layers,48 chunks in 50 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


Now we see the chunk information. This is because of the dask **lazy computation** that the program didn't really compute with real data values. At this moment, the data is not loaded in the RAM of the system.

Now, we are going to calculate the climatological mean. However, the built-in functions `groupby()` and `resample()` in `xarray` are not well supported. Therefore, we will compute with the `flox` libray, which is a library with significantly faster groupby calculations. The `flox` groupby method is [`flox.xarray.xarray_reduce`](https://flox.readthedocs.io/en/latest/generated/flox.xarray.xarray_reduce.html). 

In [7]:
from flox.xarray import xarray_reduce

uDayClm = xarray_reduce(u,u.time.dt.dayofyear,func='mean',dim='time')
vDayClm = xarray_reduce(u,u.time.dt.dayofyear,func='mean',dim='time')

uDayClm 

Unnamed: 0,Array,Chunk
Bytes,7.44 MiB,7.44 MiB
Shape,"(366, 1, 37, 144)","(366, 1, 37, 144)"
Dask graph,1 chunks in 58 graph layers,1 chunks in 58 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 7.44 MiB 7.44 MiB Shape (366, 1, 37, 144) (366, 1, 37, 144) Dask graph 1 chunks in 58 graph layers Data type float32 numpy.ndarray",366  1  144  37  1,

Unnamed: 0,Array,Chunk
Bytes,7.44 MiB,7.44 MiB
Shape,"(366, 1, 37, 144)","(366, 1, 37, 144)"
Dask graph,1 chunks in 58 graph layers,1 chunks in 58 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


After lazy computation, we can now really trigger computation and return the final result with `.compute()`.

In [8]:
uDayClm_fn = uDayClm.compute()
vDayClm_fn = vDayClm.compute()

uDayClm_fn

The computed result is now saved in a DataArray. 

### Best Practices with Working with `dask`

1. Save intermediate results to disk as a netCDF files (using `to_netcdf()`) and then load them again with `open_dataset()` for further computations.
2. Specify smaller chunks across space when using `open_mfdataset()` (e.g., `chunks={'latitude': 10, 'longitude': 10}`)。
3. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the `chunks={}` argument to `open_mfdataset()` to avoid redundant file reads.
4. `groupby()` is a costly operation and will perform a lot better if the flox package is installed. See the `flox` documentation for more. By default Xarray will use `flox` if installed.
