Maybe the solution implemented in JdbcIO [1], [2] could be helpful in these cases.
[1] https://issues.apache.org/jira/browse/BEAM-2803
[2] https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik <lc...@google.com> wrote:

> There is no such flag to turn off fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed, then you shouldn't need to use the reshuffle, but you could
> still use it if you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format, but I
>> will see whether splitting the gzip files will work. Is there a simple
>> flag in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey
>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>> itself is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there
>>> was no parallelism.
>>>
>>> *From: *Allie Chen <yifangc...@google.com>
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * <dev@beam.apache.org>
>>> *Cc: * <u...@beam.apache.org>
>>>
>>>> Yes, I do see that the data after Reshuffle are processed in parallel.
>>>> But the Reshuffle transform itself takes hours or even days to run,
>>>> according to one test (24 gzip files, 17 million lines in total) I did.
>>>>
>>>> The file format for our users is mostly gzip, since uncompressed files
>>>> would be too costly to store (they could be hundreds of GB).
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>> *To: *dev, <u...@beam.apache.org>
>>>>
>>>>> +u...@beam.apache.org <u...@beam.apache.org>
>>>>>
>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits until
>>>>> all the data has been read before the next transforms can run. After
>>>>> the reshuffle, the data should have been processed in parallel across
>>>>> the workers. Did you see this?
>>>>>
>>>>> Are you able to change the input of your pipeline to use an
>>>>> uncompressed file or many compressed files?
>>>>>
>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since
>>>>>> the compressed file is not splittable, only one worker is allocated
>>>>>> to read the file. The same worker will also run all the other
>>>>>> transforms, since Dataflow fused all the transforms together. There
>>>>>> is a large amount of data in the file, and I expected to see more
>>>>>> workers spinning up after the read transform. I tried to use the
>>>>>> Reshuffle transform
>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>> to prevent the fusion, but it does not scale, since it won't proceed
>>>>>> until all the data has arrived at that point.
>>>>>>
>>>>>> Is there any other way to allow more workers to work on all the
>>>>>> other transforms after reading?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
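
For readers following along: the fusion-breaking pattern discussed above (Beam's Reshuffle, and the JdbcIO reparallelization in [2]) boils down to pairing each element with a pseudo-random key, grouping by key, and then discarding the keys; the group-by is the materialization barrier that lets the runner redistribute work, and also why it cannot proceed until all upstream data has arrived. This is a plain-Python sketch of that idea for illustration only, not Beam's actual implementation (the function name and `num_keys` parameter are made up here):

```python
import random
from collections import defaultdict

def reshuffle(elements, num_keys=4, seed=0):
    """Sketch of a Reshuffle-style fusion break: key randomly, group, unkey.

    In a real runner, each key group can then be processed by a different
    worker downstream of the barrier.
    """
    rng = random.Random(seed)
    # Step 1: assign a pseudo-random key to every element.
    keyed = [(rng.randrange(num_keys), e) for e in elements]
    # Step 2: group by key -- the shuffle barrier. This is the point that
    # forces the runner to wait for *all* upstream data, which is why the
    # thread observes Reshuffle stalling on a huge unsplittable input.
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # Step 3: discard the keys; downstream sees the same elements,
    # redistributed across (up to) num_keys bundles.
    return [e for group in groups.values() for e in group]

lines = [f"line-{i}" for i in range(100)]
shuffled = reshuffle(lines)
```

The element set is preserved, only the grouping (and hence the worker assignment) changes.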
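
Lukasz's suggestion of splitting the input into many smaller compressed files can be done outside the pipeline before reading. A rough stdlib-only sketch of such a pre-processing step might look like the following (the helper name, shard naming scheme, and `lines_per_shard` default are all hypothetical, and a production version would need to stream to cloud storage rather than local disk):

```python
import gzip
import itertools
import os

def split_gzip(src_path, out_dir, lines_per_shard=100_000):
    """Re-shard one large gzip file into many smaller gzip files.

    Each shard is independently compressed, so a file-based source can
    assign different shards to different workers instead of being stuck
    with a single unsplittable gzip stream.
    """
    os.makedirs(out_dir, exist_ok=True)
    shard_paths = []
    with gzip.open(src_path, "rt") as src:
        for shard_no in itertools.count():
            # Take the next chunk of lines; stop when the source is drained.
            chunk = list(itertools.islice(src, lines_per_shard))
            if not chunk:
                break
            shard_path = os.path.join(out_dir, f"shard-{shard_no:05d}.gz")
            with gzip.open(shard_path, "wt") as dst:
                dst.writelines(chunk)
            shard_paths.append(shard_path)
    return shard_paths
```

The resulting shards can then be matched with a glob pattern by the read transform, so that the Reshuffle workaround may no longer be needed.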