Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.

I am trying to speed up the Reshuffle step, since it waits for all the data.
Here are two approaches I have tried:

1. Add timestamps to the PCollection's elements after reading (since it is a
bounded source), then apply windowing before Reshuffle; it still waits for
all the data. (A rough sketch of what I tried is below.)

2. Run the pipeline with the --streaming flag, but it leads to an error:
Workflow failed. Causes: Expected custom source to have non-zero number of
splits. Also, I found the following at
https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features :
*DataflowRunner does not currently support the following Cloud Dataflow
specific features with Python streaming execution.* - *Streaming autoscaling*
So I doubt whether this approach can solve my issue.
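Roughly what I tried for approach 1 (a minimal sketch; the bucket path, the
60s window size, and using processing time as the element timestamp are just
illustrative choices on my side):

import time
import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    _ = (p
         | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input.gz')
         # attach a timestamp to each element of the bounded PCollection
         | 'AddTimestamps' >> beam.Map(
             lambda line: window.TimestampedValue(line, time.time()))
         # window before reshuffling, hoping windows would fire early
         | 'Window' >> beam.WindowInto(window.FixedWindows(60))
         | 'Reshuffle' >> beam.Reshuffle())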
Thanks so much!
Allie

*From: *Lukasz Cwik <[email protected]>
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

> Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> If not, you could structure your pipeline as
>
> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>
> and then run it as a batch pipeline (a rough sketch of this structure is
> below).
>
> You can set --streaming=true on the pipeline and then it will run in
> streaming mode, but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow, so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed, as that
> may be less expensive than paying the additional compute cost for
> streaming.
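>
> A minimal sketch of that shape (process_fn and the file paths are
> placeholders for your actual transforms and inputs):
>
> import apache_beam as beam
>
> def process_fn(line):
>     return line  # placeholder for the real per-file transforms
>
> def build_branch(root, path, label):
>     # each branch reads its own file, so the three reads (and whatever
>     # is fused after them) run independently and in parallel
>     return (root
>             | 'Read' + label >> beam.io.ReadFromText(path)
>             | 'Reshuffle' + label >> beam.Reshuffle()
>             | 'Process' + label >> beam.Map(process_fn))
>
> with beam.Pipeline() as p:
>     for i, path in enumerate(
>             ['gs://bucket/a.gz', 'gs://bucket/b.gz', 'gs://bucket/c.gz']):
>         build_branch(p, path, str(i))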
> *From: *Allie Chen <[email protected]>
> *Date: *Tue, May 14, 2019 at 7:38 AM
> *To: * <[email protected]>
> *Cc: *user
>
>> Is it possible to use windowing, or somehow pretend it is streaming, so
>> Reshuffle or GroupByKey won't wait until all data has been read?
>>
>> Thanks!
>> Allie
>>
>> *From: *Lukasz Cwik <[email protected]>
>> *Date: *Fri, May 10, 2019 at 5:36 PM
>> *To: *dev
>> *Cc: *user
>>
>>> There is no such flag to turn off fusion.
>>>
>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>> when it is limited to a small number of workers.
>>>
>>> If you can split up your input into a lot of smaller compressed files,
>>> then you shouldn't need to use the reshuffle, but you still could if you
>>> found it helped. (A rough sketch of reading many smaller files is below.)
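>>>
>>> A minimal example of that layout (the bucket and shard naming are
>>> illustrative):
>>>
>>> import apache_beam as beam
>>>
>>> # Each file matched by the glob becomes its own split, so N shards can
>>> # be read by up to N workers, even though each .gz is unsplittable.
>>> with beam.Pipeline() as p:
>>>     lines = p | beam.io.ReadFromText('gs://my-bucket/shards/part-*.gz')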
>>>
>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <[email protected]> wrote:
>>>
>>>> Re Lukasz: Thanks! I am not able to control the compression format, but
>>>> I will see whether splitting the gzip files will work. Is there a simple
>>>> flag in Dataflow that could turn off the fusion?
>>>>
>>>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey
>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>> itself is not parallel either.
>>>>
>>>> Thanks all,
>>>>
>>>> Allie
>>>>
>>>> *From: *Reuven Lax <[email protected]>
>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>> *To: *dev
>>>> *Cc: *user
>>>>
>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>>> simply reading and decompressing all that data was very slow when there
>>>>> was no parallelism.
>>>>>
>>>>> *From: *Allie Chen <[email protected]>
>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>> *To: * <[email protected]>
>>>>> *Cc: * <[email protected]>
>>>>>
>>>>>> Yes, I do see the data after reshuffle being processed in parallel.
>>>>>> But the Reshuffle transform itself takes hours or even days to run,
>>>>>> according to one test (24 gzip files, 17 million lines in total) I did.
>>>>>>
>>>>>> The file format for our users is mostly gzip, since uncompressed files
>>>>>> would be too costly to store (it could be hundreds of GB).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>> *From: *Lukasz Cwik <[email protected]>
>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>> *To: *dev, <[email protected]>
>>>>>>
>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits until
>>>>>>> all the data has been read before the next transforms can run. After
>>>>>>> the reshuffle, the data should have been processed in parallel across
>>>>>>> the workers. Did you see this?
>>>>>>>
>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>> uncompressed file or many compressed files?
>>>>>>>
>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since
>>>>>>>> the compressed file is not splittable, one worker is allocated to
>>>>>>>> read the file. The same worker then does all the other transforms,
>>>>>>>> since Dataflow fused all the transforms together. There is a large
>>>>>>>> amount of data in the file, and I expected to see more workers spin
>>>>>>>> up after the read transform. I tried to use the Reshuffle transform
>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>> to prevent the fusion, but it is not scalable, since it won't
>>>>>>>> proceed until all the data has arrived at that point. (A rough
>>>>>>>> sketch of the pipeline is below.)
>>>>>>>>
>>>>>>>> Is there any other way to allow more workers to work on all the
>>>>>>>> other transforms after reading?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Allie
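>>>>>>>>
>>>>>>>> A minimal sketch of the pipeline (the path, to_row, the schema, and
>>>>>>>> the BigQuery table are placeholders for our actual ones):
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>>
>>>>>>>> def to_row(line):
>>>>>>>>     return {'line': line}  # placeholder for our actual parsing
>>>>>>>>
>>>>>>>> with beam.Pipeline() as p:
>>>>>>>>     _ = (p
>>>>>>>>          # gzip is unsplittable, so one worker performs this read
>>>>>>>>          | beam.io.ReadFromText('gs://my-bucket/input.gz')
>>>>>>>>          # breaks fusion, but emits nothing until all input is read
>>>>>>>>          | beam.Reshuffle()
>>>>>>>>          | beam.Map(to_row)
>>>>>>>>          | beam.io.WriteToBigQuery(
>>>>>>>>              'my-project:dataset.table',
>>>>>>>>              schema='line:STRING',
>>>>>>>>              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))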