Interesting thread. Thanks for digging that up. I would try the shuffle_mode=service experiment (forgot that wasn't yet the default). If that doesn't do the trick, though avro as a materialization format does not provide perfect parallelism, it should be significantly better than what you have now (large gzip files) and may be good enough.
On Wed, May 15, 2019 at 2:34 PM Michael Luckey <[email protected]> wrote: > > @Robert > > Does your suggestion imply, that the points made by Eugene on BEAM-2803 do > not apply (anymore) and the combined reshuffle could just be omitted? > > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <[email protected]> wrote: >> >> Unfortunately the "write" portion of the reshuffle cannot be >> parallelized more than the source that it's reading from. In my >> experience, generally the read is the bottleneck in this case, but >> it's possible (e.g. if the input compresses extremely well) that it is >> the write that is slow (which you seem to indicate based on your >> observation of the UI, right?). >> >> It could be that materializing to temporary files is cheaper than >> materializing randomly to shuffle (especially on pre-portable Python). >> In that case you could force a fusion break with a side input instead. >> E.g. >> >> class FusionBreak(beam.PTransform): >> def expand(self, pcoll): >> # Create an empty PCollection that depends on pcoll. >> empty = pcoll | beam.FlatMap(lambda x: ()) >> # Use this empty PCollection as a side input, which will force >> a fusion break. >> return pcoll | beam.Map(lambda x, unused: x, >> beam.pvalue.AsIterable(empty)) >> >> which could be used in place of Reshard like >> >> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ... >> >> You'll probably want to be sure to pass the use_fastavro experiment as well. >> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <[email protected]> wrote: >> > >> > Hi >> > >> > This project is a completely different solution towards this problem, but >> > in the hadoop mapreduce context. >> > >> > https://github.com/nielsbasjes/splittablegzip >> > >> > >> > I have used this a lot in the past. >> > Perhaps porting this project to beam is an option? >> > >> > Niels Basjes >> > >> > >> > >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <[email protected]> wrote: >> >> >> >> Sorry I couldn't be more helpful. >> >> >> >> From: Allie Chen <[email protected]> >> >> Date: Tue, May 14, 2019 at 10:09 AM >> >> To: <[email protected]> >> >> Cc: user >> >> >> >>> Thank Lukasz. Unfortunately, decompressing the files is not an option >> >>> for us. >> >>> >> >>> >> >>> I am trying to speed up Reshuffle step, since it waits for all data. >> >>> Here are two ways I have tried: >> >>> >> >>> 1. add timestamps to the PCollection's elements after reading (since it >> >>> is bounded source), then apply windowing before Reshuffle, but it still >> >>> waits all data. >> >>> >> >>> >> >>> 2. run the pipeline with --streaming flag, but it leads to an error: >> >>> Workflow failed. Causes: Expected custom source to have non-zero number >> >>> of splits. Also, I found in >> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features: >> >>> >> >>> DataflowRunner does not currently support the following Cloud Dataflow >> >>> specific features with Python streaming execution. >> >>> >> >>> Streaming autoscaling >> >>> >> >>> I doubt whether this approach can solve my issue. >> >>> >> >>> >> >>> Thanks so much! >> >>> >> >>> Allie >> >>> >> >>> >> >>> From: Lukasz Cwik <[email protected]> >> >>> Date: Tue, May 14, 2019 at 11:16 AM >> >>> To: dev >> >>> Cc: user >> >>> >> >>>> Do you need to perform any joins across the files (e.g. >> >>>> Combine.perKey/GroupByKey/...)? >> >>>> If not, you could structure your pipeline >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC >> >>>> and then run it as a batch pipeline. >> >>>> >> >>>> You can set --streaming=true on the pipeline and then it will run in a >> >>>> streaming mode but streaming prioritizes low latency and correctness on >> >>>> Google Cloud Dataflow so it will cost more to run your pipeline then in >> >>>> batch mode. It may make more sense to store the data uncompressed as it >> >>>> may be less expensive then paying the additional compute cost for >> >>>> streaming. >> >>>> >> >>>> From: Allie Chen <[email protected]> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM >> >>>> To: <[email protected]> >> >>>> Cc: user >> >>>> >> >>>>> Is it possible to use windowing or somehow pretend it is streaming so >> >>>>> Reshuffle or GroupByKey won't wait until all data has been read? >> >>>>> >> >>>>> Thanks! >> >>>>> Allie >> >>>>> >> >>>>> From: Lukasz Cwik <[email protected]> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM >> >>>>> To: dev >> >>>>> Cc: user >> >>>>> >> >>>>>> There is no such flag to turn of fusion. >> >>>>>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time >> >>>>>> when it is limited to a small number of workers. >> >>>>>> >> >>>>>> If you can split up your input into a lot of smaller files that are >> >>>>>> compressed then you shouldn't need to use the reshuffle but still >> >>>>>> could if you found it helped. >> >>>>>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <[email protected]> >> >>>>>> wrote: >> >>>>>>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format >> >>>>>>> but I will see whether the splitting gzip files will work. Is there >> >>>>>>> a simple flag in Dataflow that could turn off the fusion? >> >>>>>>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey >> >>>>>>> and FlatMap in Reshuffle are very slow when the data is large. >> >>>>>>> Reshuffle itself is not parallel either. >> >>>>>>> >> >>>>>>> Thanks all, >> >>>>>>> >> >>>>>>> Allie >> >>>>>>> >> >>>>>>> From: Reuven Lax <[email protected]> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM >> >>>>>>> To: dev >> >>>>>>> Cc: user >> >>>>>>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely >> >>>>>>>> that simply reading and decompressing all that data was very slow >> >>>>>>>> when there was no parallelism. >> >>>>>>>> >> >>>>>>>> From: Allie Chen <[email protected]> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM >> >>>>>>>> To: <[email protected]> >> >>>>>>>> Cc: <[email protected]> >> >>>>>>>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in parallel. >> >>>>>>>>> But Reshuffle transform itself takes hours or even days to run, >> >>>>>>>>> according to one test (24 gzip files, 17 million lines in total) I >> >>>>>>>>> did. >> >>>>>>>>> >> >>>>>>>>> The file format for our users are mostly gzip format, since >> >>>>>>>>> uncompressed files would be too costly to store (It could be in >> >>>>>>>>> hundreds of GB). >> >>>>>>>>> >> >>>>>>>>> Thanks, >> >>>>>>>>> >> >>>>>>>>> Allie >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> From: Lukasz Cwik <[email protected]> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM >> >>>>>>>>> To: dev, <[email protected]> >> >>>>>>>>> >> >>>>>>>>>> [email protected] >> >>>>>>>>>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits >> >>>>>>>>>> till all the data has been read before the next transforms can >> >>>>>>>>>> run. After the reshuffle, the data should have been processed in >> >>>>>>>>>> parallel across the workers. Did you see this? >> >>>>>>>>>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use an >> >>>>>>>>>> uncompressed file or many compressed files? >> >>>>>>>>>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen >> >>>>>>>>>> <[email protected]> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>> Hi, >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since >> >>>>>>>>>>> the compressed file is not splittable, one worker is allocated >> >>>>>>>>>>> to read the file. The same worker will do all the other >> >>>>>>>>>>> transforms since Dataflow fused all transforms together. There >> >>>>>>>>>>> are a large amount of data in the file, and I expect to see more >> >>>>>>>>>>> workers spinning up after reading transforms. I tried to use >> >>>>>>>>>>> Reshuffle Transform to prevent the fusion, but it is not >> >>>>>>>>>>> scalable since it won’t proceed until all data arrived at this >> >>>>>>>>>>> point. >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> Is there any other ways to allow more workers working on all the >> >>>>>>>>>>> other transforms after reading? >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> Thanks, >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> Allie >> >>>>>>>>>>> >> >>>>>>>>>>>
