I believe the "Total shuffle data processed" counter counts the number of bytes written to shuffle plus the number of bytes read back. So if you shuffle 1 GB of data, you should expect to see 2 GB on the counter.
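[That accounting can be sketched with a little arithmetic. This is a plain-Python illustration with hypothetical numbers, not Dataflow's actual billing code; the 2x write+read factor is the claim above, and the second full-data shuffle is the sharded-write behavior discussed later in the thread.]

```python
# Sketch of the shuffle accounting described above (hypothetical numbers).
# The counter is assumed to include both bytes written to shuffle and
# bytes read back, so one logical shuffle of N bytes reports ~2N.

GB = 1024 ** 3

def reported_shuffle_bytes(logical_bytes: int, num_shuffles: int) -> int:
    """Bytes the counter would show: each shuffle is written once, read once."""
    return logical_bytes * 2 * num_shuffles

# One explicit shuffle of 1 GB reports ~2 GB.
one_shuffle = reported_shuffle_bytes(1 * GB, num_shuffles=1)

# If a sharded TextIO write adds a second full-data shuffle, the same
# 1 GB reports ~4 GB -- the "quadrupling" observed in this thread.
two_shuffles = reported_shuffle_bytes(1 * GB, num_shuffles=2)

print(one_shuffle // GB, two_shuffles // GB)
```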
On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:

> Ok, I just ran the job on a small input and did not specify numShards, so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of shuffle:
> [image: image.png]
>
> Reported bytes shuffled:
> [image: image.png]
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:
>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>
>>> I will attempt to do it without sharding (though I believe we did do a
>>> run without shards and it incurred the extra shuffle costs).
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>> The pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after
>>> merging files together into a single PCollection (Flatten transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle, but were surprised to see
>>> that the output data from the Flatten was quadrupled in the shuffled GB
>>> reported by Dataflow, which led me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> In that case you should be able to leave sharding unspecified, and you
>>>> won't incur the extra shuffle. Specifying explicit sharding is
>>>> generally necessary only for streaming.
>>>>
>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> Batch on DataflowRunner.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Are you using streaming or batch? Also, which runner are you using?
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>> It is using the shards and getting all the values into a keyed
>>>>>>> group to write to a single file.
>>>>>>>
>>>>>>> However... I wonder if there is a way to just take the records that
>>>>>>> are on a worker and write them out, thus not needing a shard
>>>>>>> number. Closer to how Hadoop handles writes.
>>>>>>>
>>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup
>>>>>>> and reuses that writer in processElement to write all elements
>>>>>>> within a bundle to the same file?
>>>>>>>
>>>>>>> I feel like this goes beyond the scope of the user mailing list, so
>>>>>>> I'm expanding it to dev as well.
>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>
>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>> simply writing out a file is a necessity for large-scale jobs that
>>>>>>> work with 100+ TB of data. If anyone has any ideas, I'd love to
>>>>>>> hear them.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>>>>
>>>>>>>> We have been using Beam for a bit now. However, we just turned on
>>>>>>>> the Dataflow Shuffle service and were very surprised that the
>>>>>>>> shuffled data amounts were quadruple the amounts we expected.
>>>>>>>>
>>>>>>>> Turns out that the file-writing TextIO is doing shuffles within
>>>>>>>> itself.
>>>>>>>>
>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>