210000% more RAM usage than expected when reading dask arrays #4448

Closed
arkanoid87 opened this issue Feb 1, 2019 · 18 comments

Comments

@arkanoid87

arkanoid87 commented Feb 1, 2019

I'm a novice user, but I can hardly believe that what I'm seeing is the expected performance of Dask.

Configuration:

2 x Workers [128GB RAM each]:

  • dask=1.1.1
  • python=3.6
  • conda=4.6
  • CentOS Linux

Client is on worker #1

  1. I've built my graph of operations with a persist at the end. The only node in memory is the last one (I've called persist() on it).

[screenshot: Dask dashboard, 2019-02-01 06:02:22]

  2. I successfully get the dask.array out of it:
result = predicted_y_img_rechunk.compute()

It is a 27915 × 22415 float16 image, ~1300 MB according to the math.

result.shape, result.chunksize, result.dtype
((27915, 22415), (5000, 5000), dtype('float16'))

From this point I've been testing many different possibilities: to_hdf, to_zarr, to_npy, even a custom store(function), but all of them cause my RAM to fill up within seconds, up to 128GB, and crash the kernel.

The simplest example is a plain np.array conversion:

result_np = np.array(result)

I've also tried rechunking it so that the chunk size equals the full shape, but the behaviour doesn't change.

My data would fit in RAM many times over, yet I'm stuck with a dask.array that I cannot save to disk.

@mrocklin
Member

mrocklin commented Feb 1, 2019 via email

@arkanoid87
Author

arkanoid87 commented Feb 2, 2019

Please consider examples A and B

A

Fits in RAM nicely

%%time

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

nrows = 600000000

@delayed
def generate(l):
    return pd.Series(np.arange(l, dtype=np.uint64))

col1 = dd.from_delayed(generate(nrows), divisions=(0, nrows))
col2 = dd.from_delayed(generate(nrows), divisions=(0, nrows))
df = dd.multi.concat([col1, col2], axis=1).repartition(npartitions=10)

display(df.compute())

CPU times: user 15.6 s, sys: 23.2 s, total: 38.9 s
Wall time: 3min 54s

B

WARNING: RAM bomb

%%time

nrows = 600000000

@delayed
def generate(l):
    return pd.Series(np.arange(l, dtype=np.uint64))

col1 = generate(nrows)
col2 = generate(nrows)
df = delayed(dd.multi.concat)([col1, col2], axis=1).repartition(npartitions=10)

display(df.compute().compute())

kernel crashed

But with a smaller nrows = 100000, things are different:

A

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 2 columns):
0    100000 non-null uint64
1    100000 non-null uint64
dtypes: uint64(2)
memory usage: 1.5 MB
None
CPU times: user 88.3 ms, sys: 7.07 ms, total: 95.4 ms
Wall time: 280 ms

B

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 2 columns):
0    100000 non-null uint64
1    100000 non-null uint64
dtypes: uint64(2)
memory usage: 1.5 MB
None
CPU times: user 51.7 ms, sys: 11.5 ms, total: 63.2 ms
Wall time: 247 ms

By changing all my code to pattern A, I have successfully completed an iteration of my graph.
I followed pattern B during my initial trials because I was timing my functions with %%time on small data; it felt like the way to go, and I didn't see any warning about it in the code or documentation.

Questions

  • Why does B explode in RAM usage with large nrows? And why is B actually faster than A with small nrows? (The smaller nrows gets, the faster B is.)
  • Is it considered good practice to return a dask collection from a delayed function, as in B?
  • If I build my graph with x.from_delayed(...), then when x is dd the delayed function is actually called, but when x is da it is not. Why? Please consider the following examples.

Exception not raised

import dask.array as da

@delayed
def generate():
    raise Exception("You fool")
    return pd.Series(np.arange(1000, dtype=np.uint64))

x = generate()

y = da.from_delayed(x, shape=(1, 1000), dtype=np.uint64)

# y.compute() # this would raise it

Exception raised

@delayed
def generate():
    raise Exception("You fool")
    return pd.Series(np.arange(1000, dtype=np.uint64))

x = generate()

y = dd.from_delayed(x, divisions=(0,1000)) # raises exception

# y.compute() # never reached

Thank you

@paularmand

Could it be that dd needs metadata and is calling the function to retrieve the first few results? Your function raises an error at that point.

@paularmand

In case the above was not clear, see http://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_delayed, and try adding a meta argument to your call to dd.from_delayed.
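
A minimal sketch of that suggestion (the imports and dtype are taken from the examples above; the exact meta value is just one reasonable choice, not verified against the original code):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def generate():
    return pd.Series(np.arange(1000, dtype=np.uint64))

# An empty Series describing the output: with `meta` provided, dd.from_delayed
# does not need to call generate() eagerly to infer names and dtypes.
meta = pd.Series([], dtype=np.uint64)
y = dd.from_delayed(generate(), meta=meta)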

@arkanoid87
Author

arkanoid87 commented Feb 2, 2019 via email

@mrocklin
Member

mrocklin commented Feb 2, 2019 via email

@arkanoid87
Author

arkanoid87 commented Feb 2, 2019

Thanks for the comments, but I need some more help applying this to my real problem.

I am doing ML on large images (30K × 30K, float32) with geo/meteo data.

I am lazily loading geo images into dask arrays:

import rasterio
from dask import delayed

@delayed
def load_raster(path):
    with rasterio.open(path, 'r') as raster:
        return raster.read(1).ravel()

img1 = load_raster('A.tif')
img2 = load_raster('B.tif')
img3 = load_raster('C.tif')
# actually 7 of them

I have to perform per-pixel prediction using several already-fitted scikit-learn models. In NumPy I'd column-stack the three arrays and call model.predict(X), but here I'd like to spread the work across the available computing power (2 workers, 64GB and 20 cores each).

How can I stack the images efficiently?
This stack operation seems to be computed on a single worker; is there a better solution?

X = da.stack([da.from_delayed(d, dtype=np.float32, shape=(30000 * 30000,)) for d in [
        img1, img2, img3
]], axis=1)  # each delayed result is a raveled image, hence the 1-D shape

I'm also trying to work column-wise using dask Series and DataFrames instead of arrays:

import pandas as pd

@delayed
def load_raster(path):
    with rasterio.open(path, 'r') as raster:
        return pd.Series(raster.read(1).ravel())

img1 = dd.from_delayed(load_raster('A.tif'), meta=('A', np.float32), divisions=(0,30000))
img2 = dd.from_delayed(load_raster('B.tif'), meta=('B', np.float32), divisions=(0,30000))
img3 = dd.from_delayed(load_raster('C.tif'), meta=('C', np.float32), divisions=(0,30000))

X = dd.multi.concat([img1, img2, img3], axis=1).repartition(npartitions=40) # is partitioning by the number of cores a good thing?

To sum up, what is best given my context?

y = da.map_blocks(model.predict, X, dtype=np.float32, chunks=(X.chunksize[0], 1))
y = dd.map_partitions(model.predict, X, meta=('y', np.float32))

Or should I use dask_ml.wrappers.Incremental instead?

Thanks

@mrocklin
Member

mrocklin commented Feb 2, 2019 via email

@arkanoid87
Author

OK, but xarray.open_rasterio returns a chunked, dask-backed array, so here I'm back to one of my first questions:

Is it considered good practice to return a dask collection from a delayed function?

e.g.

@delayed
def load_raster(path, chunks):
    return xarray.open_rasterio(path, chunks=chunks)

img1 = load_raster('A.tif', (1, -1))

@TomAugspurger
Member

Is it considered good practice to return a dask collection from a delayed function?

Nope: http://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections
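
A rough illustration of the linked best practice (file names, chunk sizes and the 30000 × 30000 shape are placeholders taken from this thread, not verified code):

import numpy as np
import rasterio
import xarray as xr
import dask.array as da
from dask import delayed

# Anti-pattern: a delayed function that returns another dask/xarray collection.
@delayed
def load_lazy(path):
    return xr.open_rasterio(path, chunks=(1, 5000, 5000))

# Preferred: the delayed function returns a concrete NumPy array, and the dask
# collection is built around it on the client side.
@delayed
def load(path):
    with rasterio.open(path, 'r') as raster:
        return raster.read(1)

img = da.from_delayed(load('A.tif'), shape=(30000, 30000), dtype=np.float32)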

@arkanoid87
Author

OK, but then there's a missing piece.

I have many large images on an NFS share accessible from both workers.

The images do not fit nicely in RAM and I have to stack them, so you told me to use open_rasterio to avoid the all-in-RAM step, and that is fine.

But let's say I want to load them this way in parallel on the workers, since they have access to the filesystem. If I can't return a dask collection from a delayed function, how do I do that?

Thank you

@TomAugspurger
Member

If I can't return a dask collection from a delayed function, how?

I don't follow. xarray.open_rasterio will be lazy when chunks is provided. Did you try that?
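
For example, something along these lines (a sketch only, assuming 30000 × 30000 images as mentioned earlier; the chunk sizes are arbitrary):

import xarray as xr
import dask.array as da

paths = ['A.tif', 'B.tif', 'C.tif']  # file names from the thread
# With `chunks`, open_rasterio returns a lazily loaded, dask-backed DataArray;
# nothing is read from disk until compute/persist/store is called.
bands = [xr.open_rasterio(p, chunks={'band': 1, 'y': 2000, 'x': 30000}).data[0]
         for p in paths]             # .data is the underlying dask array; [0] drops the band axis
# Chunking only along y keeps rows contiguous, so raveling and stacking stay cheap.
X = da.stack([b.ravel() for b in bands], axis=1)   # shape (n_pixels, n_images), still lazy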

@arkanoid87
Author

arkanoid87 commented Feb 3, 2019

Wouldn't that load the whole image in RAM anyway before dumping it into a dask array, just like using rasterio.open().load()?

I'm getting confused between np, pandas, dask and xarray.

From what you are saying, it seems that inside delayed functions I get np/pd as input and I should return np/pd collections as output, but now you are suggesting I return xarray.open_rasterio, which returns an xarray DataArray.

So the general question is: how can I ask a worker to lazily load data that would not fit into the worker's memory?

Thank you

@arkanoid87
Author

arkanoid87 commented Feb 3, 2019

EDIT: I was pushing the model into the graph. Fixed

I also have another related problem

I have saved a large fitted RandomForestRegressor model to a file and I need to call .predict for each pixel of those large images once stacked.

The problem is that the RandomForestRegressor is quite large once loaded in memory AND the predict function's RAM usage grows linearly with the input size, so I'm using map_blocks/map_partitions like this:

mymodel = delayed(mytree)
y = X.map_partitions(lambda X, model: model.predict(X), mymodel, meta=('y', np.float32))

But when doing

y = y.persist()

Workers start failing with many truncated pickle errors and the calculation stops.

tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f72250670d0>, <Future finished exception=UnpicklingError('pickle data was truncated',)>)
Traceback (most recent call last):
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
    future.result()
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/worker.py", line 2053, in execute
    data[k] = self.data[k]
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/zict/buffer.py", line 70, in __getitem__
    return self.slow_to_fast(key)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/zict/buffer.py", line 57, in slow_to_fast
    value = self.slow[key]
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/zict/func.py", line 39, in __getitem__
    return self.load(self.d[key])
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 378, in deserialize_bytes
    return deserialize(header, frames)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 187, in deserialize
    return loads(header, frames)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 49, in dask_loads
    return loads(header, frames)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 517, in deserialize_object_with_dict
    v = deserialize(h, f)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 187, in deserialize
    return loads(header, frames)
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 57, in pickle_loads
    return pickle.loads(b''.join(frames))
  File "/home/nfs/jack/miniconda3/envs/jupyter/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
_pickle.UnpicklingError: pickle data was truncated

I feel that the error is due to the size of mytree, but what can I do to overcome the problem?

Thanks
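
For reference, a minimal sketch of one way to keep a large fitted model out of the task graph, assuming a distributed Client is already running (whether this matches the fix mentioned in the EDIT above is not confirmed in this thread):

from distributed import Client

client = Client('tcp://scheduler:8786')                  # hypothetical scheduler address
model_future = client.scatter(mytree, broadcast=True)    # ship the model to each worker once
y = X.map_partitions(lambda part, model: model.predict(part),
                     model_future, meta=('y', np.float32))
y = y.persist()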

@arkanoid87
Author

arkanoid87 commented Feb 4, 2019

EDIT: I was pushing the model into the graph. Fixed

I've just found another unexpectedly heavy operation.

I've fixed my pipeline; it now ends with a delayed function that writes a TIFF image to disk, and it works.

Now I just need to put almost the whole graph in a loop and produce 12 of these, one per month.
So I did:

shared = dd.from_array(...)
results = []
for month in range(1,13):
    # some heavy lifting
    final_y = ...
    result = delayed(save_image)(final_y)
    results.append(result)

If I run one manually:

results[0].compute()

It works

If I pass the whole list:

client.persist(results) # or client.compute, or dask.compute/persist

it fills 128GB of RAM and doesn't even start computing.

I've also tried with optimize_graph=False, but nothing changes.

For now I have to re-launch the calculation without reusing the shared intermediate.
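
One hypothetical workaround, not something suggested in this thread: compute the monthly results in small batches so that only a few of the twelve graphs are materialized at any one time.

import dask

batch_size = 3  # arbitrary; tune to available memory
for i in range(0, len(results), batch_size):
    dask.compute(*results[i:i + batch_size])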

@mrocklin
Member

Wouldn't that load the whole image in ram anyway before dumping to a Dask Array? Just like using rasterio.open().load()

No, xarray is lazy. It uses dask under the hood.

Workers start failing with many truncated pickle errors and the calculation stops.

My first guess is that you have mismatched versions. Try

client.get_versions(check=True)

@wgeul

wgeul commented Mar 15, 2020

EDIT: I was pushing the model into the graph. Fixed

I also have another related problem
I have saved a large fitted RandomForestRegressor model to a file and I need to call .predict for each pixel of those large images once stacked.
The problem is that the RandomForestRegressor is quite large once loaded in memory AND the predict function's RAM usage grows linearly with the input size, so I'm using map_blocks/map_partitions like this:

mymodel = delayed(mytree)
y = X.map_partitions(lambda X, model: model.predict(X), mymodel, meta=('y', np.float32))

But when doing

y = y.persist()

Workers start failing with many truncated pickle errors and the calculation stops.

[traceback identical to the one quoted above]

I feel that the error is due to the size of mytree, but what can I do to overcome the problem?
Thanks

I am getting a similar error "pickle.UnpicklingError: pickle data was truncated", also when using RandomForestRegressor. Have you had any luck in overcoming this issue?

Edit:

I figured out that the issue is related to RAM per core. I solved it by altering nprocs and/or the spill rate.

@mrocklin
Member

I'm glad to see that you've resolved your issue. I'm going to go ahead and close this. Thanks for reporting things @arkanoid87
