In general, Beam only deals with keys and grouping by key. I think
expanding this idea to some more abstract notion of a sharding function
could make sense.

For FileIO specifically, I wonder if you can use writeDynamic() to get the
behavior you are seeking.
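
Roughly, an untested sketch (Event, getBucketKey(), and toJson() are
placeholders for your own types):

  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.Contextful;
  import org.apache.beam.sdk.values.PCollection;

  // Route each element to a destination derived from its data fields,
  // so "sharding" falls out of per-destination file grouping.
  PCollection<Event> events = ...;
  events.apply(
      FileIO.<String, Event>writeDynamic()
          .by(event -> event.getBucketKey())      // data-derived destination key
          .withDestinationCoder(StringUtf8Coder.of())
          .via(Contextful.fn(Event::toJson), TextIO.sink())
          .to("/path/to/output")
          .withNaming(key -> FileIO.Write.defaultNaming("bucket-" + key, ".json"))
          .withNumShards(1));                     // one file per destination per pane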

Kenn

On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:

> Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or if this has been proposed before,
> but IMO it warrants a design doc or proposal.
>
> A couple of high-level questions that come to mind:
>   - What runners support sharding?
>       * There will be some work required in Dataflow to support this, but
> I'm not sure how much.
>   - What does sharding mean for streaming pipelines?
>
> A more nitty-gritty question:
>   - How can this be achieved performantly? For example, if a shuffle is
> required to achieve a particular sharding constraint, should we allow
> transforms to declare that they don't modify the sharding property (e.g.
> key preserving)? That may allow a runner to avoid an additional shuffle
> when a preceding shuffle can already guarantee the sharding requirements.
>
> Where X is the shuffle that could be avoided:
>
>   input -> shuffle (key sharding fn A) -> transform1 (key preserving)
>         -> transform2 (key preserving) -> X -> fileio (key sharding fn A)
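>
> To make that concrete, a purely hypothetical sketch (nothing here exists
> in Beam today: the @KeyPreserving annotation, withShardingFunction(), and
> the Transform1Fn/Transform2Fn/shardingFnA names are all invented for the
> example):
>
>   // Hypothetical API only -- illustrates the elision, not a real Beam API.
>   input
>       .apply(Reshuffle.of())                    // shuffle with key sharding fn A
>       .apply(ParDo.of(new Transform1Fn()))      // declared @KeyPreserving
>       .apply(ParDo.of(new Transform2Fn()))      // declared @KeyPreserving
>       .apply(FileIO.<String, KV<String, Row>>writeDynamic()
>           .by(KV::getKey)
>           .withShardingFunction(shardingFnA));  // same fn A => runner may skip X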
>
> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I would like to extend FileIO with the ability to specify a custom
>> sharding function:
>> https://issues.apache.org/jira/browse/BEAM-12493
>>
>> I have two use cases for this:
>>
>>    1. I need to generate shards that are compatible with Hive bucketing,
>>    and therefore need to decide shard assignment based on data fields of
>>    the input element (see the sketch after this list).
>>    2. When running e.g. on Spark and the job encounters a failure that
>>    causes a loss of some data from previous stages, Spark recomputes the
>>    necessary tasks in the necessary stages to recover the data. Because
>>    the default shard assignment function is random, some data will end up
>>    in different shards on recompute and cause duplicates in the final
>>    output.
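>>
>> For use case 1, the shape of function I have in mind is roughly (a sketch
>> only: the ShardingFn interface is invented for this example, "user_id" is
>> a placeholder bucketing column, and a real implementation would need to
>> match Hive's bucketing hash exactly):
>>
>>   import java.io.Serializable;
>>   import org.apache.beam.sdk.values.Row;
>>
>>   // Invented interface, not a Beam API: maps an element to a shard index.
>>   interface ShardingFn<T> extends Serializable {
>>     int assignShard(T element, int numShards);
>>   }
>>
>>   // Mirror Hive-style bucketing:
>>   // (hash(bucketColumn) & Integer.MAX_VALUE) % numBuckets.
>>   ShardingFn<Row> hiveCompatible =
>>       (row, numShards) ->
>>           (row.getString("user_id").hashCode() & Integer.MAX_VALUE) % numShards;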
>>
>> Please let me know your thoughts, or if you see a reason not to add such
>> an improvement.
>>
>> Thanks,
>> Jozef
>>
>
