OK, I just ran the job on a small input and did not specify numShards, so it's literally just:
.apply("WriteLines", TextIO.write().to(options.getOutput()));

Output of map for join: [image: image.png]
Details of Shuffle: [image: image.png]
Reported Bytes Shuffled: [image: image.png]

On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:

> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>
>> I will attempt to do without sharding (though I believe we did do a run
>> without shards and it incurred the extra shuffle costs).
>
> It shouldn't. There will be a shuffle, but that shuffle should contain a
> small amount of data (essentially a list of filenames).
>
>> Pipeline is simple.
>>
>> The only shuffle that is explicitly defined is the shuffle after merging
>> files together into a single PCollection (Flatten transform).
>>
>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
>> pay for shuffles on the middle shuffle, but were surprised to see that the
>> output data from the Flatten was quadrupled in the reported shuffled GB
>> shown in Dataflow, which led me down this path of finding things.
>>
>> [image: image.png]
>>
>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>
>>> In that case you should be able to leave sharding unspecified, and you
>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>> necessary only for streaming.
>>>
>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>
>>>> Batch on DataflowRunner.
>>>>
>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Are you using streaming or batch? Also, which runner are you using?
>>>>>
>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>> It is using the shards and getting all the values into a keyed group
>>>>>> to write to a single file.
>>>>>>
>>>>>> However, I wonder if there is a way to just take the records that are
>>>>>> on a worker and write them out, without needing a shard number to do
>>>>>> it. Closer to how Hadoop handles writes.
>>>>>>
>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>>>>>> processElement reuses that writer to write all elements within a
>>>>>> bundle to the same file?
>>>>>>
>>>>>> I feel like this goes beyond the scope of the user mailing list, so
>>>>>> I'm expanding it to dev as well.
>>>>>> +dev <dev@beam.apache.org>
>>>>>>
>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>> simply writing out a file is a necessity for large-scale jobs that
>>>>>> work with 100+ TB of data. If anyone has any ideas, I'd love to hear
>>>>>> them.
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> We have been using Beam for a bit now. However, we just turned on
>>>>>>> the Dataflow shuffle service and were very surprised that the
>>>>>>> shuffled data amounts were quadruple the amounts we expected.
>>>>>>>
>>>>>>> It turns out that the file-writing TextIO is doing shuffles within
>>>>>>> itself.
>>>>>>>
>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
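To make the "one file per bundle" idea from the thread concrete, here is a minimal plain-Java sketch of the lifecycle being proposed: open a writer when a bundle starts, reuse it for every element in the bundle, and close it when the bundle finishes, naming each file randomly so workers never collide and no shard numbers (and hence no shuffle to assign them) are needed. This is not the Beam API; in Beam this logic would live in a DoFn's @StartBundle/@ProcessElement/@FinishBundle methods. The class and file-naming scheme below are hypothetical, chosen only to illustrate the pattern.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: models a DoFn-style bundle lifecycle with plain
// Java I/O, so the per-bundle writing pattern is visible end to end.
public class BundleWriter {
    private final Path outputDir;
    private Writer writer;     // reused for every element in the bundle
    private Path currentFile;

    public BundleWriter(Path outputDir) {
        this.outputDir = outputDir;
    }

    // Analogous to @StartBundle: one output file per bundle, named with a
    // random UUID so concurrent workers never need coordinated shard numbers.
    public void startBundle() throws IOException {
        currentFile = outputDir.resolve("part-" + UUID.randomUUID() + ".txt");
        writer = Files.newBufferedWriter(currentFile);
    }

    // Analogous to @ProcessElement: append to the already-open writer.
    public void processElement(String line) throws IOException {
        writer.write(line);
        writer.write('\n');
    }

    // Analogous to @FinishBundle: flush and close the bundle's file.
    public Path finishBundle() throws IOException {
        writer.close();
        return currentFile;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bundle-demo");
        BundleWriter bw = new BundleWriter(dir);
        bw.startBundle();
        for (String s : List.of("a", "b", "c")) {
            bw.processElement(s);
        }
        Path file = bw.finishBundle();
        System.out.println(Files.readAllLines(file)); // prints [a, b, c]
    }
}
```

The trade-off this sketch makes explicit is the same one the thread raises: like Hadoop-style writes, each worker emits its own part file with no cross-worker shuffle, but the number and size of output files then depend entirely on how the runner happens to split bundles.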