Re: Problem with gzip

2019-05-15 Thread Allie Chen
Thanks Robert. Yes, reading is the bottleneck, and we cannot do much better for gzip files, which is why we would like to at least parallelize the other transforms with the read. I tried the side input you suggested earlier to break the fusion, and it does a much better job than using Reshuffle! One

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
On Wed, May 15, 2019 at 8:43 PM Allie Chen wrote: > Thanks all for your reply. I will try each of them and see how it goes. > > The experiment I am working now is similar to > https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform, > which tries to get early results

Re: Problem with gzip

2019-05-15 Thread Niels Basjes
Hi This project is a completely different solution towards this problem, but in the hadoop mapreduce context. https://github.com/nielsbasjes/splittablegzip I have used this a lot in the past. Perhaps porting this project to beam is an option? Niels Basjes On Tue, May 14, 2019, 20:45 Lukasz

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Interesting thread. Thanks for digging that up. I would try the shuffle_mode=service experiment (forgot that wasn't yet the default). If that doesn't do the trick: though Avro as a materialization format does not provide perfect parallelism, it should be significantly better than what you have now

Re: Problem with gzip

2019-05-15 Thread Michael Luckey
@Robert Does your suggestion imply, that the points made by Eugene on BEAM-2803 do not apply (anymore) and the combined reshuffle could just be omitted? On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw wrote: > Unfortunately the "write" portion of the reshuffle cannot be > parallelized more than

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Unfortunately the "write" portion of the reshuffle cannot be parallelized more than the source that it's reading from. In my experience, generally the read is the bottleneck in this case, but it's possible (e.g. if the input compresses extremely well) that it is the write that is slow (which you se

Re: Problem with gzip

2019-05-14 Thread Lukasz Cwik
Sorry I couldn't be more helpful. *From: *Allie Chen *Date: *Tue, May 14, 2019 at 10:09 AM *To: * *Cc: *user Thanks Lukasz. Unfortunately, decompressing the files is not an option for > us. > > > I am trying to speed up the Reshuffle step, since it waits for all data. Here > are two ways I have trie

Re: Problem with gzip

2019-05-14 Thread Allie Chen
Thanks Lukasz. Unfortunately, decompressing the files is not an option for us. I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried: 1. add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing bef

Re: Problem with gzip

2019-05-14 Thread Lukasz Cwik
Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)? If not, you could structure your pipeline
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipel

Re: Problem with gzip

2019-05-14 Thread Allie Chen
Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read? Thanks! Allie *From: *Lukasz Cwik *Date: *Fri, May 10, 2019 at 5:36 PM *To: *dev *Cc: *user There is no such flag to turn off fusion. > > Writing 100s of GiBs of

Re: Problem with gzip

2019-05-10 Thread Michael Luckey
Maybe the solution implemented on JdbcIO [1], [2] could be helpful in this cases. [1] https://issues.apache.org/jira/browse/BEAM-2803 [2] https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118 On Fri, May 10, 2019 at 11:36 PM

Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
There is no such flag to turn off fusion. Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers. If you can split up your input into a lot of smaller files that are compressed, then you shouldn't need to use the reshuffle but still cou
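Pre-sharding the input into many small gzip files, as suggested above, could be done offline with a small stdlib-only helper like this (the shard naming and round-robin split are illustrative assumptions, not from the thread):

```python
import gzip

def shard_to_gzip(lines, num_shards, prefix='shard'):
    """Write `lines` round-robin into num_shards gzip files so each file can
    be read by a separate worker; returns the shard paths."""
    paths = ['%s-%05d-of-%05d.gz' % (prefix, i, num_shards)
             for i in range(num_shards)]
    writers = [gzip.open(p, 'wt') for p in paths]
    try:
        for i, line in enumerate(lines):
            writers[i % num_shards].write(line + '\n')
    finally:
        for w in writers:
            w.close()
    return paths
```

With one worker per shard, the read parallelism is bounded by the number of shards rather than by the single unsplittable gzip stream.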

Re: Problem with gzip

2019-05-10 Thread Allie Chen
Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion? Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey and FlatMap in Reshuffle are very sl

Re: Problem with gzip

2019-05-10 Thread Reuven Lax
It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism. *From: *Allie Chen *Date: *Fri, May 10, 2019 at 1:17 PM *To: * *Cc: * Yes, I do see the data after reshuffle are processed in paral

Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
The best solution would be to find a compression format that is splittable and add support for that to Apache Beam and use it. The issue with compressed files is that you can't read from an arbitrary offset. This stack overflow post[1] has some suggestions on seekable compression libraries. A much
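The "can't read from an arbitrary offset" point above is the crux of the problem, and is easy to demonstrate with nothing but the Python standard library: a DEFLATE-based gzip stream must be decompressed from the beginning, so there is no way to seek to byte N and hand that range to another worker.

```python
import gzip
import zlib

# Build a deterministic gzip blob.
data = b'\n'.join(b'line %d' % i for i in range(1000))
blob = gzip.compress(data)

# Decompressing from the start works fine.
assert gzip.decompress(blob) == data

# Decompressing from an arbitrary mid-stream offset does not: the bytes at
# offset 100 are not a valid gzip stream, so no worker can start there.
try:
    gzip.decompress(blob[100:])
    mid_stream_ok = True
except (OSError, EOFError, zlib.error):
    mid_stream_ok = False
assert not mid_stream_ok
```

This is exactly why a single large gzip file pins the read to one worker, and why splittable or seekable compression formats (or pre-sharded files) restore parallelism.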

Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, that is correct. *From: *Allie Chen *Date: *Fri, May 10, 2019 at 4:21 PM *To: * *Cc: * Yes. > > *From: *Lukasz Cwik > *Date: *Fri, May 10, 2019 at 4:19 PM > *To: *dev > *Cc: * > > When you had X gzip files and were not using Reshuffle, did you see X >> workers read and process the files

Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes. *From: *Lukasz Cwik *Date: *Fri, May 10, 2019 at 4:19 PM *To: *dev *Cc: * When you had X gzip files and were not using Reshuffle, did you see X > workers read and process the files? > > On Fri, May 10, 2019 at 1:17 PM Allie Chen wrote: > >> Yes, I do see the data after reshuffle are proce

Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, I do see the data after reshuffle are processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did. The file format for our users is mostly gzip, since uncompressed files would be too cost

Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
When you had X gzip files and were not using Reshuffle, did you see X workers read and process the files? On Fri, May 10, 2019 at 1:17 PM Allie Chen wrote: > Yes, I do see the data after reshuffle are processed in parallel. But > Reshuffle transform itself takes hours or even days to run, accord

Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
+u...@beam.apache.org Reshuffle on Google Cloud Dataflow for a bounded pipeline waits until all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this? Are you able to change the input of