Correct, by the external shuffle service I pretty much meant "offloading
the contents of a shuffle phase out of the system". It looks like that is
what Spark's checkpoint does as well. On the other hand (if I
understand the concept correctly), that implies some performance penalty:
the data has to be moved to an external distributed filesystem. It
then feels weird that we optimize code to avoid computing random numbers,
but are okay with moving complete datasets back and forth. I think in
this particular case making the Pipeline deterministic, or idempotent to
be precise (within limits; yes, hashCode of an enum is not stable
between JVMs), would seem more practical to me.
Jan
On 6/17/21 7:09 AM, Reuven Lax wrote:
I have some thoughts here, as Eugene Kirpichov and I spent a lot of
time working through these semantics in the past.
First - about the problem of duplicates:
A "deterministic" sharding, e.g. hashCode-based (though Java makes no
guarantee that hashCode is stable across JVM instances, so this
technique ends up not being stable), doesn't really help matters in
Beam. Unlike other systems, Beam makes no assumptions that transforms
are idempotent or deterministic. What's more, _any_ transform that has
any sort of triggered grouping (whether the trigger used is
watermark-based or otherwise) is non-deterministic.
Forcing a hash of every element imposed quite a CPU cost; even
generating a random number per-element slowed things down too much,
which is why the current code generates a random number only in
startBundle.
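A minimal sketch of the "random number only in startBundle" idea, in plain Java rather than against the Beam API. The class name `PerBundleShardAssigner` is hypothetical, and the round-robin step after the random start is an assumption made for illustration, not a statement of what the Beam code does:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for a per-bundle shard assigner; not Beam's actual code. */
class PerBundleShardAssigner {
    private static final int UNKNOWN = -1;
    private final int shardCount;
    private int shard = UNKNOWN;

    PerBundleShardAssigner(int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        this.shardCount = shardCount;
    }

    /** Called once per bundle: the only place a random number is drawn. */
    void startBundle() {
        shard = ThreadLocalRandom.current().nextInt(shardCount);
    }

    /** Called per element: a cheap round-robin step, with no RNG and no hashing. */
    int nextShard() {
        if (shard == UNKNOWN) {
            throw new IllegalStateException("startBundle() was not called");
        }
        int assigned = shard;
        shard = (shard + 1) % shardCount; // assumed round-robin continuation
        return assigned;
    }
}
```

The per-element cost is one addition and one modulo, but the assignment still depends on the random start, so replaying a bundle can place elements in different shards.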
Any runner that does not implement RequiresStableInput will not
properly execute FileIO. Dataflow and Flink both support this. I
believe that the Spark runner implicitly supports it by manually
calling checkpoint as Ken mentioned (unless someone removed that from
the Spark runner, but if so that would be a correctness regression).
Implementing this has nothing to do with external shuffle services -
neither Flink, Spark, nor Dataflow appliance (classic shuffle) have
any problem correctly implementing RequiresStableInput.
On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz> wrote:
I think that the support for @RequiresStableInput is rather
limited. AFAIK it is supported by streaming Flink (where it is not
needed in this situation) and by Dataflow. Batch runners without an
external shuffle service that works as in the case of Dataflow
have IMO no way to implement it correctly. In the case of FileIO
(which does not use @RequiresStableInput, as it would not be
supported on Spark) the randomness is easily avoidable (hashCode
of key?) and it seems to me that should be preferred.
Jan
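A minimal sketch of the "hashCode of key" alternative, assuming the key can be rendered as a string. Because `Object.hashCode()` (for example on enums) is not stable between JVMs, the sketch hashes the UTF-8 bytes of the key with CRC32 from the JDK instead. The class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper: derives a shard from the key's bytes, not from Object.hashCode(). */
class StableShardAssigner {
    /** Returns a shard in [0, shardCount) that is the same on every JVM for the same key. */
    static int shardFor(String key, int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is an unsigned 32-bit value held in a long, so the remainder is non-negative.
        return (int) (crc.getValue() % shardCount);
    }
}
```

For an enum key, passing `name()` rather than relying on `hashCode()` keeps the assignment stable across workers. This does cost a hash per element, which is the CPU overhead discussed elsewhere in the thread.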
On 6/16/21 6:23 PM, Kenneth Knowles wrote:
On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
maybe a little unrelated, but I think we definitely should
not use random assignment of shard keys
(RandomShardingFunction), at least for bounded workloads
(it seems to be fine for streaming workloads). Many batch
runners simply recompute the path in the computation DAG from the
failed node (transform) to the root (source). If there is any
non-determinism involved in the logic, this can result in
duplicates (as the 'previous' attempt might have ended in a DAG
path that was not affected by the failure).
That addresses option 2) of what Jozef has mentioned.
This is the reason we introduced "@RequiresStableInput".
When things were only Dataflow, we knew that each shuffle was a
checkpoint, so we inserted a Reshuffle after the random numbers
were generated, freezing them so it was safe for replay. Since
other engines do not checkpoint at every shuffle, we needed a way
to communicate that this checkpointing was required for
correctness. I think we still have many IOs that are written
using Reshuffle instead of @RequiresStableInput, and I don't know
which runners process @RequiresStableInput properly.
By the way, I believe the SparkRunner explicitly calls
materialize() after a GBK specifically so that it gets correct
results for IOs that rely on Reshuffle. Has that changed?
I agree that we should minimize use of RequiresStableInput. It
has a significant cost, and the cost varies across runners. If we
can use a deterministic function, we should.
Kenn
Jan
On 6/16/21 1:43 PM, Jozef Vilcek wrote:
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles
<k...@apache.org> wrote:
In general, Beam only deals with keys and grouping by
key. I think expanding this idea to some more abstract
notion of a sharding function could make sense.
For FileIO specifically, I wonder if you can use
writeDynamic() to get the behavior you are seeking.
The change in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
Dynamic destinations, in my mind, are geared more towards the need for
"partitioning" data (destination as directory level), or for cases
where one needs to handle groups of events differently, e.g. write
some events in FormatA and others in FormatB.
Shards are now used for distributing writes or bucketing of
events within a particular destination group. More
specifically, currently, each element is assigned a
`ShardedKey<Integer>` [1] before the GBK operation. The sharded key
is a compound of the destination and the assigned shard.
Having said that, I might be able to use dynamic destinations
for this, possibly with the need for a custom FileNaming, and
set shards to always be 1. But it feels less natural than
allowing the user to swap the already present
`RandomShardingFunction` [2] with something of their own choosing.
[1]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
[2]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
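To illustrate the idea of a user-supplied sharding function that produces a (destination, shard) compound key, here is a simplified sketch in plain Java. All types here (`SimpleShardedKey`, `SimpleShardingFunction`, `UserEvent`, `FieldBasedSharding`) are hypothetical stand-ins and not the Beam classes linked above. The hash used is `Long.hashCode`, whose result is fixed by the Java specification; bucketing that has to match an external system would need to reproduce that system's own hash function, which is not shown here:

```java
import java.util.Objects;

/** Hypothetical compound key: a destination plus the shard assigned within it. */
final class SimpleShardedKey<D> {
    final D destination;
    final int shard;

    SimpleShardedKey(D destination, int shard) {
        this.destination = destination;
        this.shard = shard;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleShardedKey)) {
            return false;
        }
        SimpleShardedKey<?> other = (SimpleShardedKey<?>) o;
        return shard == other.shard && Objects.equals(destination, other.destination);
    }

    @Override
    public int hashCode() {
        return Objects.hash(destination, shard);
    }
}

/** Hypothetical hook that a user could swap in instead of random assignment. */
interface SimpleShardingFunction<D, T> {
    SimpleShardedKey<D> assign(D destination, T element, int shardCount);
}

/** Hypothetical element type with one data field used for bucketing. */
final class UserEvent {
    final long userId;

    UserEvent(long userId) {
        this.userId = userId;
    }
}

final class FieldBasedSharding {
    /** Shard is derived only from a data field, so it is the same on every attempt. */
    static final SimpleShardingFunction<String, UserEvent> BY_USER_ID =
        (destination, event, shardCount) ->
            new SimpleShardedKey<>(
                destination, Math.floorMod(Long.hashCode(event.userId), shardCount));
}
```

With this shape, all events for the same `userId` within one destination land in the same shard, regardless of which worker or attempt processes them.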
Kenn
On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton
<tyso...@google.com> wrote:
Adding sharding to the model may require a wider
discussion than FileIO alone. I'm not entirely sure
how wide, or if this has been proposed before, but
IMO it warrants a design doc or proposal.
A couple of high-level questions I can think of are:
- What runners support sharding?
* There will be some work in Dataflow required
to support this but I'm not sure how much.
- What does sharding mean for streaming pipelines?
A more nitty-gritty question:
- How can this be achieved performantly? For
example, if a shuffle is required to achieve a
particular sharding constraint, should we
allow transforms to declare they don't modify the
sharding property (e.g. key preserving) which may
allow a runner to avoid an additional shuffle if a
preceding shuffle can guarantee the sharding
requirements?
Where X is the shuffle that could be avoided: input
-> shuffle (key sharding fn A) -> transform1 (key
preserving) -> transform 2 (key preserving) -> X ->
fileio (key sharding fn A)
On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek
<jozo.vil...@gmail.com> wrote:
I would like to extend FileIO with the
possibility to specify a custom sharding function:
https://issues.apache.org/jira/browse/BEAM-12493
I have 2 use-cases for this:
1. I need to generate shards which are
compatible with Hive bucketing and therefore
need to decide shard assignment based on
data fields of the input element
2. When running e.g. on Spark and the job
encounters a kind of failure which causes a
loss of some data from previous stages,
Spark recomputes the necessary tasks
in the necessary stages to recover the data. Because
the shard assignment function is random by
default, some data will end up in different
shards and cause duplicates in the final output.
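The second use-case can be illustrated with a toy simulation in plain Java. It is a simplified model under stated assumptions, not how Spark or Beam actually schedule work: there are two shards, shard 0 is written from the first attempt, and shard 1 is written after the upstream stage has been recomputed. The class and method names are hypothetical, and seeded `java.util.Random` instances stand in for per-attempt randomness:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.IntUnaryOperator;

/** Toy model: shard 0 is written from attempt 1, shard 1 from a recomputed attempt 2. */
class RecomputeDuplicatesDemo {
    /** Returns every element that ends up in the final output (both shards combined). */
    static List<Integer> run(int n, IntUnaryOperator attempt1, IntUnaryOperator attempt2) {
        List<Integer> out = new ArrayList<>();
        for (int e = 0; e < n; e++) {
            if (attempt1.applyAsInt(e) == 0) {
                out.add(e); // written before the failure
            }
        }
        for (int e = 0; e < n; e++) {
            if (attempt2.applyAsInt(e) == 1) {
                out.add(e); // written after the recompute
            }
        }
        return out;
    }

    /** Number of output records that are repeats of an already written element. */
    static long duplicates(List<Integer> out) {
        return out.size() - out.stream().distinct().count();
    }

    /** Random two-shard assignment, fixed within one attempt by the given seed. */
    static IntUnaryOperator randomAssignment(int n, long seed) {
        Random random = new Random(seed);
        int[] shards = new int[n];
        for (int e = 0; e < n; e++) {
            shards[e] = random.nextInt(2);
        }
        return e -> shards[e];
    }

    public static void main(String[] args) {
        int n = 1000;
        IntUnaryOperator deterministic = e -> e % 2;
        System.out.println("deterministic duplicates: "
            + duplicates(run(n, deterministic, deterministic)));
        System.out.println("random duplicates: "
            + duplicates(run(n, randomAssignment(n, 1L), randomAssignment(n, 42L))));
    }
}
```

In this model an element is duplicated whenever the first attempt placed it in shard 0 and the recomputed attempt places it in shard 1, which cannot happen when both attempts use the same deterministic function.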
Please let me know your thoughts in case you see
a reason not to add such an improvement.
Thanks,
Jozef