I would like to bump this thread. So far we did not get much far.
Allowing the user to specify a sharing function and not only the desired
number of shards seems to be a very unsettling change. So far I do not have
a good understanding of why this is so unacceptable. From the thread I
sense some fear from users not using it correctly and complaining about
performance. While true, this could be the same for any GBK operation with
a bad key and can be mitigated to some extent by documentation.

What else do you see feature wise as a blocker for this change? Right now I
do not have much data based on which I can evolve my proposal.


On Wed, Jul 7, 2021 at 10:01 AM Jozef Vilcek <[email protected]> wrote:

>
>
> On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax <[email protected]> wrote:
>
>>
>>
>> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek <[email protected]>
>> wrote:
>>
>>> I don't think this has anything to do with external shuffle services.
>>>
>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>> since Beam does not restrict transforms to being deterministic. The Spark
>>> runner works (at least it did last I checked) by checkpointing the RDD.
>>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>>> input to a transform. This adds some cost to the Spark runner, but makes
>>> things correct. You should not have sharding problems due to replay unless
>>> there is a bug in the current Spark runner.
>>>
>>> Beam does not restrict non-determinism, true. If users do add it, then
>>> they can work around it should they need to address any side effects. But
>>> should non-determinism be deliberately added by Beam core? Probably can if
>>> runners can 100% deal with that effectively. To the point of RDD
>>> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I
>>> thought cache is invoked on fork points in DAG. I have just a join and map
>>> and in such cases I thought data is always served out by shuffle service.
>>> Am I mistaken?
>>>
>>
>> Non determinism is already there  in core Beam features. If any sort of
>> trigger is used anywhere in the pipeline, the result is non deterministic.
>> We've also found that users tend not to know when their ParDos are non
>> deterministic, so telling users to works around it tends not to work.
>>
>> The Spark runner definitely used to use checkpoint in this case.
>>
>
> It is beyond my knowledge level of Beam how triggers introduce
> non-determinism into the code processing in batch mode. So I leave that
> out. Reuven, do you mind pointing me to the Spark runner code which is
> supposed to handle this by using checkpoint? I can not find it myself.
>
>
>
>>
>>>
>>> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> How will @RequiresStableInput prevent this situation when running
>>>>>> batch use case?
>>>>>>
>>>>>
>>>>> So this is handled in combination of @RequiresStableInput and output
>>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>>> makes sure that the input provided for the write stage does not get
>>>>> recomputed in the presence of failures. Output finalization makes sure 
>>>>> that
>>>>> we only finalize one run of each bundle and discard the rest.
>>>>>
>>>>> So this I believe relies on having robust external service to hold
>>>>> shuffle data and serve it out when needed so pipeline does not need to
>>>>> recompute it via non-deterministic function. In Spark however, shuffle
>>>>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>>>>> replicated shuffle data). Therefore, if instance of shuffle service 
>>>>> holding
>>>>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>>>>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>>>>> can do here to prevent it or make it stable? Will @RequiresStableInput 
>>>>> work
>>>>> as expected? ... Note that this, I believe, concerns only the batch.
>>>>>
>>>>
>>>> I don't think this has anything to do with external shuffle services.
>>>>
>>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>>> since Beam does not restrict transforms to being deterministic. The Spark
>>>> runner works (at least it did last I checked) by checkpointing the RDD.
>>>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>>>> input to a transform. This adds some cost to the Spark runner, but makes
>>>> things correct. You should not have sharding problems due to replay unless
>>>> there is a bug in the current Spark runner.
>>>>
>>>>
>>>>>
>>>>> Also, when withNumShards() truly have to be used, round robin
>>>>> assignment of elements to shards sounds like the optimal solution (at 
>>>>> least
>>>>> for the vast majority of pipelines)
>>>>>
>>>>> I agree. But random seed is my problem here with respect to the
>>>>> situation mentioned above.
>>>>>
>>>>> Right, I would like to know if there are more true use-cases before
>>>>> adding this. This essentially allows users to map elements to exact output
>>>>> shards which could change the characteristics of pipelines in very
>>>>> significant ways without users being aware of it. For example, this could
>>>>> result in extremely imbalanced workloads for any downstream processors. If
>>>>> it's just Hive I would rather work around it (even with a perf penalty for
>>>>> that case).
>>>>>
>>>>> User would have to explicitly ask FileIO to use specific sharding.
>>>>> Documentation can educate about the tradeoffs. But I am open to workaround
>>>>> alternatives.
>>>>>
>>>>>
>>>>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Cham, thanks for the feedback
>>>>>>>
>>>>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>>>> runners to optimize better. I think one concern might be that the 
>>>>>>> addition
>>>>>>> of this option might be going against this.
>>>>>>>
>>>>>>> I agree that less knobs is more. But if assignment of a key is
>>>>>>> specific per user request via API (optional), than runner should not
>>>>>>> optimize that but need to respect how data is requested to be laid 
>>>>>>> down. It
>>>>>>> could however optimize number of shards as it is doing right now if
>>>>>>> numShards is not set explicitly.
>>>>>>> Anyway FileIO feels fluid here. You can leave shards empty and let
>>>>>>> runner decide or optimize,  but only in batch and not in streaming.  
>>>>>>> Then
>>>>>>> it allows you to set number of shards but never let you change the logic
>>>>>>> how it is assigned and defaults to round robin with random seed. It begs
>>>>>>> question for me why I can manipulate number of shards but not the logic 
>>>>>>> of
>>>>>>> assignment. Are there plans to (or already case implemented) optimize
>>>>>>> around and have runner changing that under different circumstances?
>>>>>>>
>>>>>>
>>>>>> I don't know the exact history behind withNumShards() feature for
>>>>>> batch. I would guess it was originally introduced due to limitations for
>>>>>> streaming but was made available to batch as well with a strong 
>>>>>> performance
>>>>>> warning.
>>>>>>
>>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>>>>>>
>>>>>> Also, when withNumShards() truly have to be used, round robin
>>>>>> assignment of elements to shards sounds like the optimal solution (at 
>>>>>> least
>>>>>> for the vast majority of pipelines)
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> > Also looking at your original reasoning for adding this.
>>>>>>>
>>>>>>> > > I need to generate shards which are compatible with Hive
>>>>>>> bucketing and therefore need to decide shard assignment based on data
>>>>>>> fields of input element
>>>>>>>
>>>>>>> > This sounds very specific to Hive. Again I think the downside here
>>>>>>> is that we will be giving up the flexibility for the runner to optimize 
>>>>>>> the
>>>>>>> pipeline and decide sharding. Do you think it's possible to somehow 
>>>>>>> relax
>>>>>>> this requirement for > > > Hive or use a secondary process to format the
>>>>>>> output to a form that is suitable for Hive after being written to an
>>>>>>> intermediate storage.
>>>>>>>
>>>>>>> It is possible. But it is quite suboptimal because of extra IO
>>>>>>> penalty and I would have to use completely custom file writing as FileIO
>>>>>>> would not support this. Then naturally the question would be, should I 
>>>>>>> use
>>>>>>> FileIO at all? I am quite not sure if I am doing something so rare that 
>>>>>>> it
>>>>>>> is not and can not be supported. Maybe I am. I remember an old 
>>>>>>> discussion
>>>>>>> from Scio guys when they wanted to introduce Sorted Merge Buckets to 
>>>>>>> FileIO
>>>>>>> where sharding was also needed to be manipulated (among other things)
>>>>>>>
>>>>>>
>>>>>> Right, I would like to know if there are more true use-cases before
>>>>>> adding this. This essentially allows users to map elements to exact 
>>>>>> output
>>>>>> shards which could change the characteristics of pipelines in very
>>>>>> significant ways without users being aware of it. For example, this could
>>>>>> result in extremely imbalanced workloads for any downstream processors. 
>>>>>> If
>>>>>> it's just Hive I would rather work around it (even with a perf penalty 
>>>>>> for
>>>>>> that case).
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> > > When running e.g. on Spark and job encounters kind of failure
>>>>>>> which cause a loss of some data from previous stages, Spark does issue
>>>>>>> recompute of necessary task in necessary stages to recover data. Because
>>>>>>> the shard assignment function is random as default, some data will end 
>>>>>>> up
>>>>>>> in different shards and cause duplicates in the final output.
>>>>>>>
>>>>>>> > I think this was already addressed. The correct solution here is
>>>>>>> to implement RequiresStableInput for runners that do not already support
>>>>>>> that and update FileIO to use that.
>>>>>>>
>>>>>>> How will @RequiresStableInput prevent this situation when running
>>>>>>> batch use case?
>>>>>>>
>>>>>>
>>>>>> So this is handled in combination of @RequiresStableInput and output
>>>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>>>> makes sure that the input provided for the write stage does not get
>>>>>> recomputed in the presence of failures. Output finalization makes sure 
>>>>>> that
>>>>>> we only finalize one run of each bundle and discard the rest.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> how do we proceed with reviewing MR proposed for this change?
>>>>>>>>>
>>>>>>>>> I sense there is a concern exposing existing sharding function to
>>>>>>>>> the API. But from the discussion here I do not have a clear picture
>>>>>>>>> about arguments not doing so.
>>>>>>>>> Only one argument was that dynamic destinations should be able to
>>>>>>>>> do the same. While this is true, as it is illustrated in previous
>>>>>>>>> commnet, it is not simple nor convenient to use and requires more
>>>>>>>>> customization than exposing sharding which is already there.
>>>>>>>>> Are there more negatives to exposing sharding function?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only
>>>>>>>> real usage of WriteFiles.withShardingFunction() :
>>>>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
>>>>>>>> WriteFiles is not expected to be used by pipeline authors but
>>>>>>>> exposing this through the FileIO will make this available to pipeline
>>>>>>>> authors (and some may choose to do so for various reasons).
>>>>>>>> This will result in sharding being strict and runners being
>>>>>>>> inflexible when it comes to optimizing execution of pipelines that use
>>>>>>>> FileIO.
>>>>>>>>
>>>>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>>>>> runners to optimize better. I think one concern might be that the 
>>>>>>>> addition
>>>>>>>> of this option might be going against this.
>>>>>>>>
>>>>>>>> Also looking at your original reasoning for adding this.
>>>>>>>>
>>>>>>>> > I need to generate shards which are compatible with Hive
>>>>>>>> bucketing and therefore need to decide shard assignment based on data
>>>>>>>> fields of input element
>>>>>>>>
>>>>>>>> This sounds very specific to Hive. Again I think the downside here
>>>>>>>> is that we will be giving up the flexibility for the runner to 
>>>>>>>> optimize the
>>>>>>>> pipeline and decide sharding. Do you think it's possible to somehow 
>>>>>>>> relax
>>>>>>>> this requirement for Hive or use a secondary process to format the 
>>>>>>>> output
>>>>>>>> to a form that is suitable for Hive after being written to an 
>>>>>>>> intermediate
>>>>>>>> storage.
>>>>>>>>
>>>>>>>> > When running e.g. on Spark and job encounters kind of failure
>>>>>>>> which cause a loss of some data from previous stages, Spark does issue
>>>>>>>> recompute of necessary task in necessary stages to recover data. 
>>>>>>>> Because
>>>>>>>> the shard assignment function is random as default, some data will end 
>>>>>>>> up
>>>>>>>> in different shards and cause duplicates in the final output.
>>>>>>>>
>>>>>>>> I think this was already addressed. The correct solution here is to
>>>>>>>> implement RequiresStableInput for runners that do not already support 
>>>>>>>> that
>>>>>>>> and update FileIO to use that.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> The difference in my opinion is in distinguishing between - as
>>>>>>>>>> written in this thread - physical vs logical properties of the 
>>>>>>>>>> pipeline. I
>>>>>>>>>> proposed to keep dynamic destination (logical) and sharding 
>>>>>>>>>> (physical)
>>>>>>>>>> separate on API level as it is at implementation level.
>>>>>>>>>>
>>>>>>>>>> When I reason about using `by()` for my case ... I am using
>>>>>>>>>> dynamic destination to partition data into hourly folders. So my
>>>>>>>>>> destination is e.g. `2021-06-23-07`. To add a shard there I assume I 
>>>>>>>>>> will
>>>>>>>>>> need
>>>>>>>>>> * encode shard to the destination .. e.g. in form of file prefix
>>>>>>>>>> `2021-06-23-07/shard-1`
>>>>>>>>>> * dynamic destination now does not span a group of files but must
>>>>>>>>>> be exactly one file
>>>>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles
>>>>>>>>>> and make sure to use `.withNumShards(1)` ... I am not sure what 
>>>>>>>>>> `runnner
>>>>>>>>>> determined` sharding would do, if it would not split destination 
>>>>>>>>>> further
>>>>>>>>>> into more files
>>>>>>>>>> * make sure that FileNameing will respect my intent and name
>>>>>>>>>> files as I expect based on that destination
>>>>>>>>>> * post process `WriteFilesResult` to turn my destination which
>>>>>>>>>> targets physical single file (date-with-hour + shard-num) back into 
>>>>>>>>>> the
>>>>>>>>>> destination with targets logical group of files (date-with-hour) so 
>>>>>>>>>> I hook
>>>>>>>>>> it up do downstream post-process as usual
>>>>>>>>>>
>>>>>>>>>> Am I roughly correct? Or do I miss something more straight
>>>>>>>>>> forward?
>>>>>>>>>> If the above is correct then it feels more fragile and less
>>>>>>>>>> intuitive to me than the option in my MR.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm not sure I understand your PR. How is this PR different than
>>>>>>>>>>> the by() method in FileIO?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> MR for review for this change is here:
>>>>>>>>>>>> https://github.com/apache/beam/pull/15051
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I would like this thread to stay focused on sharding FileIO
>>>>>>>>>>>>> only. Possible change to the model is an interesting topic but of 
>>>>>>>>>>>>> a much
>>>>>>>>>>>>> different scope.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than
>>>>>>>>>>>>> logical property of the pipeline. That is why it feels more 
>>>>>>>>>>>>> natural to
>>>>>>>>>>>>> distinguish between those two on the API level.
>>>>>>>>>>>>> As for handling sharding requirements by adding more sugar to
>>>>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that 
>>>>>>>>>>>>> results of
>>>>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, 
>>>>>>>>>>>>> String>,
>>>>>>>>>>>>> so written files per dynamic destination. Often we do GBP to 
>>>>>>>>>>>>> post-process
>>>>>>>>>>>>> files per destination / logical group. If sharding would be 
>>>>>>>>>>>>> encoded there,
>>>>>>>>>>>>> then it might complicate things either downstream or inside the 
>>>>>>>>>>>>> sugar part
>>>>>>>>>>>>> to put shard in and then take it out later.
>>>>>>>>>>>>> From the user perspective I do not see much difference. We
>>>>>>>>>>>>> would still need to allow API to define both behaviors and it 
>>>>>>>>>>>>> would only be
>>>>>>>>>>>>> executed differently by implementation.
>>>>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to
>>>>>>>>>>>>> stop using sharding and use dynamic destination for both given 
>>>>>>>>>>>>> that
>>>>>>>>>>>>> sharding function is already there and in use.
>>>>>>>>>>>>>
>>>>>>>>>>>>> To the point of external shuffle and non-deterministic user
>>>>>>>>>>>>> input.
>>>>>>>>>>>>> Yes users can create non-deterministic behaviors but they are
>>>>>>>>>>>>> in control. Here, Beam internally adds non-deterministic behavior 
>>>>>>>>>>>>> and users
>>>>>>>>>>>>> can not opt-out.
>>>>>>>>>>>>> All works fine as long as external shuffle service (depends on
>>>>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if 
>>>>>>>>>>>>> data in
>>>>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node 
>>>>>>>>>>>>> breaks
>>>>>>>>>>>>> down - then pipeline have to recover the data by recomputing 
>>>>>>>>>>>>> necessary
>>>>>>>>>>>>> paths.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sharding is typically a physical rather than logical property
>>>>>>>>>>>>>> of the
>>>>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to
>>>>>>>>>>>>>> Beam in
>>>>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if
>>>>>>>>>>>>>> some kind
>>>>>>>>>>>>>> of logical grouping is needed, and adding constraints like
>>>>>>>>>>>>>> this can
>>>>>>>>>>>>>> prevent opportunities for optimizations (like dynamic
>>>>>>>>>>>>>> sharding and
>>>>>>>>>>>>>> fusion).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That being said, file output are one area where it could make
>>>>>>>>>>>>>> sense. I
>>>>>>>>>>>>>> would expect that dynamic destinations could cover this
>>>>>>>>>>>>>> usecase, and a
>>>>>>>>>>>>>> general FileNaming subclass could be provided to make this
>>>>>>>>>>>>>> pattern
>>>>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting
>>>>>>>>>>>>>> num shards
>>>>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do
>>>>>>>>>>>>>> dynamic
>>>>>>>>>>>>>> destinations, and have each sharded with a distinct sharing
>>>>>>>>>>>>>> function
>>>>>>>>>>>>>> as well.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If this doesn't work, we could look into adding
>>>>>>>>>>>>>> ShardingFunction as a
>>>>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> already exists.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <[email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk
>>>>>>>>>>>>>> about batch workloads. The typical scenario is that the output 
>>>>>>>>>>>>>> is committed
>>>>>>>>>>>>>> once the job finishes (e.g., by atomic rename of directory).
>>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax <
>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of
>>>>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non 
>>>>>>>>>>>>>> deterministic,
>>>>>>>>>>>>>> and have no way to know (we've discussed proving users with an
>>>>>>>>>>>>>> @IsDeterministic annotation, however empirically users often 
>>>>>>>>>>>>>> think their
>>>>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ 
>>>>>>>>>>>>>> sort of
>>>>>>>>>>>>>> triggered aggregation (including watermark based) can always be 
>>>>>>>>>>>>>> non
>>>>>>>>>>>>>> deterministic.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Even if everything was deterministic, replaying everything
>>>>>>>>>>>>>> is tricky. The output files already exist - should the system 
>>>>>>>>>>>>>> delete them
>>>>>>>>>>>>>> and recreate them, or leave them as is and delete the temp 
>>>>>>>>>>>>>> files? Either
>>>>>>>>>>>>>> decision could be problematic.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Correct, by the external shuffle service I pretty much
>>>>>>>>>>>>>> meant "offloading the contents of a shuffle phase out of the 
>>>>>>>>>>>>>> system". Looks
>>>>>>>>>>>>>> like that is what the Spark's checkpoint does as well. On the 
>>>>>>>>>>>>>> other hand
>>>>>>>>>>>>>> (if I understand the concept correctly), that implies some 
>>>>>>>>>>>>>> performance
>>>>>>>>>>>>>> penalty - the data has to be moved to external distributed 
>>>>>>>>>>>>>> filesystem.
>>>>>>>>>>>>>> Which then feels weird if we optimize code to avoid computing 
>>>>>>>>>>>>>> random
>>>>>>>>>>>>>> numbers, but are okay with moving complete datasets back and 
>>>>>>>>>>>>>> forth. I think
>>>>>>>>>>>>>> in this particular case making the Pipeline deterministic - 
>>>>>>>>>>>>>> idempotent to
>>>>>>>>>>>>>> be precise - (within the limits, yes, hashCode of enum is not 
>>>>>>>>>>>>>> stable
>>>>>>>>>>>>>> between JVMs) would seem more practical to me.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent
>>>>>>>>>>>>>> a lot of time working through these semantics in the past.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > First - about the problem of duplicates:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though
>>>>>>>>>>>>>> Java makes no guarantee that hashCode is stable across JVM 
>>>>>>>>>>>>>> instances, so
>>>>>>>>>>>>>> this technique ends up not being stable) doesn't really help 
>>>>>>>>>>>>>> matters in
>>>>>>>>>>>>>> Beam. Unlike other systems, Beam makes no assumptions that 
>>>>>>>>>>>>>> transforms are
>>>>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that 
>>>>>>>>>>>>>> has any sort
>>>>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark 
>>>>>>>>>>>>>> based or
>>>>>>>>>>>>>> otherwise) is non deterministic.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost;
>>>>>>>>>>>>>> even generating a random number per-element slowed things down 
>>>>>>>>>>>>>> too much,
>>>>>>>>>>>>>> which is why the current code generates a random number only in 
>>>>>>>>>>>>>> startBundle.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will
>>>>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support 
>>>>>>>>>>>>>> this. I
>>>>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually 
>>>>>>>>>>>>>> calling
>>>>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from 
>>>>>>>>>>>>>> the Spark
>>>>>>>>>>>>>> runner, but if so that would be a correctness regression). 
>>>>>>>>>>>>>> Implementing
>>>>>>>>>>>>>> this has nothing to do with external shuffle services - neither 
>>>>>>>>>>>>>> Flink,
>>>>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem 
>>>>>>>>>>>>>> correctly
>>>>>>>>>>>>>> implementing RequiresStableInput.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather
>>>>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is 
>>>>>>>>>>>>>> not needed
>>>>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without 
>>>>>>>>>>>>>> external shuffle
>>>>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to 
>>>>>>>>>>>>>> implement
>>>>>>>>>>>>>> it correctly. In the case of FileIO (which do not use 
>>>>>>>>>>>>>> @RequiresStableInput
>>>>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily 
>>>>>>>>>>>>>> avoidable
>>>>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should
>>>>>>>>>>>>>> not use random assignment of shard keys 
>>>>>>>>>>>>>> (RandomShardingFunction), at least
>>>>>>>>>>>>>> for bounded workloads (seems to be fine for streaming 
>>>>>>>>>>>>>> workloads). Many
>>>>>>>>>>>>>> batch runners simply recompute path in the computation DAG from 
>>>>>>>>>>>>>> the failed
>>>>>>>>>>>>>> node (transform) to the root (source). In the case there is any
>>>>>>>>>>>>>> non-determinism involved in the logic, then it can result in 
>>>>>>>>>>>>>> duplicates (as
>>>>>>>>>>>>>> the 'previous' attempt might have ended in DAG path that was not 
>>>>>>>>>>>>>> affected
>>>>>>>>>>>>>> by the fail). That addresses the option 2) of what Jozef have 
>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput".
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle
>>>>>>>>>>>>>> was a checkpoint, so we inserted a Reshuffle after the random 
>>>>>>>>>>>>>> numbers were
>>>>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other 
>>>>>>>>>>>>>> engines do
>>>>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate 
>>>>>>>>>>>>>> that this
>>>>>>>>>>>>>> checkpointing was required for correctness. I think we still 
>>>>>>>>>>>>>> have many IOs
>>>>>>>>>>>>>> that are written using Reshuffle instead of 
>>>>>>>>>>>>>> @RequiresStableInput, and I
>>>>>>>>>>>>>> don't know which runners process @RequiresStableInput properly.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls
>>>>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct 
>>>>>>>>>>>>>> results for
>>>>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput.
>>>>>>>>>>>>>> It has a significant cost, and the cost varies across runners. 
>>>>>>>>>>>>>> If we can
>>>>>>>>>>>>>> use a deterministic function, we should.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key.
>>>>>>>>>>>>>> I think expanding this idea to some more abstract notion of a 
>>>>>>>>>>>>>> sharding
>>>>>>>>>>>>>> function could make sense.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For FileIO specifically, I wonder if you can use
>>>>>>>>>>>>>> writeDynamic() to get the behavior you are seeking.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > The change in mind looks like this:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need
>>>>>>>>>>>>>> for "partitioning" data (destination as directory level) or if 
>>>>>>>>>>>>>> one needs to
>>>>>>>>>>>>>> handle groups of events differently, e.g. write some events in 
>>>>>>>>>>>>>> FormatA and
>>>>>>>>>>>>>> others in FormatB.
>>>>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of
>>>>>>>>>>>>>> events within a particular destination group. More specifically, 
>>>>>>>>>>>>>> currently,
>>>>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK 
>>>>>>>>>>>>>> operation.
>>>>>>>>>>>>>> Sharded key is a compound of destination and assigned shard.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Having said that, I might be able to use dynamic
>>>>>>>>>>>>>> destination for this, possibly with the need of custom 
>>>>>>>>>>>>>> FileNaming, and set
>>>>>>>>>>>>>> shards to be always 1. But it feels less natural than allowing 
>>>>>>>>>>>>>> the user to
>>>>>>>>>>>>>> swap already present `RandomShardingFunction` [2] with something 
>>>>>>>>>>>>>> of his own
>>>>>>>>>>>>>> choosing.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > [1]
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>>>>>>>>>>>>> > [2]
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Adding sharding to the model may require a wider discussion
>>>>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this 
>>>>>>>>>>>>>> has been
>>>>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > A couple high level questions I can think of are,
>>>>>>>>>>>>>> >   - What runners support sharding?
>>>>>>>>>>>>>> >       * There will be some work in Dataflow required to
>>>>>>>>>>>>>> support this but I'm not sure how much.
>>>>>>>>>>>>>> >   - What does sharding mean for streaming pipelines?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > A more nitty-detail question:
>>>>>>>>>>>>>> >   - How can this be achieved performantly? For example, if
>>>>>>>>>>>>>> a shuffle is required to achieve a particular sharding 
>>>>>>>>>>>>>> constraint, should
>>>>>>>>>>>>>> we allow transforms to declare they don't modify the sharding 
>>>>>>>>>>>>>> property
>>>>>>>>>>>>>> (e.g. key preserving) which may allow a runner to avoid an 
>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>> shuffle if a preceding shuffle can guarantee the sharding 
>>>>>>>>>>>>>> requirements?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Where X is the shuffle that could be avoided: input ->
>>>>>>>>>>>>>> shuffle (key sharding fn A) -> transform1 (key preserving) -> 
>>>>>>>>>>>>>> transform 2
>>>>>>>>>>>>>> (key preserving) -> X -> fileio (key sharding fn A)
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a
>>>>>>>>>>>>>> custom sharding function:
>>>>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I have 2 use-cases for this:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I need to generate shards which are compatible with Hive
>>>>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on 
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>> fields of input element
>>>>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of
>>>>>>>>>>>>>> failure which cause a loss of some data from previous stages, 
>>>>>>>>>>>>>> Spark does
>>>>>>>>>>>>>> issue recompute of necessary task in necessary stages to recover 
>>>>>>>>>>>>>> data.
>>>>>>>>>>>>>> Because the shard assignment function is random as default, some 
>>>>>>>>>>>>>> data will
>>>>>>>>>>>>>> end up in different shards and cause duplicates in the final 
>>>>>>>>>>>>>> output.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Please let me know your thoughts in case you see a reason
>>>>>>>>>>>>>> to not to add such improvement.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>> > Jozef
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

Reply via email to