We can't use Beam to parallelize multiple file reads, because GraphDataLoader is specific to the model being trained, so multiple Beam processes can't share the same model (until we move to DGL distributed mode later this year).
We're trying to optimize the throughput of the GraphDataLoader as it consumes and processes these Parquet files.

On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <cmcmul...@twitter.com> wrote:

> The use case is for a GraphDataLoader
> <https://docs.dgl.ai/en/0.6.x/api/python/dgl.dataloading.html#dgl.dataloading.pytorch.GraphDataLoader>
> to run w/ multiple threads. GraphDataLoader invokes its DGLDataset
> <https://docs.dgl.ai/en/0.6.x/api/python/dgl.data.html#dgl.data.DGLDataset>,
> which loads these Parquet files and converts them into DGL-compatible
> graph objects.
>
> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <emkornfi...@gmail.com> wrote:
>
>> Hi Cindy,
>>
>>> I'd like to ingest batches within a Parquet file in parallel.
>>
>> What is the motivation here? Is it speeding up Parquet reading or
>> processing after the fact?
>>
>> Side note, the size of your row groups seems quite small (it might be
>> right for your specific use-case).
>>
>> Cheers,
>> Micah
>>
>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <cmcmul...@twitter.com> wrote:
>>
>>> Maybe -- will give it a try. Thanks for the suggestion.
>>>
>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <partha.du...@gmail.com> wrote:
>>>
>>>> There is a parameter to iter_batches where you can pass in the
>>>> row_group number, or a list of row groups. Would this help to read the
>>>> Parquet file in parallel?
>>>>
>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <cmcmul...@twitter.com> wrote:
>>>>
>>>>> Hi -
>>>>>
>>>>> I'd like to ingest batches within a Parquet file in parallel. The
>>>>> client (DGLDataset) needs to be thread-safe. What's the best API for
>>>>> me to use to do so?
>>>>>
>>>>> Here's the metadata for one example file:
>>>>>
>>>>> <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>   num_columns: 4
>>>>>   num_rows: 1000000
>>>>>   num_row_groups: 9997
>>>>>   format_version: 1.0
>>>>>   serialized_size: 17824741
>>>>>
>>>>> I want the consumption of batches to be distributed among multiple
>>>>> workers. I'm currently trying something like this:
>>>>>
>>>>> # Once per client
>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>
>>>>> # Ideally, each worker could do this, but ParquetFile.iter_batches is
>>>>> # not thread-safe. This makes intuitive sense.
>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>>>
>>>>> My workaround is to buffer these ParquetFile batches into a list of
>>>>> DataFrames, but this is memory-intensive, so it will not scale to
>>>>> multiple input files.
>>>>>
>>>>> What's a better PyArrow pattern to use so I can distribute batches to
>>>>> my workers in a thread-safe manner?
>>>>>
>>>>> Thanks --
>>>>
>>>> --
>>>> Partha Dutta
>>>> partha.du...@gmail.com