On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> maybe a little unrelated, but I think we definitely should not use random
> assignment of shard keys (RandomShardingFunction), at least for bounded
> workloads (it seems to be fine for streaming workloads). Many batch runners
> simply recompute the path in the computation DAG from the failed node
> (transform) to the root (source). If there is any non-determinism
> involved in the logic, this can result in duplicates (as the 'previous'
> attempt might have ended in a DAG path that was not affected by the failure).
> That addresses option 2) of what Jozef has mentioned.
>
This is the reason we introduced "@RequiresStableInput".

When Dataflow was the only runner, we knew that each shuffle was a checkpoint,
so we inserted a Reshuffle after the random numbers were generated,
freezing them so that replay was safe. Since other engines do not
checkpoint at every shuffle, we needed a way to communicate that this
checkpointing was required for correctness. I think we still have many IOs
that are written using Reshuffle instead of @RequiresStableInput, and I
don't know which runners process @RequiresStableInput properly.
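
A minimal plain-Java sketch of why freezing the random numbers mattered,
assuming nothing about any particular runner. `ReplaySketch`,
`assignRandomShards`, `checkpoint`, and `countMoved` are hypothetical names,
not Beam APIs. It simulates two attempts at random shard assignment and shows
that only a materialized copy is guaranteed to be identical on replay.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical simulation only; none of these names are Beam APIs.
final class ReplaySketch {

    // Gives each of numElements elements a random shard in [0, numShards).
    static List<Integer> assignRandomShards(int numElements, int numShards, Random rng) {
        List<Integer> shards = new ArrayList<>(numElements);
        for (int i = 0; i < numElements; i++) {
            shards.add(rng.nextInt(numShards));
        }
        return shards;
    }

    // Stands in for a checkpoint: an immutable copy that a retry reads back
    // instead of re-running the random assignment.
    static List<Integer> checkpoint(List<Integer> assigned) {
        return Collections.unmodifiableList(new ArrayList<>(assigned));
    }

    // Counts positions where two attempts put the same element in different shards.
    static int countMoved(List<Integer> first, List<Integer> second) {
        int moved = 0;
        for (int i = 0; i < first.size(); i++) {
            if (!first.get(i).equals(second.get(i))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Without a checkpoint, a retry recomputes the random assignment.
        List<Integer> attempt1 = assignRandomShards(1000, 10, new Random());
        List<Integer> attempt2 = assignRandomShards(1000, 10, new Random());
        System.out.println("moved without checkpoint: " + countMoved(attempt1, attempt2));

        // With a checkpoint, the retry reuses the frozen assignment.
        List<Integer> frozen = checkpoint(attempt1);
        System.out.println("moved with checkpoint: " + countMoved(frozen, attempt1));
    }
}
```

Any element counted as "moved" is one that a partial replay could write to a
second shard while its first copy survives elsewhere, which is how the
duplicates arise.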

By the way, I believe the SparkRunner explicitly calls materialize() after
a GBK specifically so that it gets correct results for IOs that rely on
Reshuffle. Has that changed?

I agree that we should minimize use of RequiresStableInput. It has a
significant cost, and the cost varies across runners. If we can use a
deterministic function, we should.
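
One way such a deterministic function could look, as a hedged sketch in plain
Java: derive the shard from a stable field of the element rather than from a
random number. `DeterministicShardSketch` and `shardFor` are hypothetical; this
is not the WriteFiles sharding interface, and `String.hashCode` is not claimed
to match Hive's bucketing hash.

```java
// Hypothetical helper; not part of Beam.
final class DeterministicShardSketch {

    // Maps a key to a shard in [0, numShards). String.hashCode has a specified
    // algorithm, so the result is the same on every attempt and every JVM.
    static int shardFor(String key, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numShards);
    }

    public static void main(String[] args) {
        System.out.println(shardFor("user-42", 8));
        System.out.println(shardFor("user-42", 8)); // same shard on "replay"
    }
}
```

Because the shard depends only on the element, a recomputed stage sends each
element to the same shard it went to the first time, with no checkpoint needed.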

Kenn


>  Jan
> On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>
>
>
> On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>>
>> For FileIO specifically, I wonder if you can use writeDynamic() to get
>> the behavior you are seeking.
>>
>
> The change in mind looks like this:
>
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>
> To my mind, dynamic destinations are more about the need for
> "partitioning" data (destination as directory level), or for cases where
> one needs to handle groups of events differently, e.g. write some events in
> FormatA and others in FormatB.
> Shards are currently used for distributing writes or bucketing events within
> a particular destination group. More specifically, each element is
> assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded key
> is a compound of the destination and the assigned shard.
>
> Having said that, I might be able to use dynamic destinations for this,
> possibly with a custom FileNaming, and set the number of shards to always be 1.
> But it feels less natural than allowing the user to swap the already present
> `RandomShardingFunction` [2] with something of their own choosing.
>
>
> [1]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>
> [2]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>
>> Kenn
>>
>> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com>
>> wrote:
>>
>>> Adding sharding to the model may require a wider discussion than FileIO
>>> alone. I'm not entirely sure how wide, or if this has been proposed before,
>>> but IMO it warrants a design doc or proposal.
>>>
>>> A couple of high-level questions I can think of are:
>>>   - What runners support sharding?
>>>       * There will be some work in Dataflow required to support this but
>>> I'm not sure how much.
>>>   - What does sharding mean for streaming pipelines?
>>>
>>> A more nitty-gritty question:
>>>   - How can this be achieved performantly? For example, if a shuffle is
>>> required to achieve a particular sharding constraint, should we
>>> allow transforms to declare they don't modify the sharding property (e.g.
>>> key preserving) which may allow a runner to avoid an additional shuffle if
>>> a preceding shuffle can guarantee the sharding requirements?
>>>
>>> Where X is the shuffle that could be avoided: input -> shuffle (key
>>> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
>>> preserving) -> X -> fileio (key sharding fn A)
>>>
>>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> I would like to extend FileIO with the possibility to specify a custom
>>>> sharding function:
>>>> https://issues.apache.org/jira/browse/BEAM-12493
>>>>
>>>> I have 2 use-cases for this:
>>>>
>>>>    1. I need to generate shards which are compatible with Hive
>>>>    bucketing and therefore need to decide shard assignment based on data
>>>>    fields of the input element
>>>>    2. When running e.g. on Spark and the job encounters a kind of failure
>>>>    which causes a loss of some data from previous stages, Spark issues a
>>>>    recompute of the necessary tasks in the necessary stages to recover the
>>>>    data. Because the shard assignment function is random by default, some
>>>>    data will end up in different shards and cause duplicates in the final
>>>>    output.
>>>>
>>>> Please let me know your thoughts in case you see a reason not to add
>>>> such an improvement.
>>>>
>>>> Thanks,
>>>> Jozef
>>>>
>>>
