OK, I just ran the job on a small input and did not specify numShards, so it's
literally just:

.apply("WriteLines", TextIO.write().to(options.getOutput()));

Output of map for join:
[image: image.png]

Details of Shuffle:
[image: image.png]

Reported Bytes Shuffled:
[image: image.png]


On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <joseph.dun...@liveramp.com>
> wrote:
>
>> I will attempt to run without sharding (though I believe we did a run
>> without shards and it still incurred the extra shuffle costs).
>>
>
> It shouldn't. There will be a shuffle, but that shuffle should contain a
> small amount of data (essentially a list of filenames).
>
>>
>> Pipeline is simple.
>>
>> The only shuffle that is explicitly defined is the shuffle after merging the
>> files together into a single PCollection (the Flatten transform).
>>
>> So it's Read > Flatten > Shuffle > Map (Format) > Write. We expected to pay
>> for the middle shuffle, but were surprised to see that the output data from
>> the Flatten was quadrupled in the shuffled GB reported by Dataflow, which is
>> what led me down this path.
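>>
>> In code it's roughly the following (the names are placeholders, and the
>> Reshuffle just stands in for whatever grouping the job actually does in the
>> middle):
>>
>> PCollection<String> merged =
>>     PCollectionList.of(inputA).and(inputB).apply(Flatten.pCollections());
>>
>> merged
>>     .apply(Reshuffle.viaRandomKey())   // the one shuffle we expected to pay for
>>     .apply("Format", MapElements.into(TypeDescriptors.strings())
>>         .via((String line) -> formatLine(line)))
>>     .apply("WriteLines", TextIO.write().to(options.getOutput()));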
>>
>> [image: image.png]
>>
>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>
>>> In that case you should be able to leave sharding unspecified, and you
>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>> necessary only for streaming.
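>>>
>>> For example (sketch only; "lines" and the shard count are placeholders):
>>>
>>> // Batch: leave sharding to the runner; the records themselves aren't regrouped.
>>> lines.apply(TextIO.write().to(options.getOutput()));
>>>
>>> // Streaming (unbounded input): windowed writes with explicit shards are needed.
>>> lines.apply(TextIO.write()
>>>     .to(options.getOutput())
>>>     .withWindowedWrites()
>>>     .withNumShards(10));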
>>>
>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> Batch, on the DataflowRunner.
>>>>
>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>
>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> So I followed up on why TextIO shuffles and dug into the code a bit. It
>>>>>> keys every record by a shard number and groups the values so that each
>>>>>> keyed group is written to a single file.
>>>>>>
>>>>>> However... I wonder if there is a way to just take the records that are
>>>>>> on a worker and write them out directly, without needing a shard number,
>>>>>> closer to how Hadoop handles writes.
>>>>>>
>>>>>> Maybe just a regular ParDo that opens a writer at bundle start and has
>>>>>> processElement reuse that writer, so all elements within a bundle are
>>>>>> written to the same file?
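>>>>>>
>>>>>> Rough sketch of what I have in mind (output prefix, file naming, and the
>>>>>> usage line are just placeholders):
>>>>>>
>>>>>> import java.nio.ByteBuffer;
>>>>>> import java.nio.channels.WritableByteChannel;
>>>>>> import java.nio.charset.StandardCharsets;
>>>>>> import java.util.UUID;
>>>>>> import org.apache.beam.sdk.io.FileSystems;
>>>>>> import org.apache.beam.sdk.io.fs.ResourceId;
>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>> import org.apache.beam.sdk.util.MimeTypes;
>>>>>>
>>>>>> class WriteBundleToFileFn extends DoFn<String, Void> {
>>>>>>   private final String outputPrefix;            // e.g. "gs://bucket/path/part"
>>>>>>   private transient WritableByteChannel channel;
>>>>>>
>>>>>>   WriteBundleToFileFn(String outputPrefix) {
>>>>>>     this.outputPrefix = outputPrefix;
>>>>>>   }
>>>>>>
>>>>>>   @StartBundle
>>>>>>   public void startBundle() throws Exception {
>>>>>>     // One new file per bundle; a random UUID stands in for a shard number.
>>>>>>     ResourceId file = FileSystems.matchNewResource(
>>>>>>         outputPrefix + "-" + UUID.randomUUID() + ".txt", false /* isDirectory */);
>>>>>>     channel = FileSystems.create(file, MimeTypes.TEXT);
>>>>>>   }
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void processElement(ProcessContext c) throws Exception {
>>>>>>     // Reuse the same writer for every element in the bundle.
>>>>>>     channel.write(ByteBuffer.wrap(
>>>>>>         (c.element() + "\n").getBytes(StandardCharsets.UTF_8)));
>>>>>>   }
>>>>>>
>>>>>>   @FinishBundle
>>>>>>   public void finishBundle() throws Exception {
>>>>>>     channel.close();
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> // usage: lines.apply(ParDo.of(new WriteBundleToFileFn(options.getOutput())));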
>>>>>>
>>>>>> I feel like this goes beyond the scope of the user mailing list, so I'm
>>>>>> expanding it to dev as well.
>>>>>> +dev <dev@beam.apache.org>
>>>>>>
>>>>>> Finding a solution that avoids quadrupling shuffle costs when simply
>>>>>> writing out a file is a necessity for large-scale jobs that work with
>>>>>> 100+ TB of data. If anyone has any ideas, I'd love to hear them.
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> We have been using Beam for a while now. However, we just turned on the
>>>>>>> Dataflow Shuffle service and were very surprised that the shuffled data
>>>>>>> amounts were quadruple what we expected.
>>>>>>>
>>>>>>> It turns out that the file-writing TextIO is doing shuffles within
>>>>>>> itself.
>>>>>>>
>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>
