I think that support for @RequiresStableInput is rather limited. AFAIK it is supported by streaming Flink (where it is not needed in this situation) and by Dataflow. Batch runners without an external shuffle service that works the way Dataflow's does have IMO no way to implement it correctly. In the case of FileIO (which does not use @RequiresStableInput, as it would not be supported on Spark) the randomness is easily avoidable (hashCode of the key?), and it seems to me that should be preferred.
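For illustration, something like this is all I mean (just a sketch; the hashCode-based mapping is one possible deterministic choice):

  // Deterministic shard assignment: the same key always maps to the same
  // shard, so a recomputed DAG path cannot move elements between shards.
  static int shardFor(Object key, int numShards) {
    // floorMod keeps the result non-negative even for negative hash codes
    return Math.floorMod(key.hashCode(), numShards);
  }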

 Jan

On 6/16/21 6:23 PM, Kenneth Knowles wrote:

On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    maybe a little unrelated, but I think we definitely should not use
    random assignment of shard keys (RandomShardingFunction), at least
    for bounded workloads (it seems to be fine for streaming workloads).
    Many batch runners simply recompute the path in the computation DAG
    from the failed node (transform) back to the root (source). If there
    is any non-determinism involved in that logic, it can result in
    duplicates (as the 'previous' attempt might have taken a DAG path
    that was not affected by the failure). That addresses option 2) of
    what Jozef mentioned.

This is the reason we introduced "@RequiresStableInput".

When things were only Dataflow, we knew that each shuffle was a checkpoint, so we inserted a Reshuffle after the random numbers were generated, freezing them so it was safe for replay. Since other engines do not checkpoint at every shuffle, we needed a way to communicate that this checkpointing was required for correctness. I think we still have many IOs that are written using Reshuffle instead of @RequiresStableInput, and I don't know which runners process @RequiresStableInput properly.
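For reference, @RequiresStableInput is just an annotation on the @ProcessElement method of the side-effecting DoFn. A minimal sketch (the side effect itself is left as a comment):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.DoFn.Element;
  import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
  import org.apache.beam.sdk.transforms.DoFn.RequiresStableInput;

  class SideEffectingFn extends DoFn<String, Void> {
    @RequiresStableInput // the runner must checkpoint the input before this DoFn
    @ProcessElement
    public void process(@Element String element) {
      // a non-idempotent side effect (e.g. an external write) would go here;
      // stable input guarantees a retry sees exactly the same element values
    }
  }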

By the way, I believe the SparkRunner explicitly calls materialize() after a GBK specifically so that it gets correct results for IOs that rely on Reshuffle. Has that changed?

I agree that we should minimize use of RequiresStableInput. It has a significant cost, and the cost varies across runners. If we can use a deterministic function, we should.

Kenn

     Jan

    On 6/16/21 1:43 PM, Jozef Vilcek wrote:


    On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:

        In general, Beam only deals with keys and grouping by key. I
        think expanding this idea to some more abstract notion of a
        sharding function could make sense.

        For FileIO specifically, I wonder if you can use
        writeDynamic() to get the behavior you are seeking.


    The change I have in mind looks like this:
    https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18

    Dynamic destinations, in my mind, are more about the need to
    "partition" data (destination as a directory level) or to handle
    groups of events differently, e.g. write some events in FormatA and
    others in FormatB.
    Shards are used for distributing writes or bucketing events within
    a particular destination group. More specifically, each element is
    currently assigned a `ShardedKey<Integer>` [1] before the GBK
    operation. The sharded key is a compound of the destination and the
    assigned shard.

    Having said that, I might be able to use dynamic destinations for
    this, possibly with a custom FileNaming, and set the number of
    shards to always be 1. But it feels less natural than allowing the
    user to swap the already present `RandomShardingFunction` [2] for
    something of their own choosing.
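    To make the swap concrete, a sketch of what a user-provided function
    could look like, using the ShardingFunction interface shape from the
    commit above (the interface is from my proposal, not in Beam yet;
    MyRecord and its bucket column are made up):

        import java.io.Serializable;
        import org.apache.beam.sdk.values.ShardedKey;

        // Interface shape as proposed in the linked commit.
        interface ShardingFunction<UserT, DestinationT> extends Serializable {
          ShardedKey<Integer> assignShardKey(
              DestinationT destination, UserT element, int shardCount) throws Exception;
        }

        // Hypothetical record carrying the column Hive bucketing is based on.
        class MyRecord implements Serializable {
          String bucketColumn;
          String getBucketColumn() { return bucketColumn; }
        }

        // Deterministic, Hive-bucketing-style assignment based on a data field,
        // mirroring how RandomShardingFunction builds its ShardedKey [2].
        class BucketColumnSharding implements ShardingFunction<MyRecord, String> {
          @Override
          public ShardedKey<Integer> assignShardKey(
              String destination, MyRecord element, int shardCount) {
            int shard = Math.floorMod(element.getBucketColumn().hashCode(), shardCount);
            return ShardedKey.of(destination.hashCode(), shard);
          }
        }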


    [1]
    https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java

    [2]
    https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
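    For completeness, the dynamic-destination workaround mentioned above
    would look roughly like this (bucketing function and paths are made
    up for illustration):

        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.coders.StringUtf8Coder;
        import org.apache.beam.sdk.io.FileIO;
        import org.apache.beam.sdk.io.TextIO;
        import org.apache.beam.sdk.transforms.Create;

        Pipeline p = Pipeline.create();
        p.apply(Create.of("a", "b", "c"))
            .apply(
                FileIO.<String, String>writeDynamic()
                    // made-up deterministic "bucket" function used as destination
                    .by(line -> String.valueOf(Math.floorMod(line.hashCode(), 8)))
                    .withDestinationCoder(StringUtf8Coder.of())
                    .via(TextIO.sink())
                    .to("/tmp/output")
                    // custom naming so each bucket gets a stable file name
                    .withNaming(bucket ->
                        FileIO.Write.defaultNaming("bucket-" + bucket, ".txt"))
                    .withNumShards(1));
        p.run();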

        Kenn

        On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:

            Adding sharding to the model may require a wider
            discussion than FileIO alone. I'm not entirely sure how
            wide, or if this has been proposed before, but IMO it
            warrants a design doc or proposal.

            A couple of high-level questions I can think of:
              - What runners support sharding?
                  * There will be some work in Dataflow required to
            support this but I'm not sure how much.
              - What does sharding mean for streaming pipelines?

            A more nitty-gritty question:
              - How can this be achieved performantly? For example, if
            a shuffle is required to achieve a particular sharding
            constraint, should we allow transforms to declare that they
            don't modify the sharding property (e.g. key preserving)?
            That may allow a runner to avoid an additional shuffle if a
            preceding shuffle can guarantee the sharding requirements.

            Where X is the shuffle that could be avoided: input ->
            shuffle (key sharding fn A) -> transform1 (key
            preserving) -> transform2 (key preserving) -> X ->
            fileio (key sharding fn A)

            On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

                I would like to extend FileIO with the possibility to
                specify a custom sharding function:
                https://issues.apache.org/jira/browse/BEAM-12493

                I have 2 use-cases for this:

                 1. I need to generate shards which are compatible
                    with Hive bucketing, and therefore need to decide
                    shard assignment based on data fields of the input
                    element
                 2. When running e.g. on Spark and the job encounters a
                    failure which causes a loss of some data from
                    previous stages, Spark recomputes the necessary
                    tasks in the necessary stages to recover the data.
                    Because the shard assignment function is random by
                    default, some data will end up in different shards
                    and cause duplicates in the final output.

                Please let me know your thoughts in case you see a
                reason not to add such an improvement.

                Thanks,
                Jozef
