I believe that the "Total shuffle data processed" counter counts the number of
bytes written to shuffle plus the number of bytes read back. So if you shuffle
1GB of data, you should expect to see 2GB on the counter.

On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan <joseph.dun...@liveramp.com>
wrote:

> Ok just ran the job on a small input and did not specify numShards. so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I will attempt a run without sharding (though I believe we already did a
>>> run without shards and it still incurred the extra shuffle cost).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for the middle shuffle, but were surprised to see that the output
>>> data from the Flatten was quadrupled in the shuffled GB reported by
>>> Dataflow, which led me down this path of investigating.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> In that case you should be able to leave sharding unspecified, and you
>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>> necessary only for streaming.
>>>>
>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> batch on dataflowRunner.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>>>>>> uses the shard number as a key and groups all the values by shard so
>>>>>>> that each group can be written to a single file.
>>>>>>>
>>>>>>> However... I wonder if there is a way to just take the records that
>>>>>>> are on a worker and write them out directly, without needing a shard
>>>>>>> number, closer to how Hadoop handles writes.
>>>>>>>
>>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup, with
>>>>>>> processElement reusing that writer to write all elements within a
>>>>>>> bundle to the same file?
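>>>>>>> Roughly what I have in mind (all names here are a sketch, not an
>>>>>>> existing Beam transform):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn: one output file per bundle, no shard key, no shuffle.
class WriteBundleFn extends DoFn<String, Void> {
  private transient BufferedWriter writer;

  @StartBundle
  public void startBundle() throws IOException {
    // One file per bundle; the random name is an assumption for the sketch.
    String path = "/tmp/out-" + UUID.randomUUID() + ".txt";
    writer = new BufferedWriter(new FileWriter(path));
  }

  @ProcessElement
  public void processElement(@Element String line) throws IOException {
    writer.write(line);
    writer.newLine();
  }

  @FinishBundle
  public void finishBundle() throws IOException {
    // Flush and close before the bundle is committed.
    writer.close();
  }
}
```

>>>>>>> One caveat with this approach: if a bundle is retried, the failed
>>>>>>> attempt's file could be left behind, so some rename-on-commit step
>>>>>>> would be needed for exactly-once output.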
>>>>>>>
>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>> I'm expanding it to dev as well.
>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>
>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>
>>>>>>>> We have been using Beam for a while now. However, we just turned on
>>>>>>>> the Dataflow shuffle service and were very surprised that the shuffled
>>>>>>>> data amounts were quadruple what we expected.
>>>>>>>>
>>>>>>>> It turns out that TextIO does shuffles internally when writing files.
>>>>>>>>
>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>