Yes, we do have a use case for specifying the number of shards, but
unfortunately I can't share it with the group.

Shannon

On Fri, Sep 27, 2019 at 2:14 PM Reuven Lax <[email protected]> wrote:

> Is there a reason that you need to explicitly specify the number of
> shards? If you don't, then this extra shuffle will not be performed.
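>
> For instance, a plain sketch (paths made up, and "p" is the Pipeline):
>
>   p.apply(TextIO.read().from("gs://my-bucket/input/*"))
>    .apply(TextIO.write().to("gs://my-bucket/output/part"));
>
> With no withNumShards() on the write, the runner picks the sharding and no
> reshuffle is inserted in front of the sink.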
>
> Reuven
>
> On Fri, Sep 27, 2019 at 12:12 PM Shannon Duncan <
> [email protected]> wrote:
>
>> Interesting. Right now we are only doing batch processing, so I hadn't
>> thought about the windowing aspect.
>>
>> On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax <[email protected]> wrote:
>>
>>> Are you doing this in streaming with windowed writes? Window grouping
>>> does not "happen" in Beam until a GroupByKey, so you do need the GroupByKey
>>> in that case.
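>>>
>>> For illustration (just a fragment; "keyed" is a
>>> PCollection<KV<Integer, String>> and the window size is arbitrary):
>>>
>>>   keyed
>>>       .apply(Window.<KV<Integer, String>>into(
>>>           FixedWindows.of(Duration.standardMinutes(10))))
>>>       // Window assignment alone doesn't group anything; the per-window
>>>       // grouping only materializes here:
>>>       .apply(GroupByKey.<Integer, String>create());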
>>>
>>> If you are not windowing but want a specific number of shards (though
>>> the general suggestion in that case is to not pick a specific number of
>>> shards, but let the runner pick it for you), your approach could work.
>>> However, the implementation would be more complicated than you suggest. The
>>> problem is that every file writer has a buffer, and when you force many of
>>> them to be in memory in a map you risk running out of memory. If you look
>>> at the spilledFiles code in WriteFiles.java, it was written to handle
>>> exactly this case.
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <
>>> [email protected]> wrote:
>>>
>>>> Yes, specifically TextIO's withNumShards().
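>>>>
>>>> i.e. something like this (just a sketch; "lines" is our
>>>> PCollection<String> and the output path is made up):
>>>>
>>>>   lines.apply(TextIO.write()
>>>>       .to("gs://my-bucket/output/part")
>>>>       .withNumShards(10));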
>>>>
>>>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I'm not sure what you mean by "write out to a specific shard number."
>>>>> Are you talking about FileIO sinks?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> So when Beam writes out to a specific shard number, as I understand
>>>>>> it, it does a few things (roughly sketched below):
>>>>>>
>>>>>> - Assigns a shard key to each record (reduces parallelism)
>>>>>> - Shuffles and groups by the shard key to co-locate all records
>>>>>> - Writes out to each shard file within a single DoFn per key
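>>>>>>
>>>>>> Roughly this shape, as I understand it (just a sketch; "lines" is a
>>>>>> PCollection<String> and the shard count is made up):
>>>>>>
>>>>>>   final int numShards = 10;
>>>>>>
>>>>>>   // 1. assign a random shard key to each record (caps parallelism at
>>>>>>   //    numShards from here on)
>>>>>>   PCollection<KV<Integer, String>> keyed = lines.apply(
>>>>>>       MapElements
>>>>>>           .into(TypeDescriptors.kvs(
>>>>>>               TypeDescriptors.integers(), TypeDescriptors.strings()))
>>>>>>           .via((String line) ->
>>>>>>               KV.of(ThreadLocalRandom.current().nextInt(numShards), line)));
>>>>>>
>>>>>>   // 2. shuffle: co-locate every record that shares a shard key
>>>>>>   PCollection<KV<Integer, Iterable<String>>> grouped =
>>>>>>       keyed.apply(GroupByKey.<Integer, String>create());
>>>>>>
>>>>>>   // 3. a single DoFn per key then loops over the Iterable and writes
>>>>>>   //    that shard's file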
>>>>>>
>>>>>> Thinking about this, I believe we might be able to eliminate the
>>>>>> GroupByKey and write each file's records with only a DoFn after the
>>>>>> shard key is assigned.
>>>>>>
>>>>>> As long as the shard key is the actual key of the PCollection, could we
>>>>>> use a state variable so that all records with the same key share state
>>>>>> with each other?
>>>>>>
>>>>>> In a DoFn, can we use setup to hold a Map of the files being written to
>>>>>> across bundles on that instance, and then close all of the files in the
>>>>>> map on teardown?
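>>>>>>
>>>>>> Roughly what I'm picturing, as a sketch (class name, output path, and
>>>>>> the writer plumbing are all made up, and it ignores windowing and
>>>>>> bundle retries):
>>>>>>
>>>>>>   import java.io.IOException;
>>>>>>   import java.io.Writer;
>>>>>>   import java.nio.channels.Channels;
>>>>>>   import java.util.HashMap;
>>>>>>   import java.util.Map;
>>>>>>   import org.apache.beam.sdk.io.FileSystems;
>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>>>   import org.apache.beam.sdk.util.MimeTypes;
>>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>>
>>>>>>   // Keeps one open writer per shard key for the lifetime of the DoFn
>>>>>>   // instance, instead of relying on a GroupByKey to co-locate records.
>>>>>>   class WriteShardFn extends DoFn<KV<Integer, String>, Void> {
>>>>>>
>>>>>>     // shard key -> open writer on this instance; rebuilt in setup
>>>>>>     private transient Map<Integer, Writer> writers;
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>       writers = new HashMap<>();
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element KV<Integer, String> record)
>>>>>>         throws IOException {
>>>>>>       Writer writer = writers.get(record.getKey());
>>>>>>       if (writer == null) {
>>>>>>         writer = openWriterForShard(record.getKey());
>>>>>>         writers.put(record.getKey(), writer);
>>>>>>       }
>>>>>>       writer.write(record.getValue());
>>>>>>       writer.write("\n");
>>>>>>     }
>>>>>>
>>>>>>     @Teardown
>>>>>>     public void teardown() throws Exception {
>>>>>>       // close (and thereby flush) every file this instance wrote to
>>>>>>       for (Writer writer : writers.values()) {
>>>>>>         writer.close();
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>>     private Writer openWriterForShard(int shard) throws IOException {
>>>>>>       // Hypothetical output location; a real version would need unique
>>>>>>       // temp file names per writer and a finalize/rename step.
>>>>>>       String spec = "gs://my-bucket/output/part-" + shard + ".txt";
>>>>>>       return Channels.newWriter(
>>>>>>           FileSystems.create(
>>>>>>               FileSystems.matchNewResource(spec, false), MimeTypes.TEXT),
>>>>>>           "UTF-8");
>>>>>>     }
>>>>>>   }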
>>>>>>
>>>>>> If this is the case, does it remove the need for a shuffle and allow a
>>>>>> DoFn to safely write out in append mode to a file, batch, etc. held in
>>>>>> state?
>>>>>>
>>>>>> It doesn't really decrease parallelism after the key is assigned, since
>>>>>> it can still parallelize over each key within its state window, which is
>>>>>> the same level of parallelism we achieve by doing a GroupByKey and then a
>>>>>> for loop over the result. So performance shouldn't be impacted if this
>>>>>> holds true.
>>>>>>
>>>>>> It's kind of like combining the shuffle and the data write into a single
>>>>>> step?
>>>>>>
>>>>>> This does, however, offer a significant cost reduction: it eliminates a
>>>>>> compute-based shuffle, and it also eliminates a Dataflow Shuffle Service
>>>>>> call if the shuffle service is enabled.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>
