I would like to bump this thread. So far we have not gotten very far. Allowing the user to specify a sharding function, and not only the desired number of shards, seems to be a very unsettling change, and I still do not have a good understanding of why it is so unacceptable. From the thread I sense a fear of users not using it correctly and then complaining about performance. While true, the same applies to any GBK operation with a badly chosen key, and it can be mitigated to some extent by documentation.
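To make the proposal more concrete, here is a rough sketch of what I have in mind. The withShardingFunction method on FileIO.Write does not exist today and is exactly what the MR linked earlier in the thread adds, so treat the name as illustrative; the ShardingFunction interface and ShardedKey are the ones WriteFiles already uses internally, and the CSV String elements are only for the example.

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.ShardedKey;

/** Deterministic, Hive-bucket-style sharding: the shard is derived from a data field of the element. */
class BucketByFirstField implements ShardingFunction<String, Void> {
  @Override
  public ShardedKey<Integer> assignShardKey(Void destination, String element, int shardCount) {
    // Example assumption: elements are CSV lines and the first field is the bucketing column.
    int bucket = Math.floorMod(element.split(",")[0].hashCode(), shardCount);
    // Key and shard number both carry the bucket; WriteFiles groups elements by this ShardedKey.
    return ShardedKey.of(bucket, bucket);
  }
}

// Usage fragment ('lines' is a PCollection<String> of CSV records, not a full pipeline).
// withShardingFunction on FileIO.Write is the proposed addition:
lines.apply(
    "WriteBucketed",
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("/data/output")
        .withNumShards(16)
        .withShardingFunction(new BucketByFirstField()));

The point is that the user only swaps the default RandomShardingFunction for a deterministic one; everything else (the GBK on ShardedKey, temp files, finalization) stays exactly as it is today, and runner-determined sharding still applies whenever the user does not set this explicitly.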
What else do you see feature wise as a blocker for this change? Right now I do not have much data based on which I can evolve my proposal. On Wed, Jul 7, 2021 at 10:01 AM Jozef Vilcek <[email protected]> wrote: > > > On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax <[email protected]> wrote: > >> >> >> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek <[email protected]> >> wrote: >> >>> I don't think this has anything to do with external shuffle services. >>> >>> Arbitrarily recomputing data is fundamentally incompatible with Beam, >>> since Beam does not restrict transforms to being deterministic. The Spark >>> runner works (at least it did last I checked) by checkpointing the RDD. >>> Spark will not recompute the DAG past a checkpoint, so this creates stable >>> input to a transform. This adds some cost to the Spark runner, but makes >>> things correct. You should not have sharding problems due to replay unless >>> there is a bug in the current Spark runner. >>> >>> Beam does not restrict non-determinism, true. If users do add it, then >>> they can work around it should they need to address any side effects. But >>> should non-determinism be deliberately added by Beam core? Probably can if >>> runners can 100% deal with that effectively. To the point of RDD >>> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I >>> thought cache is invoked on fork points in DAG. I have just a join and map >>> and in such cases I thought data is always served out by shuffle service. >>> Am I mistaken? >>> >> >> Non determinism is already there in core Beam features. If any sort of >> trigger is used anywhere in the pipeline, the result is non deterministic. >> We've also found that users tend not to know when their ParDos are non >> deterministic, so telling users to works around it tends not to work. >> >> The Spark runner definitely used to use checkpoint in this case. >> > > It is beyond my knowledge level of Beam how triggers introduce > non-determinism into the code processing in batch mode. So I leave that > out. Reuven, do you mind pointing me to the Spark runner code which is > supposed to handle this by using checkpoint? I can not find it myself. > > > >> >>> >>> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax <[email protected]> wrote: >>> >>>> >>>> >>>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <[email protected]> >>>> wrote: >>>> >>>>> >>>>> How will @RequiresStableInput prevent this situation when running >>>>>> batch use case? >>>>>> >>>>> >>>>> So this is handled in combination of @RequiresStableInput and output >>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners) >>>>> makes sure that the input provided for the write stage does not get >>>>> recomputed in the presence of failures. Output finalization makes sure >>>>> that >>>>> we only finalize one run of each bundle and discard the rest. >>>>> >>>>> So this I believe relies on having robust external service to hold >>>>> shuffle data and serve it out when needed so pipeline does not need to >>>>> recompute it via non-deterministic function. In Spark however, shuffle >>>>> service can not be (if I am not mistaking) deployed in this fashion (HA + >>>>> replicated shuffle data). Therefore, if instance of shuffle service >>>>> holding >>>>> a portion of shuffle data fails, spark recovers it by recomputing parts of >>>>> a DAG from source to recover lost shuffle results. I am not sure what Beam >>>>> can do here to prevent it or make it stable? Will @RequiresStableInput >>>>> work >>>>> as expected? ... 
Note that this, I believe, concerns only the batch. >>>>> >>>> >>>> I don't think this has anything to do with external shuffle services. >>>> >>>> Arbitrarily recomputing data is fundamentally incompatible with Beam, >>>> since Beam does not restrict transforms to being deterministic. The Spark >>>> runner works (at least it did last I checked) by checkpointing the RDD. >>>> Spark will not recompute the DAG past a checkpoint, so this creates stable >>>> input to a transform. This adds some cost to the Spark runner, but makes >>>> things correct. You should not have sharding problems due to replay unless >>>> there is a bug in the current Spark runner. >>>> >>>> >>>>> >>>>> Also, when withNumShards() truly have to be used, round robin >>>>> assignment of elements to shards sounds like the optimal solution (at >>>>> least >>>>> for the vast majority of pipelines) >>>>> >>>>> I agree. But random seed is my problem here with respect to the >>>>> situation mentioned above. >>>>> >>>>> Right, I would like to know if there are more true use-cases before >>>>> adding this. This essentially allows users to map elements to exact output >>>>> shards which could change the characteristics of pipelines in very >>>>> significant ways without users being aware of it. For example, this could >>>>> result in extremely imbalanced workloads for any downstream processors. If >>>>> it's just Hive I would rather work around it (even with a perf penalty for >>>>> that case). >>>>> >>>>> User would have to explicitly ask FileIO to use specific sharding. >>>>> Documentation can educate about the tradeoffs. But I am open to workaround >>>>> alternatives. >>>>> >>>>> >>>>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>>>> >>>>>> >>>>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi Cham, thanks for the feedback >>>>>>> >>>>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow >>>>>>> runners to optimize better. I think one concern might be that the >>>>>>> addition >>>>>>> of this option might be going against this. >>>>>>> >>>>>>> I agree that less knobs is more. But if assignment of a key is >>>>>>> specific per user request via API (optional), than runner should not >>>>>>> optimize that but need to respect how data is requested to be laid >>>>>>> down. It >>>>>>> could however optimize number of shards as it is doing right now if >>>>>>> numShards is not set explicitly. >>>>>>> Anyway FileIO feels fluid here. You can leave shards empty and let >>>>>>> runner decide or optimize, but only in batch and not in streaming. >>>>>>> Then >>>>>>> it allows you to set number of shards but never let you change the logic >>>>>>> how it is assigned and defaults to round robin with random seed. It begs >>>>>>> question for me why I can manipulate number of shards but not the logic >>>>>>> of >>>>>>> assignment. Are there plans to (or already case implemented) optimize >>>>>>> around and have runner changing that under different circumstances? >>>>>>> >>>>>> >>>>>> I don't know the exact history behind withNumShards() feature for >>>>>> batch. I would guess it was originally introduced due to limitations for >>>>>> streaming but was made available to batch as well with a strong >>>>>> performance >>>>>> warning. 
>>>>>> >>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159 >>>>>> >>>>>> Also, when withNumShards() truly have to be used, round robin >>>>>> assignment of elements to shards sounds like the optimal solution (at >>>>>> least >>>>>> for the vast majority of pipelines) >>>>>> >>>>>> >>>>>>> >>>>>>> > Also looking at your original reasoning for adding this. >>>>>>> >>>>>>> > > I need to generate shards which are compatible with Hive >>>>>>> bucketing and therefore need to decide shard assignment based on data >>>>>>> fields of input element >>>>>>> >>>>>>> > This sounds very specific to Hive. Again I think the downside here >>>>>>> is that we will be giving up the flexibility for the runner to optimize >>>>>>> the >>>>>>> pipeline and decide sharding. Do you think it's possible to somehow >>>>>>> relax >>>>>>> this requirement for > > > Hive or use a secondary process to format the >>>>>>> output to a form that is suitable for Hive after being written to an >>>>>>> intermediate storage. >>>>>>> >>>>>>> It is possible. But it is quite suboptimal because of extra IO >>>>>>> penalty and I would have to use completely custom file writing as FileIO >>>>>>> would not support this. Then naturally the question would be, should I >>>>>>> use >>>>>>> FileIO at all? I am quite not sure if I am doing something so rare that >>>>>>> it >>>>>>> is not and can not be supported. Maybe I am. I remember an old >>>>>>> discussion >>>>>>> from Scio guys when they wanted to introduce Sorted Merge Buckets to >>>>>>> FileIO >>>>>>> where sharding was also needed to be manipulated (among other things) >>>>>>> >>>>>> >>>>>> Right, I would like to know if there are more true use-cases before >>>>>> adding this. This essentially allows users to map elements to exact >>>>>> output >>>>>> shards which could change the characteristics of pipelines in very >>>>>> significant ways without users being aware of it. For example, this could >>>>>> result in extremely imbalanced workloads for any downstream processors. >>>>>> If >>>>>> it's just Hive I would rather work around it (even with a perf penalty >>>>>> for >>>>>> that case). >>>>>> >>>>>> >>>>>>> >>>>>>> > > When running e.g. on Spark and job encounters kind of failure >>>>>>> which cause a loss of some data from previous stages, Spark does issue >>>>>>> recompute of necessary task in necessary stages to recover data. Because >>>>>>> the shard assignment function is random as default, some data will end >>>>>>> up >>>>>>> in different shards and cause duplicates in the final output. >>>>>>> >>>>>>> > I think this was already addressed. The correct solution here is >>>>>>> to implement RequiresStableInput for runners that do not already support >>>>>>> that and update FileIO to use that. >>>>>>> >>>>>>> How will @RequiresStableInput prevent this situation when running >>>>>>> batch use case? >>>>>>> >>>>>> >>>>>> So this is handled in combination of @RequiresStableInput and output >>>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners) >>>>>> makes sure that the input provided for the write stage does not get >>>>>> recomputed in the presence of failures. Output finalization makes sure >>>>>> that >>>>>> we only finalize one run of each bundle and discard the rest. 
>>>>>> >>>>>> Thanks, >>>>>> Cham >>>>>> >>>>>> >>>>>>> >>>>>>> >>>>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> how do we proceed with reviewing MR proposed for this change? >>>>>>>>> >>>>>>>>> I sense there is a concern exposing existing sharding function to >>>>>>>>> the API. But from the discussion here I do not have a clear picture >>>>>>>>> about arguments not doing so. >>>>>>>>> Only one argument was that dynamic destinations should be able to >>>>>>>>> do the same. While this is true, as it is illustrated in previous >>>>>>>>> commnet, it is not simple nor convenient to use and requires more >>>>>>>>> customization than exposing sharding which is already there. >>>>>>>>> Are there more negatives to exposing sharding function? >>>>>>>>> >>>>>>>> >>>>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only >>>>>>>> real usage of WriteFiles.withShardingFunction() : >>>>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281 >>>>>>>> WriteFiles is not expected to be used by pipeline authors but >>>>>>>> exposing this through the FileIO will make this available to pipeline >>>>>>>> authors (and some may choose to do so for various reasons). >>>>>>>> This will result in sharding being strict and runners being >>>>>>>> inflexible when it comes to optimizing execution of pipelines that use >>>>>>>> FileIO. >>>>>>>> >>>>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow >>>>>>>> runners to optimize better. I think one concern might be that the >>>>>>>> addition >>>>>>>> of this option might be going against this. >>>>>>>> >>>>>>>> Also looking at your original reasoning for adding this. >>>>>>>> >>>>>>>> > I need to generate shards which are compatible with Hive >>>>>>>> bucketing and therefore need to decide shard assignment based on data >>>>>>>> fields of input element >>>>>>>> >>>>>>>> This sounds very specific to Hive. Again I think the downside here >>>>>>>> is that we will be giving up the flexibility for the runner to >>>>>>>> optimize the >>>>>>>> pipeline and decide sharding. Do you think it's possible to somehow >>>>>>>> relax >>>>>>>> this requirement for Hive or use a secondary process to format the >>>>>>>> output >>>>>>>> to a form that is suitable for Hive after being written to an >>>>>>>> intermediate >>>>>>>> storage. >>>>>>>> >>>>>>>> > When running e.g. on Spark and job encounters kind of failure >>>>>>>> which cause a loss of some data from previous stages, Spark does issue >>>>>>>> recompute of necessary task in necessary stages to recover data. >>>>>>>> Because >>>>>>>> the shard assignment function is random as default, some data will end >>>>>>>> up >>>>>>>> in different shards and cause duplicates in the final output. >>>>>>>> >>>>>>>> I think this was already addressed. The correct solution here is to >>>>>>>> implement RequiresStableInput for runners that do not already support >>>>>>>> that >>>>>>>> and update FileIO to use that. 
>>>>>>>> >>>>>>>> Thanks, >>>>>>>> Cham >>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> The difference in my opinion is in distinguishing between - as >>>>>>>>>> written in this thread - physical vs logical properties of the >>>>>>>>>> pipeline. I >>>>>>>>>> proposed to keep dynamic destination (logical) and sharding >>>>>>>>>> (physical) >>>>>>>>>> separate on API level as it is at implementation level. >>>>>>>>>> >>>>>>>>>> When I reason about using `by()` for my case ... I am using >>>>>>>>>> dynamic destination to partition data into hourly folders. So my >>>>>>>>>> destination is e.g. `2021-06-23-07`. To add a shard there I assume I >>>>>>>>>> will >>>>>>>>>> need >>>>>>>>>> * encode shard to the destination .. e.g. in form of file prefix >>>>>>>>>> `2021-06-23-07/shard-1` >>>>>>>>>> * dynamic destination now does not span a group of files but must >>>>>>>>>> be exactly one file >>>>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles >>>>>>>>>> and make sure to use `.withNumShards(1)` ... I am not sure what >>>>>>>>>> `runnner >>>>>>>>>> determined` sharding would do, if it would not split destination >>>>>>>>>> further >>>>>>>>>> into more files >>>>>>>>>> * make sure that FileNameing will respect my intent and name >>>>>>>>>> files as I expect based on that destination >>>>>>>>>> * post process `WriteFilesResult` to turn my destination which >>>>>>>>>> targets physical single file (date-with-hour + shard-num) back into >>>>>>>>>> the >>>>>>>>>> destination with targets logical group of files (date-with-hour) so >>>>>>>>>> I hook >>>>>>>>>> it up do downstream post-process as usual >>>>>>>>>> >>>>>>>>>> Am I roughly correct? Or do I miss something more straight >>>>>>>>>> forward? >>>>>>>>>> If the above is correct then it feels more fragile and less >>>>>>>>>> intuitive to me than the option in my MR. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> I'm not sure I understand your PR. How is this PR different than >>>>>>>>>>> the by() method in FileIO? >>>>>>>>>>> >>>>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> MR for review for this change is here: >>>>>>>>>>>> https://github.com/apache/beam/pull/15051 >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I would like this thread to stay focused on sharding FileIO >>>>>>>>>>>>> only. Possible change to the model is an interesting topic but of >>>>>>>>>>>>> a much >>>>>>>>>>>>> different scope. >>>>>>>>>>>>> >>>>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than >>>>>>>>>>>>> logical property of the pipeline. That is why it feels more >>>>>>>>>>>>> natural to >>>>>>>>>>>>> distinguish between those two on the API level. >>>>>>>>>>>>> As for handling sharding requirements by adding more sugar to >>>>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that >>>>>>>>>>>>> results of >>>>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, >>>>>>>>>>>>> String>, >>>>>>>>>>>>> so written files per dynamic destination. Often we do GBP to >>>>>>>>>>>>> post-process >>>>>>>>>>>>> files per destination / logical group. 
If sharding would be >>>>>>>>>>>>> encoded there, >>>>>>>>>>>>> then it might complicate things either downstream or inside the >>>>>>>>>>>>> sugar part >>>>>>>>>>>>> to put shard in and then take it out later. >>>>>>>>>>>>> From the user perspective I do not see much difference. We >>>>>>>>>>>>> would still need to allow API to define both behaviors and it >>>>>>>>>>>>> would only be >>>>>>>>>>>>> executed differently by implementation. >>>>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to >>>>>>>>>>>>> stop using sharding and use dynamic destination for both given >>>>>>>>>>>>> that >>>>>>>>>>>>> sharding function is already there and in use. >>>>>>>>>>>>> >>>>>>>>>>>>> To the point of external shuffle and non-deterministic user >>>>>>>>>>>>> input. >>>>>>>>>>>>> Yes users can create non-deterministic behaviors but they are >>>>>>>>>>>>> in control. Here, Beam internally adds non-deterministic behavior >>>>>>>>>>>>> and users >>>>>>>>>>>>> can not opt-out. >>>>>>>>>>>>> All works fine as long as external shuffle service (depends on >>>>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if >>>>>>>>>>>>> data in >>>>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node >>>>>>>>>>>>> breaks >>>>>>>>>>>>> down - then pipeline have to recover the data by recomputing >>>>>>>>>>>>> necessary >>>>>>>>>>>>> paths. >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Sharding is typically a physical rather than logical property >>>>>>>>>>>>>> of the >>>>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to >>>>>>>>>>>>>> Beam in >>>>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if >>>>>>>>>>>>>> some kind >>>>>>>>>>>>>> of logical grouping is needed, and adding constraints like >>>>>>>>>>>>>> this can >>>>>>>>>>>>>> prevent opportunities for optimizations (like dynamic >>>>>>>>>>>>>> sharding and >>>>>>>>>>>>>> fusion). >>>>>>>>>>>>>> >>>>>>>>>>>>>> That being said, file output are one area where it could make >>>>>>>>>>>>>> sense. I >>>>>>>>>>>>>> would expect that dynamic destinations could cover this >>>>>>>>>>>>>> usecase, and a >>>>>>>>>>>>>> general FileNaming subclass could be provided to make this >>>>>>>>>>>>>> pattern >>>>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting >>>>>>>>>>>>>> num shards >>>>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do >>>>>>>>>>>>>> dynamic >>>>>>>>>>>>>> destinations, and have each sharded with a distinct sharing >>>>>>>>>>>>>> function >>>>>>>>>>>>>> as well.) >>>>>>>>>>>>>> >>>>>>>>>>>>>> If this doesn't work, we could look into adding >>>>>>>>>>>>>> ShardingFunction as a >>>>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised >>>>>>>>>>>>>> it >>>>>>>>>>>>>> already exists.) >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <[email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk >>>>>>>>>>>>>> about batch workloads. The typical scenario is that the output >>>>>>>>>>>>>> is committed >>>>>>>>>>>>>> once the job finishes (e.g., by atomic rename of directory). >>>>>>>>>>>>>> > Jan >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Dne 17. 6. 
2021 17:59 napsal uživatel Reuven Lax < >>>>>>>>>>>>>> [email protected]>: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of >>>>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non >>>>>>>>>>>>>> deterministic, >>>>>>>>>>>>>> and have no way to know (we've discussed proving users with an >>>>>>>>>>>>>> @IsDeterministic annotation, however empirically users often >>>>>>>>>>>>>> think their >>>>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ >>>>>>>>>>>>>> sort of >>>>>>>>>>>>>> triggered aggregation (including watermark based) can always be >>>>>>>>>>>>>> non >>>>>>>>>>>>>> deterministic. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Even if everything was deterministic, replaying everything >>>>>>>>>>>>>> is tricky. The output files already exist - should the system >>>>>>>>>>>>>> delete them >>>>>>>>>>>>>> and recreate them, or leave them as is and delete the temp >>>>>>>>>>>>>> files? Either >>>>>>>>>>>>>> decision could be problematic. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Correct, by the external shuffle service I pretty much >>>>>>>>>>>>>> meant "offloading the contents of a shuffle phase out of the >>>>>>>>>>>>>> system". Looks >>>>>>>>>>>>>> like that is what the Spark's checkpoint does as well. On the >>>>>>>>>>>>>> other hand >>>>>>>>>>>>>> (if I understand the concept correctly), that implies some >>>>>>>>>>>>>> performance >>>>>>>>>>>>>> penalty - the data has to be moved to external distributed >>>>>>>>>>>>>> filesystem. >>>>>>>>>>>>>> Which then feels weird if we optimize code to avoid computing >>>>>>>>>>>>>> random >>>>>>>>>>>>>> numbers, but are okay with moving complete datasets back and >>>>>>>>>>>>>> forth. I think >>>>>>>>>>>>>> in this particular case making the Pipeline deterministic - >>>>>>>>>>>>>> idempotent to >>>>>>>>>>>>>> be precise - (within the limits, yes, hashCode of enum is not >>>>>>>>>>>>>> stable >>>>>>>>>>>>>> between JVMs) would seem more practical to me. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Jan >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent >>>>>>>>>>>>>> a lot of time working through these semantics in the past. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > First - about the problem of duplicates: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though >>>>>>>>>>>>>> Java makes no guarantee that hashCode is stable across JVM >>>>>>>>>>>>>> instances, so >>>>>>>>>>>>>> this technique ends up not being stable) doesn't really help >>>>>>>>>>>>>> matters in >>>>>>>>>>>>>> Beam. Unlike other systems, Beam makes no assumptions that >>>>>>>>>>>>>> transforms are >>>>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that >>>>>>>>>>>>>> has any sort >>>>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark >>>>>>>>>>>>>> based or >>>>>>>>>>>>>> otherwise) is non deterministic. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost; >>>>>>>>>>>>>> even generating a random number per-element slowed things down >>>>>>>>>>>>>> too much, >>>>>>>>>>>>>> which is why the current code generates a random number only in >>>>>>>>>>>>>> startBundle. 
>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will >>>>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support >>>>>>>>>>>>>> this. I >>>>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually >>>>>>>>>>>>>> calling >>>>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from >>>>>>>>>>>>>> the Spark >>>>>>>>>>>>>> runner, but if so that would be a correctness regression). >>>>>>>>>>>>>> Implementing >>>>>>>>>>>>>> this has nothing to do with external shuffle services - neither >>>>>>>>>>>>>> Flink, >>>>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem >>>>>>>>>>>>>> correctly >>>>>>>>>>>>>> implementing RequiresStableInput. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather >>>>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is >>>>>>>>>>>>>> not needed >>>>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without >>>>>>>>>>>>>> external shuffle >>>>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to >>>>>>>>>>>>>> implement >>>>>>>>>>>>>> it correctly. In the case of FileIO (which do not use >>>>>>>>>>>>>> @RequiresStableInput >>>>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily >>>>>>>>>>>>>> avoidable >>>>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Jan >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Hi, >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should >>>>>>>>>>>>>> not use random assignment of shard keys >>>>>>>>>>>>>> (RandomShardingFunction), at least >>>>>>>>>>>>>> for bounded workloads (seems to be fine for streaming >>>>>>>>>>>>>> workloads). Many >>>>>>>>>>>>>> batch runners simply recompute path in the computation DAG from >>>>>>>>>>>>>> the failed >>>>>>>>>>>>>> node (transform) to the root (source). In the case there is any >>>>>>>>>>>>>> non-determinism involved in the logic, then it can result in >>>>>>>>>>>>>> duplicates (as >>>>>>>>>>>>>> the 'previous' attempt might have ended in DAG path that was not >>>>>>>>>>>>>> affected >>>>>>>>>>>>>> by the fail). That addresses the option 2) of what Jozef have >>>>>>>>>>>>>> mentioned. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput". >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle >>>>>>>>>>>>>> was a checkpoint, so we inserted a Reshuffle after the random >>>>>>>>>>>>>> numbers were >>>>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other >>>>>>>>>>>>>> engines do >>>>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate >>>>>>>>>>>>>> that this >>>>>>>>>>>>>> checkpointing was required for correctness. I think we still >>>>>>>>>>>>>> have many IOs >>>>>>>>>>>>>> that are written using Reshuffle instead of >>>>>>>>>>>>>> @RequiresStableInput, and I >>>>>>>>>>>>>> don't know which runners process @RequiresStableInput properly. 
>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls >>>>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct >>>>>>>>>>>>>> results for >>>>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed? >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput. >>>>>>>>>>>>>> It has a significant cost, and the cost varies across runners. >>>>>>>>>>>>>> If we can >>>>>>>>>>>>>> use a deterministic function, we should. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Kenn >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Jan >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. >>>>>>>>>>>>>> I think expanding this idea to some more abstract notion of a >>>>>>>>>>>>>> sharding >>>>>>>>>>>>>> function could make sense. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > For FileIO specifically, I wonder if you can use >>>>>>>>>>>>>> writeDynamic() to get the behavior you are seeking. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > The change in mind looks like this: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18 >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need >>>>>>>>>>>>>> for "partitioning" data (destination as directory level) or if >>>>>>>>>>>>>> one needs to >>>>>>>>>>>>>> handle groups of events differently, e.g. write some events in >>>>>>>>>>>>>> FormatA and >>>>>>>>>>>>>> others in FormatB. >>>>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of >>>>>>>>>>>>>> events within a particular destination group. More specifically, >>>>>>>>>>>>>> currently, >>>>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK >>>>>>>>>>>>>> operation. >>>>>>>>>>>>>> Sharded key is a compound of destination and assigned shard. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Having said that, I might be able to use dynamic >>>>>>>>>>>>>> destination for this, possibly with the need of custom >>>>>>>>>>>>>> FileNaming, and set >>>>>>>>>>>>>> shards to be always 1. But it feels less natural than allowing >>>>>>>>>>>>>> the user to >>>>>>>>>>>>>> swap already present `RandomShardingFunction` [2] with something >>>>>>>>>>>>>> of his own >>>>>>>>>>>>>> choosing. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > [1] >>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java >>>>>>>>>>>>>> > [2] >>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856 >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Kenn >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Adding sharding to the model may require a wider discussion >>>>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this >>>>>>>>>>>>>> has been >>>>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > A couple high level questions I can think of are, >>>>>>>>>>>>>> > - What runners support sharding? 
>>>>>>>>>>>>>> > * There will be some work in Dataflow required to >>>>>>>>>>>>>> support this but I'm not sure how much. >>>>>>>>>>>>>> > - What does sharding mean for streaming pipelines? >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > A more nitty-detail question: >>>>>>>>>>>>>> > - How can this be achieved performantly? For example, if >>>>>>>>>>>>>> a shuffle is required to achieve a particular sharding >>>>>>>>>>>>>> constraint, should >>>>>>>>>>>>>> we allow transforms to declare they don't modify the sharding >>>>>>>>>>>>>> property >>>>>>>>>>>>>> (e.g. key preserving) which may allow a runner to avoid an >>>>>>>>>>>>>> additional >>>>>>>>>>>>>> shuffle if a preceding shuffle can guarantee the sharding >>>>>>>>>>>>>> requirements? >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Where X is the shuffle that could be avoided: input -> >>>>>>>>>>>>>> shuffle (key sharding fn A) -> transform1 (key preserving) -> >>>>>>>>>>>>>> transform 2 >>>>>>>>>>>>>> (key preserving) -> X -> fileio (key sharding fn A) >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a >>>>>>>>>>>>>> custom sharding function: >>>>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493 >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I have 2 use-cases for this: >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I need to generate shards which are compatible with Hive >>>>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on >>>>>>>>>>>>>> data >>>>>>>>>>>>>> fields of input element >>>>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of >>>>>>>>>>>>>> failure which cause a loss of some data from previous stages, >>>>>>>>>>>>>> Spark does >>>>>>>>>>>>>> issue recompute of necessary task in necessary stages to recover >>>>>>>>>>>>>> data. >>>>>>>>>>>>>> Because the shard assignment function is random as default, some >>>>>>>>>>>>>> data will >>>>>>>>>>>>>> end up in different shards and cause duplicates in the final >>>>>>>>>>>>>> output. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Please let me know your thoughts in case you see a reason >>>>>>>>>>>>>> to not to add such improvement. >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Thanks, >>>>>>>>>>>>>> > Jozef >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> >>>>>>>>>>>>>
