I think that support for @RequiresStableInput is rather limited.
AFAIK it is supported by streaming Flink (where it is not needed in this
situation) and by Dataflow. Batch runners without an external shuffle
service that works the way Dataflow's does have, IMO, no way to
implement it correctly. In the case of FileIO (which does not use
@RequiresStableInput, as it would not be supported on Spark), the
randomness is easily avoidable (hashCode of the key?), and it seems to
me that should be preferred.
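The hashCode-based alternative mentioned above could look roughly like this (a plain-Java sketch of the idea, not Beam's actual sharding API; the names are illustrative):

```java
// Sketch of a deterministic shard assignment based on the key's hashCode,
// as suggested above. Not Beam's API, just the idea: the same key always
// maps to the same shard, so a recomputed upstream path reproduces the
// exact same routing instead of re-rolling random shard numbers.
final class DeterministicSharding {
    static int assignShard(Object key, int numShards) {
        // floorMod keeps the result in [0, numShards) even when hashCode()
        // is negative.
        return Math.floorMod(key.hashCode(), numShards);
    }
}
```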
Jan
On 6/16/21 6:23 PM, Kenneth Knowles wrote:
On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
maybe a little unrelated, but I think we definitely should not use
random assignment of shard keys (RandomShardingFunction), at least
for bounded workloads (it seems to be fine for streaming workloads).
Many batch runners simply recompute the path in the computation DAG
from the failed node (transform) back to the root (source). If there
is any non-determinism involved in that logic, it can result in
duplicates (as the 'previous' attempt might have ended in a DAG path
that was not affected by the failure). That addresses option 2) of
what Jozef mentioned.
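To make the failure mode concrete, here is a toy plain-Java simulation (no Beam involved) of what recomputation does under random vs. deterministic shard assignment:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Toy simulation (plain Java, no Beam): each "attempt" models one
// recomputation of the DAG path that assigns a shard to the same element.
// A deterministic function gives every attempt the same shard; a random one
// scatters the element across shards, which is how an already-committed
// output and a recomputed one end up holding duplicates.
class ReplaySimulation {
    static Set<Integer> shardsSeen(boolean deterministic, int attempts, int numShards) {
        String element = "record-1";
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < attempts; i++) {
            int shard = deterministic
                ? Math.floorMod(element.hashCode(), numShards)  // stable across retries
                : new Random().nextInt(numShards);              // re-rolled every retry
            seen.add(shard);
        }
        return seen;
    }
}
```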
This is the reason we introduced "@RequiresStableInput".
When things were only Dataflow, we knew that each shuffle was a
checkpoint, so we inserted a Reshuffle after the random numbers were
generated, freezing them so it was safe for replay. Since other
engines do not checkpoint at every shuffle, we needed a way to
communicate that this checkpointing was required for correctness. I
think we still have many IOs that are written using Reshuffle instead
of @RequiresStableInput, and I don't know which runners process
@RequiresStableInput properly.
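The checkpoint-after-Reshuffle trick can be illustrated with a plain-Java sketch (illustrative only, not Beam internals): the random draw happens once, its result is materialized, and every replay reads the stored value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Illustration of why a checkpoint after random shard assignment makes
// replay safe: computeIfAbsent stands in for the materialized shuffle
// output. The randomness runs at most once per element; any replay of the
// downstream path observes the frozen result instead of drawing a new one.
class FrozenShardAssignment {
    private final Map<String, Integer> checkpoint = new HashMap<>();
    private final Random random = new Random();

    int shardFor(String element, int numShards) {
        return checkpoint.computeIfAbsent(element, e -> random.nextInt(numShards));
    }
}
```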
By the way, I believe the SparkRunner explicitly calls materialize()
after a GBK specifically so that it gets correct results for IOs that
rely on Reshuffle. Has that changed?
I agree that we should minimize use of RequiresStableInput. It has a
significant cost, and the cost varies across runners. If we can use a
deterministic function, we should.
Kenn
Jan
On 6/16/21 1:43 PM, Jozef Vilcek wrote:
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
In general, Beam only deals with keys and grouping by key. I
think expanding this idea to some more abstract notion of a
sharding function could make sense.
For FileIO specifically, I wonder if you can use
writeDynamic() to get the behavior you are seeking.
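A minimal plain-Java sketch of the idea behind writeDynamic() (the real Beam API operates on PCollections; this only shows per-element destination routing, and the "first CSV field" destination function is made up for the example):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the writeDynamic() idea in plain Java: a destination is
// computed from each element, and elements are then written per destination
// group. The destination function (first CSV field) is hypothetical.
class DynamicRouting {
    static Map<String, List<String>> routeByDestination(List<String> events) {
        return events.stream()
            .collect(Collectors.groupingBy(event -> event.split(",", 2)[0]));
    }
}
```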
The change I have in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
Dynamic destinations, in my mind, are more towards the need for
"partitioning" data (destination as a directory level) or for handling
groups of events differently, e.g. writing some events in FormatA and
others in FormatB.
Shards are currently used for distributing writes or bucketing of
events within a particular destination group. More specifically,
each element is assigned a `ShardedKey<Integer>` [1] before the GBK
operation. The sharded key is a compound of the destination and the
assigned shard.
Having said that, I might be able to use dynamic destinations for
this, possibly with a custom FileNaming, and set the number of shards
to always be 1. But it feels less natural than allowing the user
to swap the already present `RandomShardingFunction` [2] for
something of their own choosing.
[1]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
[2]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
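A rough sketch of what a pluggable sharding function could look like (hypothetical interface and names, modeled loosely on the linked proposal rather than copied from it):

```java
// Hypothetical pluggable sharding interface, loosely modeled on the linked
// proposal; the names are illustrative, not Beam's actual types.
interface ShardingFunction<ElementT, DestinationT> {
    int assignShard(DestinationT destination, ElementT element, int shardCount);
}

// Hive-bucketing-style implementation: the shard is derived from a data
// field of the element (here, an assumed first CSV column), so assignment
// is deterministic and survives recomputation.
class FieldHashShardingFunction implements ShardingFunction<String, String> {
    @Override
    public int assignShard(String destination, String element, int shardCount) {
        String bucketColumn = element.split(",", 2)[0];
        return Math.floorMod(bucketColumn.hashCode(), shardCount);
    }
}
```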
Kenn
On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:
Adding sharding to the model may require a wider
discussion than FileIO alone. I'm not entirely sure how
wide, or if this has been proposed before, but IMO it
warrants a design doc or proposal.
A couple of high-level questions I can think of are:
- What runners support sharding?
  * There will be some work in Dataflow required to
    support this, but I'm not sure how much.
- What does sharding mean for streaming pipelines?
A more nitty-gritty question:
- How can this be achieved performantly? For example,
  if a shuffle is required to achieve a particular sharding
  constraint, should we allow transforms to declare that
  they don't modify the sharding property (e.g. key
  preserving), which may allow a runner to avoid an
  additional shuffle if a preceding shuffle can guarantee
  the sharding requirements?
Where X is the shuffle that could be avoided: input ->
shuffle (key sharding fn A) -> transform1 (key
preserving) -> transform2 (key preserving) -> X ->
fileio (key sharding fn A)
On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
I would like to extend FileIO with the possibility to
specify a custom sharding function:
https://issues.apache.org/jira/browse/BEAM-12493
I have 2 use-cases for this:
1. I need to generate shards which are compatible
with Hive bucketing, and therefore need to decide
shard assignment based on data fields of the input
element.
2. When running e.g. on Spark, if the job encounters
a failure which causes a loss of some data
from previous stages, Spark recomputes the
necessary tasks in the necessary stages to recover
the data. Because the shard assignment function is
random by default, some data will end up in
different shards and cause duplicates in the
final output.
Please let me know your thoughts in case you see a
reason not to add such an improvement.
Thanks,
Jozef