Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
us.


I am trying to speed up the Reshuffle step, since it waits for all the data.
Here are the two approaches I have tried:

1.  Add timestamps to the PCollection's elements after reading (since it is
a bounded source), then apply windowing before Reshuffle. It still waits
for all the data.


2.  Run the pipeline with the --streaming flag. This leads to an error:
Workflow failed. Causes: Expected custom source to have non-zero number of
splits. Also, the documentation at
https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
says:

*DataflowRunner does not currently support the following Cloud Dataflow
specific features with Python streaming execution.*

   - *Streaming autoscaling*

I doubt whether this approach can solve my issue.
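
The other option from earlier in the thread, splitting the input into many
smaller compressed files, could look roughly like the sketch below. It uses
only Python's standard gzip module and is not tested at our data sizes. The
function name `split_gzip`, the shard naming scheme, and `lines_per_shard` are
placeholders I made up, not anything from Beam or Dataflow. The input is
streamed line by line, so the uncompressed data is never stored in full.

```python
import gzip
import os


def split_gzip(src_path, out_dir, lines_per_shard=1_000_000):
    """Stream one gzip file into several smaller gzip shards.

    Hypothetical helper: returns the list of shard paths it wrote.
    Only one line is held in memory at a time, and nothing is written
    to disk uncompressed.
    """
    os.makedirs(out_dir, exist_ok=True)
    base = os.path.basename(src_path)
    if base.endswith(".gz"):
        base = base[:-3]

    shard_paths = []
    out = None
    try:
        with gzip.open(src_path, "rb") as src:
            for i, line in enumerate(src):
                # Start a new shard every `lines_per_shard` lines.
                if i % lines_per_shard == 0:
                    if out is not None:
                        out.close()
                    path = os.path.join(
                        out_dir, f"{base}-{len(shard_paths):05d}.gz"
                    )
                    out = gzip.open(path, "wb")
                    shard_paths.append(path)
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return shard_paths
```

The shards could then be read with a file pattern, so that each file can be
read by a different worker. The splitting itself is still single-threaded per
input file, so this only helps if it can be done once, ahead of the pipeline.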


Thanks so much!

Allie

*From: *Lukasz Cwik <[email protected]>
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> If not, you could structure your pipeline
> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> and then run it as a batch pipeline.
>
> You can set --streaming=true on the pipeline and then it will run in
> streaming mode, but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow, so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed, as it may
> be less expensive than paying the additional compute cost for streaming.
>
> *From: *Allie Chen <[email protected]>
> *Date: *Tue, May 14, 2019 at 7:38 AM
> *To: * <[email protected]>
> *Cc: *user
>
> Is it possible to use windowing or somehow pretend it is streaming so
>> Reshuffle or GroupByKey won't wait until all data has been read?
>>
>> Thanks!
>> Allie
>>
>> *From: *Lukasz Cwik <[email protected]>
>> *Date: *Fri, May 10, 2019 at 5:36 PM
>> *To: *dev
>> *Cc: *user
>>
>> There is no such flag to turn off fusion.
>>>
>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>> when it is limited to a small number of workers.
>>>
>>> If you can split up your input into a lot of smaller files that are
>>> compressed then you shouldn't need to use the reshuffle but still could if
>>> you found it helped.
>>>
>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <[email protected]>
>>> wrote:
>>>
>>>> Re Lukasz: Thanks! I am not able to control the compression format, but
>>>> I will see whether splitting the gzip files will work. Is there a simple
>>>> flag in Dataflow that could turn off fusion?
>>>>
>>>> Re Reuven: No, I checked the run time in the Dataflow UI; the GroupByKey
>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>> itself is not parallel either.
>>>>
>>>> Thanks all,
>>>>
>>>> Allie
>>>>
>>>> *From: *Reuven Lax <[email protected]>
>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>> *To: *dev
>>>> *Cc: *user
>>>>
>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>>> simply reading and decompressing all that data was very slow when there
>>>>> was no parallelism.
>>>>>
>>>>> *From: *Allie Chen <[email protected]>
>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>> *To: * <[email protected]>
>>>>> *Cc: * <[email protected]>
>>>>>
>>>>> Yes, I do see that the data after the reshuffle is processed in parallel.
>>>>>> But the Reshuffle transform itself takes hours or even days to run,
>>>>>> according to one test I did (24 gzip files, 17 million lines in total).
>>>>>>
>>>>>> Our users' files are mostly in gzip format, since uncompressed files
>>>>>> would be too costly to store (they could be hundreds of GB).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>
>>>>>> *From: *Lukasz Cwik <[email protected]>
>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>> *To: *dev, <[email protected]>
>>>>>>
>>>>>> [email protected] <[email protected]>
>>>>>>>
>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>>>> all the data has been read before the next transforms can run. After the
>>>>>>> reshuffle, the data should have been processed in parallel across the
>>>>>>> workers. Did you see this?
>>>>>>>
>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>> uncompressed file or many compressed files?
>>>>>>>
>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since
>>>>>>>> the compressed file is not splittable, one worker is allocated to read
>>>>>>>> the file. The same worker will do all the other transforms, since
>>>>>>>> Dataflow fused all transforms together. There is a large amount of data
>>>>>>>> in the file, and I expect to see more workers spinning up after the
>>>>>>>> read transform. I tried to use the Reshuffle transform
>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed
>>>>>>>> until all data has arrived at this point.
>>>>>>>>
>>>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>>>> other transforms after reading?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Allie
>>>>>>>>
>>>>>>>>
