On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> maybe a little unrelated, but I think we definitely should not use random
> assignment of shard keys (RandomShardingFunction), at least for bounded
> workloads (it seems to be fine for streaming workloads). Many batch runners
> simply recompute the path in the computation DAG from the failed node
> (transform) to the root (source). If there is any non-determinism involved
> in the logic, it can result in duplicates (as the 'previous' attempt might
> have ended in a DAG path that was not affected by the failure). That
> addresses option 2) of what Jozef mentioned.
>

This is the reason we introduced @RequiresStableInput. When things were
only Dataflow, we knew that each shuffle was a checkpoint, so we inserted a
Reshuffle after the random numbers were generated, freezing them so it was
safe for replay. Since other engines do not checkpoint at every shuffle, we
needed a way to communicate that this checkpointing was required for
correctness. I think we still have many IOs that are written using
Reshuffle instead of @RequiresStableInput, and I don't know which runners
process @RequiresStableInput properly.

By the way, I believe the SparkRunner explicitly calls materialize() after
a GBK specifically so that it gets correct results for IOs that rely on
Reshuffle. Has that changed?

I agree that we should minimize use of RequiresStableInput. It has a
significant cost, and the cost varies across runners. If we can use a
deterministic function, we should.

Kenn

> Jan
>
> On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>
> On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>>
>> For FileIO specifically, I wonder if you can use writeDynamic() to get
>> the behavior you are seeking.
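[Editor's note: the replay problem Jan and Kenn describe can be illustrated without the Beam SDK. The following is a minimal, dependency-free Java sketch, not Beam's actual RandomShardingFunction or @RequiresStableInput machinery; `deterministicShard` and `randomShard` are hypothetical names used only for illustration.]

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class ReplaySafetySketch {

    // Conceptually what a random sharding function does: two "attempts"
    // over the same input can disagree, because the shard does not depend
    // on the element.
    static int randomShard(Random rng, int shardCount) {
        return rng.nextInt(shardCount);
    }

    // Deterministic assignment: the shard is a pure function of the
    // element, so a recomputed attempt reproduces the first one exactly.
    static int deterministicShard(String element, int shardCount) {
        return (element.hashCode() & Integer.MAX_VALUE) % shardCount;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "d");
        int shardCount = 4;

        // Simulate a failed batch stage being recomputed: the same logic
        // runs twice over the same elements.
        List<Integer> attempt1 = input.stream()
            .map(e -> deterministicShard(e, shardCount))
            .collect(Collectors.toList());
        List<Integer> attempt2 = input.stream()
            .map(e -> deterministicShard(e, shardCount))
            .collect(Collectors.toList());

        // Deterministic sharding is replay-safe.
        System.out.println(attempt1.equals(attempt2)); // true

        // With random sharding, a replayed attempt may scatter the same
        // elements into different shards, which is what produces the
        // duplicates described above when only part of the DAG is re-run.
        Random rng = new Random();
        List<Integer> randomAttempt = input.stream()
            .map(e -> randomShard(rng, shardCount))
            .collect(Collectors.toList());
        System.out.println(randomAttempt.size() == input.size()); // true
    }
}
```

This is the same reason @RequiresStableInput (or a checkpointing Reshuffle on Dataflow) exists: if the shard assignment is not a pure function of the input, the input to the write must be frozen before replay can be safe.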
>>
>
> The change I have in mind looks like this:
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>
> Dynamic destinations, in my mind, are more about the need for
> "partitioning" data (destination as a directory level), or cases where one
> needs to handle groups of events differently, e.g. write some events in
> FormatA and others in FormatB.
> Shards are now used for distributing writes or bucketing of events within
> a particular destination group. More specifically, currently each element
> is assigned a `ShardedKey<Integer>` [1] before the GBK operation. The
> sharded key is a compound of destination and assigned shard.
>
> Having said that, I might be able to use dynamic destinations for this,
> possibly with a custom FileNaming, and set the number of shards to always
> be 1. But it feels less natural than allowing the user to swap the already
> present `RandomShardingFunction` [2] for something of his own choosing.
>
> [1] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
> [2] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>
> Kenn
>>
>> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com>
>> wrote:
>>
>>> Adding sharding to the model may require a wider discussion than FileIO
>>> alone. I'm not entirely sure how wide, or if this has been proposed
>>> before, but IMO it warrants a design doc or proposal.
>>>
>>> A couple of high-level questions I can think of are:
>>> - What runners support sharding?
>>>   * There will be some work in Dataflow required to support this, but
>>>     I'm not sure how much.
>>> - What does sharding mean for streaming pipelines?
>>>
>>> A more nitty-gritty question:
>>> - How can this be achieved performantly?
For example, if a shuffle is
>>> required to achieve a particular sharding constraint, should we allow
>>> transforms to declare that they don't modify the sharding property
>>> (e.g. key preserving), which may allow a runner to avoid an additional
>>> shuffle if a preceding shuffle can guarantee the sharding requirements?
>>>
>>> Where X is the shuffle that could be avoided:
>>>   input -> shuffle (key sharding fn A) -> transform1 (key preserving)
>>>   -> transform2 (key preserving) -> X -> fileio (key sharding fn A)
>>>
>>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> I would like to extend FileIO with the possibility to specify a custom
>>>> sharding function:
>>>> https://issues.apache.org/jira/browse/BEAM-12493
>>>>
>>>> I have 2 use-cases for this:
>>>>
>>>> 1. I need to generate shards which are compatible with Hive bucketing
>>>> and therefore need to decide shard assignment based on data fields of
>>>> the input element
>>>> 2. When running e.g. on Spark, if the job encounters a failure that
>>>> causes a loss of some data from previous stages, Spark recomputes the
>>>> necessary tasks in the necessary stages to recover the data. Because
>>>> the default shard assignment function is random, some data will end up
>>>> in different shards and cause duplicates in the final output.
>>>>
>>>> Please let me know your thoughts in case you see a reason not to add
>>>> such an improvement.
>>>>
>>>> Thanks,
>>>> Jozef
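[Editor's note: use-case 1 above (Hive-compatible bucketing via a pluggable sharding function) can be sketched without the Beam SDK. `SimpleShardingFunction` below is a dependency-free stand-in mirroring the shape of Beam's `ShardingFunction` (which returns a `ShardedKey<Integer>` rather than a bare int); the string hash mimics Hive's 31-based rolling hash, but real Hive compatibility requires matching Hive's own hashing for the bucketed column types.]

```java
import java.nio.charset.StandardCharsets;

public class HiveBucketSketch {

    // Simplified stand-in for Beam's ShardingFunction: maps a
    // (destination, element) pair to a shard number, replacing the
    // default random assignment with a pure function of the element.
    interface SimpleShardingFunction<UserT, DestinationT> {
        int assignShard(DestinationT destination, UserT element, int shardCount);
    }

    // Hive assigns a row to bucket (hash & Integer.MAX_VALUE) % numBuckets.
    // This approximates Hive's string hashing with a Java-style 31-based
    // rolling hash over the UTF-8 bytes.
    static int hiveStringHash(String value) {
        int hash = 0;
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            hash = hash * 31 + b;
        }
        return hash;
    }

    public static void main(String[] args) {
        // Bucket on a (hypothetical) string field of the element, e.g. a
        // user id, the way Hive buckets on a table column.
        SimpleShardingFunction<String, String> bucketByField =
            (destination, element, shardCount) ->
                (hiveStringHash(element) & Integer.MAX_VALUE) % shardCount;

        // The same element always lands in the same bucket, so a Spark
        // stage recompute reproduces the original shard assignment
        // (use-case 2) while matching Hive's bucket layout (use-case 1).
        int first = bucketByField.assignShard("dest", "user-42", 8);
        int second = bucketByField.assignShard("dest", "user-42", 8);
        System.out.println(first == second); // true
        System.out.println(first >= 0 && first < 8); // true
    }
}
```

Because the shard is derived from the element rather than drawn at random, this kind of function addresses both use-cases at once, without needing @RequiresStableInput for the write.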