Thanks Robert. Yes, reading is the bottleneck, and we cannot do much better
for gzip files, which is why we would like to at least parallelize the other
transforms with the reading.
I tried the side input to break the fusion you suggested earlier, and it
does a much better job than using Reshuffle!
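A minimal sketch (Python SDK) of the side-input trick referred to here, under the assumption that it follows the usual pattern: materializing the read output and re-emitting it from a single-element PCollection forces the runner to checkpoint the data, which breaks fusion between the non-parallel gzip read and the downstream steps. The bucket path and the Process step are placeholders, not from this thread.

import apache_beam as beam

with beam.Pipeline() as p:
    # Gzip is detected from the .gz extension; each file is read by one worker.
    lines = p | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input-*.gz')  # placeholder path

    _ = (
        p
        | 'Seed' >> beam.Create([None])
        # Consuming `lines` as a side input materializes it, so the steps
        # below are no longer fused with the unsplittable read.
        | 'Fanout' >> beam.FlatMap(
            lambda _seed, side: side, beam.pvalue.AsIter(lines))
        | 'Process' >> beam.Map(lambda line: line.upper())  # placeholder work
    )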
On Wed, May 15, 2019 at 8:43 PM Allie Chen wrote:
> Thanks all for your replies. I will try each of them and see how it goes.
>
> The experiment I am working on now is similar to
> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
> which tries to get early results from a GroupByKey transform.
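The Stack Overflow question quoted above is about getting early output from a GroupByKey by using triggers. As a hedged illustration only (the toy input, window, and trigger values are invented, and whether early panes actually fire for a bounded input depends on the runner), the idea looks roughly like this in the Python SDK:

import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline() as p:
    _ = (
        p
        | 'Create' >> beam.Create([('key', i) for i in range(100)])  # toy input
        # Emit a pane every 10 elements instead of waiting for all input.
        | 'Window' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Group' >> beam.GroupByKey()
        | 'Print' >> beam.Map(print)
    )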
Hi
This project is a completely different solution to this problem, but in the
Hadoop MapReduce context.
https://github.com/nielsbasjes/splittablegzip
I have used this a lot in the past.
Perhaps porting this project to beam is an option?
Niels Basjes
On Tue, May 14, 2019, 20:45 Lukasz Cwik wrote:
Interesting thread. Thanks for digging that up.
I would try the shuffle_mode=service experiment (I forgot that wasn't yet
the default). If that doesn't do the trick, then although Avro as a
materialization format does not provide perfect parallelism, it should
still be significantly better than what you have now.
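For anyone wanting to try the experiment Robert mentions, a minimal sketch of passing it through pipeline options in the Python SDK; the project, region, and bucket below are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                # placeholder
    '--region=us-central1',                # placeholder
    '--temp_location=gs://my-bucket/tmp',  # placeholder
    '--experiments=shuffle_mode=service',  # the experiment mentioned above
])

The same flag can also be passed on the command line as --experiments=shuffle_mode=service.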
@Robert
Does your suggestion imply that the points made by Eugene on BEAM-2803 do
not apply (anymore) and the combined reshuffle could just be omitted?
On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw wrote:
Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow.
Sorry I couldn't be more helpful.
*From: *Allie Chen
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: *
*Cc: *user
Thanks Lukasz. Unfortunately, decompressing the files is not an option for
us.
I am trying to speed up the Reshuffle step, since it waits for all data. Here
are two ways I have tried:
1. add timestamps to the PCollection's elements after reading (since it is a
bounded source), then apply windowing before …
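A rough illustration of approach 1 above (not the actual code from this thread; the file pattern, window size, and downstream step are invented): attach timestamps to the bounded read and window it before the Reshuffle.

import time

import apache_beam as beam
from apache_beam.transforms import window

def add_timestamp(element):
    # Give each element a processing-time timestamp so windowing has
    # something to act on for this bounded source.
    return window.TimestampedValue(element, time.time())

with beam.Pipeline() as p:
    _ = (
        p
        | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input-*.gz')  # placeholder
        | 'Timestamp' >> beam.Map(add_timestamp)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 60-second windows
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Process' >> beam.Map(lambda line: line.upper())      # placeholder work
    )

As the rest of the thread notes, this by itself does not remove the wait in batch mode, since Reshuffle on a bounded pipeline still only emits after the whole unsplittable file has been read.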
Do you need to perform any joins across the files (e.g.
Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
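A hedged sketch of that per-file fan-out in the Python SDK; the file paths and the process_line step are placeholders, and the point is simply one read (plus optional Reshuffle) and one copy of the downstream logic per file:

import apache_beam as beam

def process_line(line):
    return line.upper()  # placeholder for the real downstream pipeline

files = [
    'gs://my-bucket/a.gz',  # placeholder paths
    'gs://my-bucket/b.gz',
    'gs://my-bucket/c.gz',
]

with beam.Pipeline() as p:
    for i, path in enumerate(files):
        _ = (
            p
            | f'Read{i}' >> beam.io.ReadFromText(path)
            | f'Reshuffle{i}' >> beam.Reshuffle()   # optional
            | f'Process{i}' >> beam.Map(process_line)
        )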
Is it possible to use windowing or somehow pretend it is streaming so
Reshuffle or GroupByKey won't wait until all data has been read?
Thanks!
Allie
*From: *Lukasz Cwik
*Date: *Fri, May 10, 2019 at 5:36 PM
*To: *dev
*Cc: *user
There is no such flag to turn off fusion.
Maybe the solution implemented in JdbcIO [1], [2] could be helpful in this
case.
[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118
On Fri, May 10, 2019 at 11:36 PM
There is no such flag to turn off fusion.
Writing 100s of GiBs of uncompressed data to reshuffle will take time when
it is limited to a small number of workers.
If you can split up your input into a lot of smaller files that are
compressed, then you shouldn't need to use the reshuffle but still could …
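To make the "lots of smaller compressed files" suggestion concrete, a minimal sketch with placeholder paths and a placeholder processing step: with a glob matching many small .gz shards, each file is still read by a single worker, but the read as a whole is parallelized across files, so no fusion-breaking Reshuffle is needed.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        # Many small gzip shards instead of a handful of huge ones: the
        # read's parallelism is then bounded by the number of files.
        | 'ReadShards' >> beam.io.ReadFromText('gs://my-bucket/shards/part-*.gz')
        | 'Process' >> beam.Map(lambda line: line.upper())  # placeholder work
    )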
Re Lukasz: Thanks! I am not able to control the compression format, but I
will see whether splitting the gzip files will work. Is there a simple flag
in Dataflow that could turn off the fusion?
Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey and
FlatMap in Reshuffle are very slow.
It's unlikely that Reshuffle itself takes hours. It's more likely that
simply reading and decompressing all that data was very slow when there was
no parallelism.
*From: *Allie Chen
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: *
*Cc: *
Yes, I do see the data after reshuffle are processed in parallel.
The best solution would be to find a compression format that is splittable
and add support for that to Apache Beam and use it. The issue with
compressed files is that you can't read from an arbitrary offset. This
stack overflow post[1] has some suggestions on seekable compression
libraries.
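To illustrate the "no arbitrary offset" point with plain Python (the file name is made up): seeking within a gzip stream only works by decompressing everything before the target offset, which is exactly why a runner cannot hand different byte ranges of one .gz file to different workers.

import gzip

# Seeking to an uncompressed offset is allowed, but under the hood GzipFile
# simply decompresses and discards everything up to that point.
with gzip.open('example.gz', 'rb') as f:   # hypothetical file
    f.seek(10_000_000)    # decompresses the first ~10 MB just to get here
    chunk = f.read(100)   # the 100 bytes starting at that offset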
Yes, that is correct.
*From: *Allie Chen
*Date: *Fri, May 10, 2019 at 4:21 PM
*To: *
*Cc: *
Yes.
Yes.
*From: *Lukasz Cwik
*Date: *Fri, May 10, 2019 at 4:19 PM
*To: *dev
*Cc: *
When you had X gzip files and were not using Reshuffle, did you see X
> workers read and process the files?
Yes, I do see the data after reshuffle are processed in parallel. But the
Reshuffle transform itself takes hours or even days to run, according to
one test (24 gzip files, 17 million lines in total) I did.
The file format for our users is mostly gzip, since uncompressed files
would be too costly.
When you had X gzip files and were not using Reshuffle, did you see X
workers read and process the files?
+u...@beam.apache.org
Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
the data has been read before the next transforms can run. After the
reshuffle, the data should have been processed in parallel across the
workers. Did you see this?
Are you able to change the input of …
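For context, the pipeline shape this thread is about, reduced to a minimal Python-SDK sketch (the file pattern and the expensive step are placeholders): an unsplittable gzip read, a Reshuffle to break fusion, and the costly downstream work that should run on many workers.

import apache_beam as beam

def expensive_transform(line):
    return line.upper()  # stand-in for the real, costly per-element work

with beam.Pipeline() as p:
    _ = (
        p
        # Each .gz file is read by a single worker because gzip is not
        # splittable, so with a few large files the read barely parallelizes.
        | 'ReadGzip' >> beam.io.ReadFromText('gs://my-bucket/input-*.gz')
        # Reshuffle lets the step below use many workers, but in a bounded
        # pipeline it only emits once all input has been read.
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Process' >> beam.Map(expensive_transform)
    )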