I'm not sure I understand your PR. How is this PR different from the by()
method in FileIO?

On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <[email protected]> wrote:

> MR for review for this change is here:
> https://github.com/apache/beam/pull/15051
>
> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <[email protected]>
> wrote:
>
>> I would like this thread to stay focused on sharding FileIO only.
>> A possible change to the model is an interesting topic, but one of a much
>> different scope.
>>
>> Yes, I agree that sharding is mostly a physical rather than logical
>> property of the pipeline. That is why it feels more natural to distinguish
>> between those two on the API level.
>> As for handling sharding requirements by adding more sugar to dynamic
>> destinations + file naming, one has to keep in mind that the results of
>> dynamic writes can be observed in the form of KV<DestinationT, String>,
>> i.e. the written files per dynamic destination. Often we do a GBK to
>> post-process files per destination / logical group. If sharding were
>> encoded there, it might complicate things either downstream or inside the
>> sugar, which would have to put the shard in and then take it out later.
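>> A rough sketch of the kind of post-processing I mean (names like Event
>> and PostProcessDestinationFn are just illustrative, not from the actual
>> code):
>>
>>   WriteFilesResult<String> result =
>>       events.apply(FileIO.<String, Event>writeDynamic()
>>           .by(Event::getPartition)
>>           .via(Contextful.fn(Event::toString), TextIO.sink())
>>           .withDestinationCoder(StringUtf8Coder.of())
>>           .to("/output/path")
>>           .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".txt")));
>>
>>   // Written files are observable as KV<DestinationT, String>.
>>   result.getPerDestinationOutputFilenames()
>>       .apply(GroupByKey.<String, String>create())
>>       .apply(ParDo.of(new PostProcessDestinationFn()));
>>
>> If the shard were folded into DestinationT, this downstream grouping would
>> see destination+shard pairs and would have to strip the shard out again.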
>> From the user perspective I do not see much difference. The API would
>> still need to allow defining both behaviors; they would just be executed
>> differently by the implementation.
>> I do not see value in changing the FileIO (WriteFiles) logic to stop using
>> sharding and use dynamic destinations for both, given that the sharding
>> function is already there and in use.
>>
>> To the point of external shuffle and non-deterministic user input.
>> Yes, users can create non-deterministic behavior, but they are in control.
>> Here, Beam internally adds non-deterministic behavior and users can not
>> opt out.
>> Everything works fine as long as the external shuffle service (depending
>> on the runner) holds on to the data and hands it out on retries. However,
>> if the data in the shuffle service is lost for some reason - e.g. a disk
>> failure or a node breaking down - then the pipeline has to recover the
>> data by recomputing the necessary paths.
>>
>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Sharding is typically a physical rather than logical property of the
>>> pipeline, and I'm not convinced it makes sense to add it to Beam in
>>> general. One can already use keys and GBK/Stateful DoFns if some kind
>>> of logical grouping is needed, and adding constraints like this can
>>> prevent opportunities for optimizations (like dynamic sharding and
>>> fusion).
>>>
>>> That being said, file outputs are one area where it could make sense. I
>>> would expect that dynamic destinations could cover this use case, and a
>>> general FileNaming subclass could be provided to make this pattern
>>> easier (and possibly some syntactic sugar for auto-setting num shards
>>> to 0). (One downside of this approach is that one couldn't do dynamic
>>> destinations and have each destination sharded with a distinct sharding
>>> function as well.)
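>>>
>>> A rough sketch of that approach (illustrative only; Event and myShardFn
>>> are made-up names, and the exact sugar is not worked out):
>>>
>>>   events.apply(FileIO.<Integer, Event>writeDynamic()
>>>       .by(event -> myShardFn(event))  // user-chosen shard as destination
>>>       .via(Contextful.fn(Event::toString), TextIO.sink())
>>>       .withDestinationCoder(VarIntCoder.of())
>>>       .to("/output/path")
>>>       .withNaming(shard ->
>>>           FileIO.Write.defaultNaming("part-" + shard, ".txt")));
>>>
>>> Since the shard is itself the destination here, combining this with real
>>> dynamic destinations becomes awkward, which is the downside noted above.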
>>>
>>> If this doesn't work, we could look into adding ShardingFunction as a
>>> publicly exposed parameter to FileIO. (I'm actually surprised it
>>> already exists.)
>>>
>>> On Thu, Jun 17, 2021 at 9:39 AM <[email protected]> wrote:
>>> >
>>> > Alright, but what is worth emphasizing is that we are talking about
>>> batch workloads. The typical scenario is that the output is committed once
>>> the job finishes (e.g., by an atomic rename of a directory).
>>> >  Jan
>>> >
>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax <[email protected]>:
>>> >
>>> > Yes - the problem is that Beam makes no guarantees of determinism
>>> anywhere in the system. User DoFns might be non-deterministic, and we have
>>> no way to know (we've discussed providing users with an @IsDeterministic
>>> annotation, however empirically users often think their functions are
>>> deterministic when they are in fact not). _Any_ sort of triggered
>>> aggregation (including watermark-based) can always be non-deterministic.
>>> >
>>> > Even if everything was deterministic, replaying everything is tricky.
>>> The output files already exist - should the system delete them and recreate
>>> them, or leave them as is and delete the temp files? Either decision could
>>> be problematic.
>>> >
>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <[email protected]> wrote:
>>> >
>>> > Correct, by the external shuffle service I pretty much meant
>>> "offloading the contents of a shuffle phase out of the system". Looks like
>>> that is what Spark's checkpoint does as well. On the other hand (if I
>>> understand the concept correctly), that implies some performance penalty -
>>> the data has to be moved to an external distributed filesystem. That feels
>>> weird if we optimize code to avoid computing random numbers, but are okay
>>> with moving complete datasets back and forth. I think that in this
>>> particular case making the pipeline deterministic - idempotent, to be
>>> precise - would seem more practical to me (within limits, yes; the
>>> hashCode of an enum is not stable between JVMs).
>>> >
>>> >  Jan
>>> >
>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>> >
>>> > I have some thoughts here, as Eugene Kirpichov and I spent a lot of
>>> time working through these semantics in the past.
>>> >
>>> > First - about the problem of duplicates:
>>> >
>>> > A "deterministic" sharding - e.g. hashCode-based (though Java makes no
>>> guarantee that hashCode is stable across JVM instances, so this technique
>>> ends up not being stable) - doesn't really help matters in Beam. Unlike
>>> other systems, Beam makes no assumptions that transforms are idempotent or
>>> deterministic. What's more, _any_ transform that has any sort of triggered
>>> grouping (whether the trigger used is watermark-based or otherwise) is
>>> non-deterministic.
>>> >
>>> > Forcing a hash of every element imposed quite a CPU cost; even
>>> generating a random number per-element slowed things down too much, which
>>> is why the current code generates a random number only in startBundle.
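>>> >
>>> > To illustrate the pattern (a simplified sketch, not the actual
>>> WriteFiles code): one random draw per bundle, reused for every element in
>>> that bundle.
>>> >
>>> >   class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
>>> >     private final int numShards;
>>> >     private transient int shard;
>>> >
>>> >     AssignShardFn(int numShards) { this.numShards = numShards; }
>>> >
>>> >     @StartBundle
>>> >     public void startBundle() {
>>> >       // One random number per bundle, not per element.
>>> >       shard = ThreadLocalRandom.current().nextInt(numShards);
>>> >     }
>>> >
>>> >     @ProcessElement
>>> >     public void process(@Element T element, OutputReceiver<KV<Integer, T>> out) {
>>> >       out.output(KV.of(shard, element));
>>> >     }
>>> >   }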
>>> >
>>> > Any runner that does not implement RequiresStableInput will not
>>> properly execute FileIO. Dataflow and Flink both support this. I believe
>>> that the Spark runner implicitly supports it by manually calling checkpoint
>>> as Kenn mentioned (unless someone removed that from the Spark runner, but
>>> if so that would be a correctness regression). Implementing this has
>>> nothing to do with external shuffle services - neither Flink, Spark, nor
>>> the Dataflow appliance (classic shuffle) has any problem correctly
>>> implementing RequiresStableInput.
>>> >
>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <[email protected]> wrote:
>>> >
>>> > I think that the support for @RequiresStableInput is rather limited.
>>> AFAIK it is supported by streaming Flink (where it is not needed in this
>>> situation) and by Dataflow. Batch runners without an external shuffle
>>> service that works as in the case of Dataflow have IMO no way to implement
>>> it correctly. In the case of FileIO (which does not use
>>> @RequiresStableInput as it would not be supported on Spark) the randomness
>>> is easily avoidable (hashCode of the key?) and it seems to me that should
>>> be preferred.
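>>> >
>>> > For illustration, the kind of deterministic assignment I mean (just a
>>> sketch; as noted elsewhere in the thread, Object.hashCode() is not
>>> guaranteed stable across JVMs, so a well-defined hash of the encoded key
>>> would be safer in practice):
>>> >
>>> >   // Derive the shard from the key itself instead of a random draw.
>>> >   int assignShard(Object key, int numShards) {
>>> >     return Math.floorMod(key.hashCode(), numShards);
>>> >   }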
>>> >
>>> >  Jan
>>> >
>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>> >
>>> >
>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <[email protected]> wrote:
>>> >
>>> > Hi,
>>> >
>>> > maybe a little unrelated, but I think we definitely should not use
>>> random assignment of shard keys (RandomShardingFunction), at least for
>>> bounded workloads (seems to be fine for streaming workloads). Many batch
>>> runners simply recompute path in the computation DAG from the failed node
>>> (transform) to the root (source). In the case there is any non-determinism
>>> involved in the logic, then it can result in duplicates (as the 'previous'
>>> attempt might have ended in DAG path that was not affected by the fail).
>>> That addresses the option 2) of what Jozef have mentioned.
>>> >
>>> > This is the reason we introduced "@RequiresStableInput".
>>> >
>>> > When things were only Dataflow, we knew that each shuffle was a
>>> checkpoint, so we inserted a Reshuffle after the random numbers were
>>> generated, freezing them so it was safe for replay. Since other engines do
>>> not checkpoint at every shuffle, we needed a way to communicate that this
>>> checkpointing was required for correctness. I think we still have many IOs
>>> that are written using Reshuffle instead of @RequiresStableInput, and I
>>> don't know which runners process @RequiresStableInput properly.
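>>> >
>>> > Roughly, the pattern looks like this (a sketch; input and
>>> AssignRandomShardFn are illustrative stand-ins for whatever
>>> non-deterministic assignment the IO does):
>>> >
>>> >   PCollection<KV<Integer, String>> sharded =
>>> >       input.apply(ParDo.of(new AssignRandomShardFn()))  // non-deterministic
>>> >            .apply(Reshuffle.<Integer, String>of());     // on Dataflow this is a
>>> >                                                         // checkpoint that freezes
>>> >                                                         // the assignment for replay
>>> >
>>> > On an engine that does not checkpoint at the shuffle, the Reshuffle gives
>>> no such guarantee, which is what @RequiresStableInput is meant to express.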
>>> >
>>> > By the way, I believe the SparkRunner explicitly calls materialize()
>>> after a GBK specifically so that it gets correct results for IOs that rely
>>> on Reshuffle. Has that changed?
>>> >
>>> > I agree that we should minimize use of RequiresStableInput. It has a
>>> significant cost, and the cost varies across runners. If we can use a
>>> deterministic function, we should.
>>> >
>>> > Kenn
>>> >
>>> >
>>> >  Jan
>>> >
>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>> >
>>> >
>>> >
>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <[email protected]>
>>> wrote:
>>> >
>>> > In general, Beam only deals with keys and grouping by key. I think
>>> expanding this idea to some more abstract notion of a sharding function
>>> could make sense.
>>> >
>>> > For FileIO specifically, I wonder if you can use writeDynamic() to get
>>> the behavior you are seeking.
>>> >
>>> >
>>> > The change I have in mind looks like this:
>>> >
>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>> >
>>> > Dynamic destinations, in my mind, are more towards the need for
>>> "partitioning" data (destination as a directory level) or for cases where
>>> one needs to handle groups of events differently, e.g. write some events
>>> in FormatA and others in FormatB.
>>> > Shards are currently used for distributing writes or bucketing events
>>> within a particular destination group. More specifically, each element is
>>> assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded
>>> key is a compound of the destination and the assigned shard.
>>> >
>>> > Having said that, I might be able to use dynamic destinations for this,
>>> possibly with a custom FileNaming, and set shards to always be 1. But it
>>> feels less natural than allowing the user to swap the already present
>>> `RandomShardingFunction` [2] for something of their own choosing.
>>> >
>>> >
>>> > [1]
>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>> > [2]
>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
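>>> >
>>> > Roughly, the kind of user-supplied function I would like to plug in (a
>>> simplified sketch modeled on the interface the internal
>>> RandomShardingFunction implements; Row and "user_id" are just
>>> illustrative):
>>> >
>>> >   class FieldBasedShardingFn implements ShardingFunction<Row, String> {
>>> >     @Override
>>> >     public ShardedKey<Integer> assignShardKey(
>>> >         String destination, Row element, int shardCount) {
>>> >       // Shard decided from a data field, e.g. to match Hive bucketing.
>>> >       int bucket = Math.floorMod(element.getValue("user_id").hashCode(), shardCount);
>>> >       return ShardedKey.of(destination.hashCode(), bucket);  // destination + shard
>>> >     }
>>> >   }
>>> >
>>> > This keeps destinations purely about partitioning and format, while the
>>> shard stays a separate, swappable concern.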
>>> >
>>> > Kenn
>>> >
>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <[email protected]>
>>> wrote:
>>> >
>>> > Adding sharding to the model may require a wider discussion than
>>> FileIO alone. I'm not entirely sure how wide, or if this has been proposed
>>> before, but IMO it warrants a design doc or proposal.
>>> >
>>> > A couple of high-level questions I can think of are:
>>> >   - What runners support sharding?
>>> >       * There will be some work in Dataflow required to support this
>>> but I'm not sure how much.
>>> >   - What does sharding mean for streaming pipelines?
>>> >
>>> > A more nitty-gritty question:
>>> >   - How can this be achieved performantly? For example, if a shuffle
>>> is required to achieve a particular sharding constraint, should we allow
>>> transforms to declare they don't modify the sharding property (e.g. key
>>> preserving), which may allow a runner to avoid an additional shuffle if a
>>> preceding shuffle can guarantee the sharding requirements?
>>> >
>>> > Where X is the shuffle that could be avoided: input -> shuffle (key
>>> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
>>> preserving) -> X -> fileio (key sharding fn A)
>>> >
>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <[email protected]>
>>> wrote:
>>> >
>>> > I would like to extend FileIO with the possibility to specify a custom
>>> sharding function:
>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>> >
>>> > I have 2 use cases for this:
>>> >
>>> > 1. I need to generate shards which are compatible with Hive bucketing,
>>> and therefore need to decide shard assignment based on data fields of the
>>> input element.
>>> > 2. When running e.g. on Spark, if the job encounters a failure which
>>> causes a loss of some data from previous stages, Spark issues a recompute
>>> of the necessary tasks in the necessary stages to recover the data.
>>> Because the shard assignment function is random by default, some data will
>>> end up in different shards and cause duplicates in the final output.
>>> >
>>> > Please let me know your thoughts in case you see a reason not to add
>>> such an improvement.
>>> >
>>> > Thanks,
>>> > Jozef
>>> >
>>> >
>>>
>>
