I'm not sure I understand your PR. How is this PR different than the by() method in FileIO?
On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <[email protected]> wrote:

The MR for review for this change is here:
https://github.com/apache/beam/pull/15051

On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <[email protected]> wrote:

I would like this thread to stay focused on sharding FileIO only. A possible change to the model is an interesting topic, but one of a much different scope.

Yes, I agree that sharding is mostly a physical rather than a logical property of the pipeline. That is why it feels more natural to distinguish between the two at the API level.

As for handling sharding requirements by adding more sugar to dynamic destinations + file naming, one has to keep in mind that the results of dynamic writes can be observed in the form of KV<DestinationT, String>, i.e. the written files per dynamic destination. Often we do a GBK to post-process files per destination / logical group. If sharding were encoded there, it might complicate things either downstream or inside the sugar part, which would have to put the shard in and then take it out again later.

From the user perspective I do not see much difference. We would still need the API to allow defining both behaviors; they would just be executed differently by the implementation. I do not see value in changing the FileIO (WriteFiles) logic to stop using sharding and use dynamic destinations for both, given that the sharding function is already there and in use.

To the point of external shuffle and non-deterministic user input: yes, users can create non-deterministic behavior, but they are in control. Here, Beam internally adds non-deterministic behavior and users cannot opt out. All works fine as long as the external shuffle service (depending on the runner) holds on to the data and hands it out on retries. However, if data in the shuffle service is lost for some reason - e.g. a disk failure or a node breaking down - then the pipeline has to recover the data by recomputing the necessary paths.

On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <[email protected]> wrote:

Sharding is typically a physical rather than a logical property of the pipeline, and I'm not convinced it makes sense to add it to Beam in general. One can already use keys and GBK/stateful DoFns if some kind of logical grouping is needed, and adding constraints like this can prevent opportunities for optimizations (like dynamic sharding and fusion).

That being said, file outputs are one area where it could make sense. I would expect that dynamic destinations could cover this use case, and a general FileNaming subclass could be provided to make this pattern easier (and possibly some syntactic sugar for auto-setting num shards to 0). (One downside of this approach is that one couldn't use dynamic destinations and have each destination sharded with a distinct sharding function as well.)

If this doesn't work, we could look into adding ShardingFunction as a publicly exposed parameter to FileIO. (I'm actually surprised it already exists.)

On Thu, Jun 17, 2021 at 9:39 AM <[email protected]> wrote:

Alright, but what is worth emphasizing is that we are talking about batch workloads. The typical scenario is that the output is committed once the job finishes (e.g., by an atomic rename of a directory).

Jan
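[For reference, a minimal sketch of the dynamic-destinations approach Robert suggests above, assuming elements arrive as KV<String, String> (bucketing key, text line); the 16-bucket hash, output path, and naming scheme are made up for illustration, not something FileIO prescribes:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, String>> lines = ...;  // bucketing key, output line

lines.apply(
    FileIO.<Integer, KV<String, String>>writeDynamic()
        // Deterministic bucket id derived from the element key (16 buckets, made up).
        .by(kv -> Math.floorMod(kv.getKey().hashCode(), 16))
        .withDestinationCoder(VarIntCoder.of())
        // Write the value of each element as a text line.
        .via(Contextful.fn((KV<String, String> kv) -> kv.getValue()), TextIO.sink())
        .to("/tmp/output")
        // Encode the bucket id in the file name; one file per bucket destination.
        .withNaming(bucket -> FileIO.Write.defaultNaming("bucket-" + bucket, ".txt"))
        .withNumShards(1));

Here the "shard" is modeled as the destination itself, which is the pattern Jozef contrasts below with swapping the sharding function directly.]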
On 17 Jun 2021 at 17:59, Reuven Lax <[email protected]> wrote:

Yes - the problem is that Beam makes no guarantees of determinism anywhere in the system. User DoFns might be non-deterministic and have no way to know (we've discussed providing users with an @IsDeterministic annotation; however, empirically users often think their functions are deterministic when they are in fact not). _Any_ sort of triggered aggregation (including watermark based) can always be non-deterministic.

Even if everything was deterministic, replaying everything is tricky. The output files already exist - should the system delete them and recreate them, or leave them as is and delete the temp files? Either decision could be problematic.

On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <[email protected]> wrote:

Correct, by the external shuffle service I pretty much meant "offloading the contents of a shuffle phase out of the system". Looks like that is what Spark's checkpoint does as well. On the other hand (if I understand the concept correctly), that implies some performance penalty - the data has to be moved to an external distributed filesystem. Which then feels weird if we optimize code to avoid computing random numbers, but are okay with moving complete datasets back and forth. I think in this particular case making the pipeline deterministic - idempotent, to be precise - (within limits, yes, hashCode of an enum is not stable between JVMs) would seem more practical to me.

Jan

On 6/17/21 7:09 AM, Reuven Lax wrote:

I have some thoughts here, as Eugene Kirpichov and I spent a lot of time working through these semantics in the past.

First - about the problem of duplicates:

A "deterministic" sharding - e.g. hashCode based (though Java makes no guarantee that hashCode is stable across JVM instances, so this technique ends up not being stable) - doesn't really help matters in Beam. Unlike other systems, Beam makes no assumptions that transforms are idempotent or deterministic. What's more, _any_ transform that has any sort of triggered grouping (whether the trigger used is watermark based or otherwise) is non-deterministic.

Forcing a hash of every element imposed quite a CPU cost; even generating a random number per element slowed things down too much, which is why the current code generates a random number only in startBundle.

Any runner that does not implement RequiresStableInput will not properly execute FileIO. Dataflow and Flink both support this. I believe that the Spark runner implicitly supports it by manually calling checkpoint as Ken mentioned (unless someone removed that from the Spark runner, but if so that would be a correctness regression). Implementing this has nothing to do with external shuffle services - neither Flink, Spark, nor the Dataflow appliance (classic shuffle) has any problem correctly implementing RequiresStableInput.

On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <[email protected]> wrote:

I think that the support for @RequiresStableInput is rather limited. AFAIK it is supported by streaming Flink (where it is not needed in this situation) and by Dataflow. Batch runners without an external shuffle service that works as in the case of Dataflow have IMO no way to implement it correctly. In the case of FileIO (which does not use @RequiresStableInput, as it would not be supported on Spark) the randomness is easily avoidable (hashCode of the key?), and it seems to me that should be preferred.

Jan
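[For context, @RequiresStableInput is declared on a DoFn's @ProcessElement method; a runner that honors it must checkpoint the input so a retried bundle replays exactly the same elements. A minimal sketch, with a stand-in side effect:

import org.apache.beam.sdk.transforms.DoFn;

public class StableSideEffectFn extends DoFn<String, Void> {
  @ProcessElement
  @RequiresStableInput  // ask the runner to checkpoint/stabilize the input first
  public void processElement(@Element String element) {
    // Stand-in for a non-idempotent side effect (e.g. an external write or file rename).
    // With stable input, a retry of this bundle sees exactly the same elements.
    System.out.println(element);
  }
}

Whether a batch runner can provide that guarantee cheaply is exactly the limitation discussed above.]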
On 6/16/21 6:23 PM, Kenneth Knowles wrote:

On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <[email protected]> wrote:

Hi,

maybe a little unrelated, but I think we definitely should not use random assignment of shard keys (RandomShardingFunction), at least for bounded workloads (it seems to be fine for streaming workloads). Many batch runners simply recompute the path in the computation DAG from the failed node (transform) to the root (source). If there is any non-determinism involved in that logic, it can result in duplicates (as the 'previous' attempt might have ended in a DAG path that was not affected by the failure). That addresses option 2) of what Jozef has mentioned.

This is the reason we introduced "@RequiresStableInput".

When things were only Dataflow, we knew that each shuffle was a checkpoint, so we inserted a Reshuffle after the random numbers were generated, freezing them so it was safe for replay. Since other engines do not checkpoint at every shuffle, we needed a way to communicate that this checkpointing was required for correctness. I think we still have many IOs that are written using Reshuffle instead of @RequiresStableInput, and I don't know which runners process @RequiresStableInput properly.

By the way, I believe the SparkRunner explicitly calls materialize() after a GBK specifically so that it gets correct results for IOs that rely on Reshuffle. Has that changed?

I agree that we should minimize use of RequiresStableInput. It has a significant cost, and the cost varies across runners. If we can use a deterministic function, we should.

Kenn

Jan

On 6/16/21 1:43 PM, Jozef Vilcek wrote:

On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <[email protected]> wrote:

In general, Beam only deals with keys and grouping by key. I think expanding this idea to some more abstract notion of a sharding function could make sense.

For FileIO specifically, I wonder if you can use writeDynamic() to get the behavior you are seeking.

The change I have in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18

Dynamic destinations, in my mind, are more towards the need for "partitioning" data (destination as a directory level) or for when one needs to handle groups of events differently, e.g. write some events in FormatA and others in FormatB. Shards are now used for distributing writes or bucketing events within a particular destination group. More specifically, currently, each element is assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded key is a compound of the destination and the assigned shard.

Having said that, I might be able to use dynamic destinations for this, possibly with a custom FileNaming, and set shards to always be 1. But it feels less natural than allowing the user to swap the already present `RandomShardingFunction` [2] for something of his own choosing.

[1] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
[2] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
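[To make "swap the sharding function" concrete, here is a rough sketch of a deterministic replacement for RandomShardingFunction. It assumes the ShardingFunction shape that WriteFiles uses internally [2]; the exact interface signature, the ShardedKey.of factory, and the bucketKeyOf helper are approximations/hypothetical, not verified against a particular Beam release:

import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.values.ShardedKey;

// Deterministic alternative to RandomShardingFunction: the shard is derived from the
// element itself, so a recomputed stage assigns every element to the same shard again.
class HashShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT> {

  @Override
  public ShardedKey<Integer> assignShardKey(
      DestinationT destination, UserT element, int shardCount) {
    // bucketKeyOf(...) is a hypothetical extractor for the field(s) that define the
    // bucket, e.g. the Hive bucketing column(s).
    int shard = Math.floorMod(bucketKeyOf(element).hashCode(), shardCount);
    // Combine the (hashed) destination and the shard, mirroring what WriteFiles builds;
    // guard against a null destination for plain, non-dynamic writes.
    int destKey = (destination == null) ? 0 : destination.hashCode();
    return ShardedKey.of(destKey, shard);
  }

  private Object bucketKeyOf(UserT element) {
    return element; // placeholder; real code would read specific data fields
  }
}

The proposal is essentially to let a function like this, rather than the random one, decide the ShardedKey<Integer> ahead of the GBK in WriteFiles.]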
Kenn

On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <[email protected]> wrote:

Adding sharding to the model may require a wider discussion than FileIO alone. I'm not entirely sure how wide, or whether this has been proposed before, but IMO it warrants a design doc or proposal.

A couple of high-level questions I can think of are:
- What runners support sharding?
  * There will be some work in Dataflow required to support this, but I'm not sure how much.
- What does sharding mean for streaming pipelines?

A more nitty-gritty question:
- How can this be achieved performantly? For example, if a shuffle is required to achieve a particular sharding constraint, should we allow transforms to declare that they don't modify the sharding property (e.g. key preserving), which may allow a runner to avoid an additional shuffle if a preceding shuffle can guarantee the sharding requirements?

Where X is the shuffle that could be avoided: input -> shuffle (key sharding fn A) -> transform1 (key preserving) -> transform2 (key preserving) -> X -> fileio (key sharding fn A)

On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <[email protected]> wrote:

I would like to extend FileIO with the possibility to specify a custom sharding function:
https://issues.apache.org/jira/browse/BEAM-12493

I have two use cases for this:

1. I need to generate shards which are compatible with Hive bucketing, and therefore need to decide shard assignment based on data fields of the input element.
2. When running e.g. on Spark and the job encounters a failure which causes a loss of some data from previous stages, Spark recomputes the necessary tasks in the necessary stages to recover the data. Because the shard assignment function is random by default, some data will end up in different shards and cause duplicates in the final output.

Please let me know your thoughts in case you see a reason not to add such an improvement.

Thanks,
Jozef
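[To illustrate use case 1, classic Hive bucket assignment for a single string column is roughly the following; the exact hash depends on the Hive bucketing version and column types, so treat it as an approximation rather than a drop-in implementation:

// Approximation of classic Hive bucket assignment for one string bucketing column:
// bucket = (hash(column) & Integer.MAX_VALUE) % numBuckets. Newer Hive versions may use
// a different (Murmur3-based) hash, so verify against the target tables.
static int hiveStyleBucket(String bucketColumnValue, int numBuckets) {
  int hash = bucketColumnValue.hashCode();
  return (hash & Integer.MAX_VALUE) % numBuckets;
}

A deterministic function like this, plugged into the sharding function sketched earlier, also addresses use case 2, since a Spark recompute would assign each element to the same shard as the first attempt.]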
