Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow (which you seem to indicate based on your
observation of the UI, right?).

It could be that materializing to temporary files is cheaper than
materializing randomly to shuffle (especially on pre-portable Python).
In that case you could force a fusion break with a side input instead.
E.g.

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will force
a fusion break.
        return pcoll | beam.Map(lambda x, unused: x,
beam.pvalue.AsIterable(empty))

which could be used in place of Reshard like

    p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>
> Hi
>
> This project is a completely different solution towards this problem, but in 
> the hadoop mapreduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
>
> I have used this a lot in the past.
> Perhaps porting this project to beam is an option?
>
> Niels Basjes
>
>
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen <yifangc...@google.com>
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To: <d...@beam.apache.org>
>> Cc: user
>>
>>> Thank Lukasz. Unfortunately, decompressing the files is not an option for 
>>> us.
>>>
>>>
>>> I am trying to speed up Reshuffle step, since it waits for all data. Here 
>>> are two ways I have tried:
>>>
>>> 1.  add timestamps to the PCollection's elements after reading (since it is 
>>> bounded source), then apply windowing before Reshuffle, but it still waits 
>>> all data.
>>>
>>>
>>> 2.  run the pipeline with --streaming flag, but it leads to an error: 
>>> Workflow failed. Causes: Expected custom source to have non-zero number of 
>>> splits. Also, I found in 
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow 
>>> specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>>
>>> From: Lukasz Cwik <lc...@google.com>
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
>>>> Do you need to perform any joins across the files (e.g. 
>>>> Combine.perKey/GroupByKey/...)?
>>>> If not, you could structure your pipeline
>>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>>> and then run it as a batch pipeline.
>>>>
>>>> You can set --streaming=true on the pipeline and then it will run in a 
>>>> streaming mode but streaming prioritizes low latency and correctness on 
>>>> Google Cloud Dataflow so it will cost more to run your pipeline then in 
>>>> batch mode. It may make more sense to store the data uncompressed as it 
>>>> may be less expensive then paying the additional compute cost for 
>>>> streaming.
>>>>
>>>> From: Allie Chen <yifangc...@google.com>
>>>> Date: Tue, May 14, 2019 at 7:38 AM
>>>> To: <d...@beam.apache.org>
>>>> Cc: user
>>>>
>>>>> Is it possible to use windowing or somehow pretend it is streaming so 
>>>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>>>
>>>>> Thanks!
>>>>> Allie
>>>>>
>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>>>> To: dev
>>>>> Cc: user
>>>>>
>>>>>> There is no such flag to turn of fusion.
>>>>>>
>>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time 
>>>>>> when it is limited to a small number of workers.
>>>>>>
>>>>>> If you can split up your input into a lot of smaller files that are 
>>>>>> compressed then you shouldn't need to use the reshuffle but still could 
>>>>>> if you found it helped.
>>>>>>
>>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:
>>>>>>>
>>>>>>> Re Lukasz: Thanks! I am not able to control the compression format but 
>>>>>>> I will see whether the splitting gzip files will work. Is there a 
>>>>>>> simple flag in Dataflow that could turn off the fusion?
>>>>>>>
>>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey 
>>>>>>> and FlatMap in Reshuffle are very slow when the data is large. 
>>>>>>> Reshuffle itself is not parallel either.
>>>>>>>
>>>>>>> Thanks all,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>> From: Reuven Lax <re...@google.com>
>>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>>>>>> To: dev
>>>>>>> Cc: user
>>>>>>>
>>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that 
>>>>>>>> simply reading and decompressing all that data was very slow when 
>>>>>>>> there was no parallelism.
>>>>>>>>
>>>>>>>> From: Allie Chen <yifangc...@google.com>
>>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>>>>>>> To: <d...@beam.apache.org>
>>>>>>>> Cc: <user@beam.apache.org>
>>>>>>>>
>>>>>>>>> Yes, I do see the data after reshuffle are processed in parallel. But 
>>>>>>>>> Reshuffle transform itself takes hours or even days to run, according 
>>>>>>>>> to one test (24 gzip files, 17 million lines in total) I did.
>>>>>>>>>
>>>>>>>>> The file format for our users are mostly gzip format, since 
>>>>>>>>> uncompressed files would be too costly to store (It could be in 
>>>>>>>>> hundreds of GB).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Allie
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>>>>>>>> To: dev, <user@beam.apache.org>
>>>>>>>>>
>>>>>>>>>> +user@beam.apache.org
>>>>>>>>>>
>>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till 
>>>>>>>>>> all the data has been read before the next transforms can run. After 
>>>>>>>>>> the reshuffle, the data should have been processed in parallel 
>>>>>>>>>> across the workers. Did you see this?
>>>>>>>>>>
>>>>>>>>>> Are you able to change the input of your pipeline to use an 
>>>>>>>>>> uncompressed file or many compressed files?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com> 
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since 
>>>>>>>>>>> the compressed file is not splittable, one worker is allocated to 
>>>>>>>>>>> read the file. The same worker will do all the other transforms 
>>>>>>>>>>> since Dataflow fused all transforms together.  There are a large 
>>>>>>>>>>> amount of data in the file, and I expect to see more workers 
>>>>>>>>>>> spinning up after reading transforms. I tried to use Reshuffle 
>>>>>>>>>>> Transform to prevent the fusion, but it is not scalable since it 
>>>>>>>>>>> won’t proceed until all data arrived at this point.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Is there any other ways to allow more workers working on all the 
>>>>>>>>>>> other transforms after reading?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Allie
>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to