> The simulations are all running in parallel so writing to the same file is challenging

How is each simulation managed? Typically you would launch each process, in which case you can write to as many files as processes you launch at a time.

> The raw results of each simulation get read in using Pandas and written to an arrow file after a simulation is complete

This is potentially another point where you can coalesce outputs. Perhaps the process that reads simulation results can read many simulation results instead of 1, so that the output arrow file is larger.

The last thing I can think of is that sometimes blob storage allows you to group many writes together as "partial writes" to a single blob. This is necessary because it's extremely common that you have a single named object that must be spread across storage units (e.g. inodes or extents or data pages etc.). This SO post [1] mentions blocks and block lists, which could be used to write each small file as a blob block and then build block lists from those blocks. The Azure Blob Storage documentation [2] also lists "page blobs" and "append blobs". So, this third suggestion is to see if you can use one of these approaches to address the overhead of listing all of the files (which David mentions). It looks like both block blobs and append blobs (the ones I think are most useful for you) have a limit of ~50,000 blocks/appends, so you would need some simple naming scheme to be sure you don't overrun that limit. I'm not sure if it's possible, but hopefully you can write them in such a way that the Dataset API (or something similar) can combine each batch efficiently (depending on the size of each batch and how Arrow combines compressed batches, it may be better not to compress until you combine the small files).

[1]: https://stackoverflow.com/a/61156040
[2]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api
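A rough sketch of the append-blob idea, assuming the azure-storage-blob Python SDK. The container and blob names are made up, and it appends plain JSON lines (rather than Arrow IPC bytes) so the appended pieces are trivial to recombine later:

    import json
    from azure.storage.blob import BlobServiceClient

    # Each simulation process appends its single row of results as one JSON line,
    # so 100k results land in a handful of append blobs instead of 100k tiny files.
    service = BlobServiceClient.from_connection_string("<connection-string>")
    results = service.get_blob_client(container="sim-results", blob="batch-000.jsonl")
    results.create_append_blob()  # create once, before the simulations start

    def record_result(row: dict) -> None:
        # Each Append Block operation is atomic, so parallel writers do not
        # interleave within a line. Mind the ~50,000 appends-per-blob limit
        # mentioned above: roll over to a new blob name every ~50k results.
        results.append_block((json.dumps(row) + "\n").encode("utf-8"))

    record_result({"sim_id": 42, "objective": 1.23, "runtime_s": 8.1})

A later job can then read the handful of .jsonl blobs into a single Arrow table in one pass.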
Aldrin Montana
Computer Science PhD Student
UC Santa Cruz

On Mon, Oct 10, 2022 at 4:49 AM David Li <[email protected]> wrote:

> Which filesystem implementation are you using for Azure? I would guess that part of the bottleneck might even just be in _listing_ all the files. I believe some of the S3 filesystem implementations try to parallelize this step for that reason, but if all files lie in the same prefix, even that may not be possible. I am not sure if any of the Azure Blob Storage filesystem implementations do this. (Also, Arrow Dataset plans to - but does not yet - support pipelining listing/reading, for this reason.)

> Another solution may be to turn up the fragment readahead settings while reading, since you have so many small files. Unfortunately it appears this is only available in PyArrow starting in 10.0.0 [1].

> Compression is not helping you for these single-row files, but I also would not think it's a major contributor to the runtime; I think you're mostly getting crushed under the overhead of listing, then reading (and opening HTTP connections for) 100k files. (And because the readers try to minimize I/O for larger files, they're generally making multiple requests per file, which in this case is hurting you even more.)

> The optimal chunk size is going to depend on your application. Chunk size is mostly controlled by how you are generating the data. You can think of it as the intra-file unit of parallelism in this case (though I would think the equivalent to the Spark example is to have multiple files, one per worker).

> [1]: https://arrow.apache.org/docs/dev/python/generated/pyarrow.dataset.Scanner.html?highlight=fragment_readahead
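A minimal sketch of turning up the readahead when scanning the dataset (assumes PyArrow >= 10.0.0; the path, format, and readahead value are illustrative):

    import pyarrow.dataset as ds

    # Open the directory of per-simulation Feather/IPC files as one dataset.
    dataset = ds.dataset("results/", format="feather")

    # fragment_readahead controls how many files are fetched concurrently,
    # which is what matters when there are ~100k tiny files.
    table = dataset.scanner(fragment_readahead=128).to_table()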
> > On Sun, Oct 9, 2022, at 22:36, Cedric Yau wrote:

> > If each file is just one row, then you might be better off writing the values out as a Tab Separated Values file. Those can all be concatenated together.

> > The data is also small enough to transmit via a queue. I'm primarily on AWS, but Azure Queue Storage appears to be the right service: How to use Azure Queue Storage from Python | Microsoft Learn <https://learn.microsoft.com/en-us/azure/storage/queues/storage-python-how-to-use-queue-storage?tabs=python%2Cenvironment-variable-windows>

> > This would allow you to gather results as they are ready instead of waiting until the end.

> > Cedric

> > On Sun, Oct 9, 2022 at 10:27 PM Nikhil Makan <[email protected]> wrote:

> > That's it! Thanks David.

> > to_table().column(0).num_chunks = to_table().num_rows, therefore combine_chunks() merged them all into one.

> > However I need to unpack the finer details of this a bit more:

> > - Given I have over 100k simulations producing these 'single row' files, what's the best way to handle this afterwards, or is there a better way to store this from the start? The simulations are all running in parallel so writing to the same file is challenging and I don't want to go down the route of implementing a database of some sort. Would storing them as uncompressed arrow files improve performance when reading in with the dataset API, or is there another more efficient way to combine them?

> > - What is optimal for chunks? Naturally a chunk for each row is not efficient. Leaning on some knowledge I have with Spark, the idea is to partition your data so it can be spread across the number of nodes in the cluster; too few partitions meant an under-utilised cluster. What should be done with chunks and do we have control over setting this?

> > Kind regards
> > Nikhil Makan

> > On Mon, Oct 10, 2022 at 2:21 PM David Li <[email protected]> wrote:

> > I would guess that because the 187 MB file is generated from 100,000+ files, and each input file is one row, you are hitting basically a pathological case for Dataset/Arrow. Namely, the Dataset code isn't going to consolidate input batches together, and so each input batch (= 1 row) is making it into the output file. And there's some metadata per batch (= per row!), inflating the storage requirements, and completely inhibiting compression. (You're running LZ4 on each individual value in each row!)

> > To check this, can you call combine_chunks() [1] after to_table, before write_feather? This will combine the record batches (well, technically, chunks of the ChunkedArrays) into a single record batch, and I would guess you'll end up with something similar to the to_pandas().to_feather() case (without bouncing through Pandas).

> > You could also check this with to_table().column(0).num_chunks (I'd expect this would be equal to to_table().num_rows).

> > [1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks
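A minimal sketch of that check (file paths are illustrative):

    import pyarrow.dataset as ds
    import pyarrow.feather as feather

    dataset = ds.dataset("results/", format="feather")
    table = dataset.to_table()

    print(table.column(0).num_chunks)   # expect roughly one chunk per input file
    table = table.combine_chunks()      # collapse each column to a single chunk
    print(table.column(0).num_chunks)   # expect 1

    feather.write_feather(table, "combined.arrow")  # LZ4 by default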
> > On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:

> > Hi Will,

> > For clarity the simulation files do get written to an Azure Blob Storage, however to simplify things I have not tried to read the data directly from the cloud storage. I have downloaded it first and then loaded it into a dataset locally (which takes 12 mins). The process to produce the arrow file for each simulation is done through pandas. The raw results of each simulation get read in using Pandas and written to an arrow file after a simulation is complete. The 100,000+ files are therefore in the arrow format using LZ4 compression.

> > The table retrieved from the dataset object is then written to an arrow file using feather.write_feather, which again by default uses LZ4.

> > Do you know if there is any way to inspect the two files or tables to get more information about them? I can't understand how I have two arrow files, one of which is 187 MB and the other 5.5 MB, yet both have the same compression (LZ4), schema, shape and nbytes when read in.

> > I can even read in the 187 MB arrow file and write it back to disk and it remains 187 MB, so there is definitely some property of the arrow table that I am not seeing. It is not necessarily a property of just the file.

> > Kind regards
> > Nikhil Makan

> > On Mon, Oct 10, 2022 at 1:38 PM Will Jones <[email protected]> wrote:

> > Hi Nikhil,

> > > To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)

> > Given the number of files (100k+ right?) this does seem surprising. Especially if you are using a remote filesystem like Azure or S3. Perhaps you should consider having your application record the file paths of each simulation result; then the Datasets API doesn't need to spend time resolving all the file paths.

> > > The part I find strange is the size of the combined arrow file (187 MB) on disk and the time it takes to write and read that file (> 1 min).

> > On the size, you should check which compression you are using. There are some code paths that write uncompressed data by default and some code paths that do not. The Pandas to_feather() uses LZ4 by default; it's possible the other way you are writing isn't. See IPC write options [1].

> > On the time to read, that seems very long for local, and even for remote (Azure?).

> > Best,

> > Will Jones

> > [1] https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
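On the earlier question of inspecting the two files, a small sketch with pyarrow.ipc (file names are illustrative). The schema, shape and nbytes look identical once loaded; the number of record batches inside each file is the property that should differ, and per the explanation above it is what inflates the per-batch metadata and defeats compression:

    import pyarrow.ipc as ipc

    # One file written from the Dataset API table, one via pandas.to_feather().
    for path in ["combined_from_dataset.arrow", "combined_via_pandas.arrow"]:
        reader = ipc.open_file(path)
        print(path, "->", reader.num_record_batches, "record batches")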
> > On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <[email protected]> wrote:

> > Hi All,

> > I have a situation where I am running a lot of simulations in parallel (100k+). The results of each simulation get written to an arrow file. A result file contains around 8 columns and 1 row.

> > After all simulations have run I want to be able to merge all these files into a single arrow file.

> > - To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)
> > - I then call to_table() on the dataset to bring it into memory
> > - Finally I write the table to an arrow file using feather.write_feather()

> > Unfortunately I do not have a reproducible example, but hopefully the answer to this question won't require one.

> > The part I find strange is the size of the combined arrow file (187 MB) on disk and the time it takes to write and read that file (> 1 min).

> > Alternatively I can convert the table to a pandas dataframe by calling to_pandas() and then use to_feather() on the dataframe. The resulting file on disk is now only 5.5 MB and naturally writes and opens in a flash.

> > I feel like this has something to do with partitions and how the table is being structured coming from the dataset API that is then preserved when writing to a file.

> > Does anyone know why this would be the case and how to achieve the same outcome as done with the intermediate step of converting to pandas?

> > Kind regards
> > Nikhil Makan

> --
> Cedric Yau
> [email protected]
