@Robert Does your suggestion imply that the points made by Eugene on BEAM-2803 no longer apply, and that the combined reshuffle could just be omitted?
On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <rober...@google.com> wrote:
> Unfortunately the "write" portion of the reshuffle cannot be parallelized more than the source that it's reading from. In my experience, generally the read is the bottleneck in this case, but it's possible (e.g. if the input compresses extremely well) that it is the write that is slow (which you seem to indicate based on your observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than materializing randomly to shuffle (especially on pre-portable Python). In that case you could force a fusion break with a side input instead. E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will force a fusion break.
>         return pcoll | beam.Map(lambda x, unused: x, beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshard like
>
> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
> >
> > Hi
> >
> > This project is a completely different solution to this problem, but in the Hadoop MapReduce context:
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> > I have used this a lot in the past. Perhaps porting this project to Beam is an option?
> >
> > Niels Basjes
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen <yifangc...@google.com>
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: <d...@beam.apache.org>
> >> Cc: user
> >>
> >>> Thanks Lukasz. Unfortunately, decompressing the files is not an option for us.
> >>>
> >>> I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried:
> >>>
> >>> 1. Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all data.
> >>>
> >>> 2. Run the pipeline with the --streaming flag, but it leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>> From: Lukasz Cwik <lc...@google.com>
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
> >>>> Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
> >>>> If not, you could structure your pipeline
> >>>>   ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> >>>>   ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> >>>>   ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> >>>> and then run it as a batch pipeline.
> >>>>
> >>>> You can set --streaming=true on the pipeline and then it will run in streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.
> >>>>
> >>>> From: Allie Chen <yifangc...@google.com>
> >>>> Date: Tue, May 14, 2019 at 7:38 AM
> >>>> To: <d...@beam.apache.org>
> >>>> Cc: user
> >>>>
> >>>>> Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?
> >>>>>
> >>>>> Thanks!
> >>>>> Allie
> >>>>>
> >>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
> >>>>> To: dev
> >>>>> Cc: user
> >>>>>
> >>>>>> There is no such flag to turn off fusion.
> >>>>>>
> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.
> >>>>>>
> >>>>>> If you can split up your input into a lot of smaller files that are compressed, then you shouldn't need to use the reshuffle, but still could if you found it helped.
> >>>>>>
> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:
> >>>>>>>
> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?
> >>>>>>>
> >>>>>>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
> >>>>>>>
> >>>>>>> Thanks all,
> >>>>>>>
> >>>>>>> Allie
> >>>>>>>
> >>>>>>> From: Reuven Lax <re...@google.com>
> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
> >>>>>>> To: dev
> >>>>>>> Cc: user
> >>>>>>>
> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
> >>>>>>>>
> >>>>>>>> From: Allie Chen <yifangc...@google.com>
> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
> >>>>>>>> To: <d...@beam.apache.org>
> >>>>>>>> Cc: <user@beam.apache.org>
> >>>>>>>>
> >>>>>>>>> Yes, I do see the data after reshuffle being processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did.
> >>>>>>>>>
> >>>>>>>>> The file format for our users is mostly gzip, since uncompressed files would be too costly to store (they could be hundreds of GB).
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Allie
> >>>>>>>>>
> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
> >>>>>>>>> To: dev, <user@beam.apache.org>
> >>>>>>>>>
> >>>>>>>>>> +user@beam.apache.org
> >>>>>>>>>>
> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?
> >>>>>>>>>>
> >>>>>>>>>> Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms, since Dataflow fused all the transforms together. There is a large amount of data in the file, and I expect to see more workers spinning up after the read transform. I tried to use the Reshuffle transform to prevent the fusion, but it is not scalable since it won't proceed until all data has arrived at that point.
> >>>>>>>>>>>
> >>>>>>>>>>> Are there any other ways to allow more workers to work on all the other transforms after reading?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Allie
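
P.S. For reference, here is a minimal sketch of the per-file structure Lukasz describes above (one read plus an optional Reshuffle per compressed file, all in a single batch pipeline). The bucket paths and the DoWork step are made-up placeholders for illustration, not anything from the thread, and this is an untested sketch rather than a recommended implementation:

import apache_beam as beam

class DoWork(beam.DoFn):
    # Placeholder for the real per-record processing.
    def process(self, line):
        yield line

files = [  # hypothetical gzip inputs, one independent read per file
    'gs://my-bucket/input-a.csv.gz',
    'gs://my-bucket/input-b.csv.gz',
    'gs://my-bucket/input-c.csv.gz',
]

with beam.Pipeline() as p:
    for i, path in enumerate(files):
        (p
         | 'Read_%d' % i >> beam.io.ReadFromText(path)   # gzip inferred from the .gz extension
         | 'Reshuffle_%d' % i >> beam.Reshuffle()         # optional, per Lukasz's note above
         | 'Work_%d' % i >> beam.ParDo(DoWork()))

Because each file gets its own read transform, different workers can process different files concurrently even though each individual gzip read remains single-threaded, and any per-file Reshuffle only has to wait for that one file rather than for the whole input.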