# 12. 大型資料處理

一般認為所謂大型資料或「大數據 (Big Data)」通常應用在機器學習領域上，但在大氣科學上，我們所認定的大數據可能更接近以下維基百科的定義：

> Big data is data sets that are so voluminous and complex that traditional data processing application software are inadequate to deal with them.

大氣科學資料因為時間長、網格解析度和垂直層越來越細的緣故，如果沒有謹慎處理，可能會造成記憶體無法負荷，例如以下錯誤訊息：

`MemoryError: Unable to allocate 52.2 GiB for an array with shape (365, 37, 721, 1440) and data type float32`

會有這樣的情形是因為要分析的資料量已經超過電腦記憶體 (RAM) 的負荷。那要如何避免這個情形發生呢？

## Dask 

Dask是一套Python的套件，可以用電腦多核心(core)來進行平行運算，因此可以提升效率。在計算時，程式不會完全讀入所有的資料，而是以符號的方式先進行運算，這個過程稱為 "lazy computation"，也因此運算的過程不會耗費大量的記憶體 RAM。

為了理解dask如何在xarray上運作，我們先以一組1000 × 4000大小的矩陣來示範。

**1. Numpy矩陣**

In [1]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

**2. Dask矩陣**

In [2]:
import dask.array as da

ones = da.ones(shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 30.52 MiB Shape (1000, 4000) (1000, 4000) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


![](https://docs.dask.org/en/latest/_images/dask-array.svg)

Dask會把矩陣分成許多子矩陣，這些子矩陣稱為"chunk"。在dask中，我們可以指定chunks的大小。

In [3]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 7.63 MiB Shape (1000, 4000) (1000, 1000) Count 4 Tasks 4 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray


如果我們做點計算，例如先進行相乘然後再平均

In [4]:
ones_mean = (ones * ones[::-1, ::-1]).mean()
ones_mean

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,19 Tasks,1 Chunks
Type,float64,numpy.ndarray
Array Chunk Bytes 8 B 8.0 B Shape () () Count 19 Tasks 1 Chunks Type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,19 Tasks,1 Chunks
Type,float64,numpy.ndarray


計算過程如下：

![](https://earth-env-data-science.github.io/_images/dask_arrays_16_0.png)

也就是chunk本身會在各自的核心中先進行計算，然後最後再合併一起成為最終的結果。

由以上的計算過程，我們可以大致理解dask的計算原理，更進階的用法可以參閱dask的官方網站說明。

從以上例子我們可以知道，dask矩陣囊括了我們熟知的numpy套件中的函數。其實dask也囊括了xarray的函數，這對於我們要使用dask輔助處理大氣科學中大型資料是非常有利的。那麼dask怎麼幫助我們加速資料處理呢？

## 大型氣象資料處理

在第二單元中，我們已經介紹在開啟多個檔案`xarray.open_mfdataset`時，可以加上`parallel=True`來加快計算速度，這就是把xarray以dask矩陣的方式讀取，因此電腦多核心會同時讀取檔案，以增加速度。以下我們開啟NCEP R2水平風場的檔案，指定chunks的大小`chunks={'time':183, 'level': 1, 'longitude':93*2,'latitude':91*2}`，並且計算氣候場、距平值，然後畫出結果。

In [5]:
import xarray as xr
import dask

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    uds = xr.open_mfdataset('./data/ncep_r2_uv850/u850.*.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':36,'latitude':24}
                             )
    vds = xr.open_mfdataset('data/ncep_r2_uv850/v850.*.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':36,'latitude':24}
                             )
%time

CPU times: user 1 µs, sys: 0 ns, total: 1 µs
Wall time: 2.86 µs


In [6]:
u = uds.sel(lat=slice(90,0)).uwnd
v = vds.sel(lat=slice(90,0)).vwnd
%time
u

CPU times: user 1e+03 ns, sys: 0 ns, total: 1e+03 ns
Wall time: 3.1 µs


Unnamed: 0,Array,Chunk
Bytes,178.17 MiB,3.72 MiB
Shape,"(8766, 1, 37, 144)","(183, 1, 37, 144)"
Count,168 Tasks,48 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 178.17 MiB 3.72 MiB Shape (8766, 1, 37, 144) (183, 1, 37, 144) Count 168 Tasks 48 Chunks Type float32 numpy.ndarray",8766  1  144  37  1,

Unnamed: 0,Array,Chunk
Bytes,178.17 MiB,3.72 MiB
Shape,"(8766, 1, 37, 144)","(183, 1, 37, 144)"
Count,168 Tasks,48 Chunks
Type,float32,numpy.ndarray


在以上的預覽中，我們可以看到Chunk的資訊，這是因為在dask的lazy computation下，程式還沒有真正將資料的數值給代入，且將資料切成我們指定的小塊來進行平行運算了。到此時資料都還沒有真正進入電腦的記憶體。

接下來我們要計算氣候平均，但根據xarray網站的建議，目前dask對`groupby()`、`resample()`函數還沒有很好的支援，如果在dask下計算，反而很沒效率，因此筆者建議，在切完資料、進行複雜計算前，都應該使用`load()`將資料讀取進記憶體。

> `xarray.Dataset.load`: Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return this dataset. 

不過如果上面預覽中的task越多、資料檔越大，這個步驟還是得花很多時間。

In [7]:
u.load()
v.load()
%time
u

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 4.77 µs


此時這是一個DataArray。接下來計算氣候平均就很快了。

In [8]:
uDayClim = u.groupby('time.dayofyear').mean('time')
vDayClim = v.groupby('time.dayofyear').mean('time')

uDayClim

### 使用dask的一些好習慣

1. 目前dask在`resample()` or `groupby()`兩個函數並沒有做很好的效率最佳化，因此建議在這之前就先進行`load()`的動作，以避免非常大量的計算。從上面的範例就可以看到從頭計算到ws這個動作，就要花費2279930 tasks，必然要花費很多時間！
2. 把一些初步的結果先儲存成netCDF檔案，然後重新讀進來，會比較節省時間。
3. 空間上切越小的chunks越好 (e.g., chunks={'latitude': 10, 'longitude': 10})。
4. xarray官方網站建議開啟多個檔案時，設定`engine='h5netcdf'`，會比 `engine='netcdf4'`快。
