I would like to bump this thread. So far we have not gotten very far.
Allowing the user to specify a sharding function, and not only the desired
number of shards, seems to be a very unsettling change. So far I do not have
a good understanding of why it is so unacceptable. From the thread I
sense some
On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax wrote:
>
>
> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek wrote:
>
>> I don't think this has anything to do with external shuffle services.
>>
>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>> since Beam does not restrict
On Sat, Jul 3, 2021 at 12:55 PM Jan Lukavský wrote:
>
> I don't think this has anything to do with external shuffle services.
>
>
> Sorry for stepping into this discussion again, but I don't think this
> statement is 100% correct. What Spark's checkpoint does is that it saves
> intermediate
I don't think this has anything to do with external shuffle services.
Arbitrarily recomputing data is fundamentally incompatible with Beam, since
Beam does not restrict transforms to being deterministic. The Spark runner
works (at least it did last I checked) by checkpointing the RDD. Spark will
> How will @RequiresStableInput prevent this situation when running batch
> use case?
>
So this is handled in combination of @RequiresStableInput and output file
finalization. @RequiresStableInput (or Reshuffle for most runners) makes
sure that the input provided for the write stage does not get
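The "output file finalization" half of that can be sketched with plain Java NIO (a conceptual illustration only, not FileIO's actual internals; the class and method names here are made up): each attempt writes to a temp file and only an atomic move publishes it, so a retried attempt never exposes a half-written shard.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FinalizeWrite {
  // Two-phase write: produce the shard in a temp file, then atomically
  // move it to its final name. Combined with stable input (so retries
  // produce identical bytes), the commit step becomes safe to retry.
  static Path writeShard(Path dir, String shardName, byte[] contents) throws IOException {
    Path tmp = Files.createTempFile(dir, shardName, ".tmp");
    Files.write(tmp, contents);
    Path finalPath = dir.resolve(shardName);
    return Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("shards");
    Path out = writeShard(dir, "part-00000", "hello".getBytes());
    System.out.println(Files.readString(out)); // prints "hello"
  }
}
```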
Hi Cham, thanks for the feedback
> Beam has a policy of no knobs (or keeping knobs minimum) to allow runners
to optimize better. I think one concern might be that the addition of this
option might be going against this.
I agree that fewer knobs are better. But if assignment of a key is specific per
Hi,
how do we proceed with reviewing MR proposed for this change?
I sense there is a concern about exposing the existing sharding function in
the API. But from the discussion here I do not have a clear picture of the
arguments for not doing so.
Only one argument was that dynamic destinations should be able to
The difference, in my opinion, is in distinguishing between - as written in
this thread - physical vs. logical properties of the pipeline. I proposed to
keep dynamic destinations (logical) and sharding (physical) separate at the
API level, as they are at the implementation level.
When I reason about using `by()`
I'm not sure I understand your PR. How is this PR different than the by()
method in FileIO?
On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek wrote:
> MR for review for this change is here:
> https://github.com/apache/beam/pull/15051
>
> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek
> wrote:
>
>> I
MR for review for this change is here:
https://github.com/apache/beam/pull/15051
On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek wrote:
> I would like this thread to stay focused on sharding FileIO only. Possible
> change to the model is an interesting topic but of a much different scope.
>
> Yes,
I would like this thread to stay focused on sharding FileIO only. Possible
change to the model is an interesting topic but of a much different scope.
Yes, I agree that sharding is mostly a physical rather than logical
property of the pipeline. That is why it feels more natural to distinguish
Sharding is typically a physical rather than logical property of the
pipeline, and I'm not convinced it makes sense to add it to Beam in
general. One can already use keys and GBK/Stateful DoFns if some kind
of logical grouping is needed, and adding constraints like this can
prevent opportunities
Alright, but what is worth emphasizing is that we are talking about batch
workloads. The typical scenario is that the output is committed once the job
finishes (e.g., by an atomic rename of the directory).
Jan
On 17. 6. 2021 at 17:59, Reuven Lax wrote:
> Yes - the problem is that Beam makes no guarantees of
Yes - the problem is that Beam makes no guarantees of determinism anywhere
in the system. User DoFns might be non-deterministic, and we have no way to
know (we've discussed providing users with an @IsDeterministic annotation;
however, empirically users often think their functions are deterministic
when
Correct, by the external shuffle service I pretty much meant "offloading
the contents of a shuffle phase out of the system". Looks like that is
what Spark's checkpoint does as well. On the other hand (if I
understand the concept correctly), that implies some performance penalty
- the data
I have some thoughts here, as Eugene Kirpichov and I spent a lot of time
working through these semantics in the past.
First - about the problem of duplicates:
A "deterministic" sharding - e.g. hashCode based (though Java makes no
guarantee that hashCode is stable across JVM instances, so this
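To make that concrete (a stdlib-only illustration of my own, not code from the proposal): `Object.hashCode()` is not specified to be stable across JVM processes, but a checksum such as CRC32 is defined byte-for-byte, so the same key maps to the same shard on every attempt and every worker.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class StableShard {
  // Deterministic shard assignment: CRC32 is specified over the key's
  // bytes, so it yields the same shard on any JVM, unlike the default
  // Object.hashCode(), which may differ between processes.
  static int shardFor(String key, int numShards) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    return (int) (crc.getValue() % numShards);
  }

  public static void main(String[] args) {
    // Same key, same shard, on every run.
    System.out.println(shardFor("user-42", 8));
    System.out.println(shardFor("user-42", 8));
  }
}
```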
I think that the support for @RequiresStableInput is rather limited.
AFAIK it is supported by streaming Flink (where it is not needed in this
situation) and by Dataflow. Batch runners without an external shuffle
service that works as in the case of Dataflow have, IMO, no way to
implement it
On Wed, Jun 16, 2021 at 4:44 AM Jozef Vilcek wrote:
>
>
> On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles wrote:
>
>> In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>>
>> For
On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský wrote:
> Hi,
>
> maybe a little unrelated, but I think we definitely should not use random
> assignment of shard keys (RandomShardingFunction), at least for bounded
> workloads (seems to be fine for streaming workloads). Many batch runners
> simply
Hi,
maybe a little unrelated, but I think we definitely should not use
random assignment of shard keys (RandomShardingFunction), at least for
bounded workloads (seems to be fine for streaming workloads). Many batch
runners simply recompute the path in the computation DAG from the failed
node
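A toy illustration of that failure mode (my own sketch with assumed names, not the actual RandomShardingFunction): if the shard is re-rolled when a batch runner re-executes the step, a recomputed element can land in a different shard than the copy that was already written, so it may be duplicated in one output file and missing from another.

```java
import java.util.Random;

public class RandomShardDemo {
  // Random assignment: nothing ties a later attempt to an earlier one.
  static int randomShard(Random rng, int numShards) {
    return rng.nextInt(numShards);
  }

  public static void main(String[] args) {
    // First attempt assigns "record-1" a shard, then the node fails.
    int firstAttempt = randomShard(new Random(), 10);
    // The runner recomputes the step from scratch; the same element may
    // now be assigned a different shard than the one already written.
    int secondAttempt = randomShard(new Random(), 10);
    System.out.println(firstAttempt + " vs " + secondAttempt);
  }
}
```

With a deterministic function of the element (rather than of a fresh random source), both attempts would agree.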
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles wrote:
> In general, Beam only deals with keys and grouping by key. I think
> expanding this idea to some more abstract notion of a sharding function
> could make sense.
>
> For FileIO specifically, I wonder if you can use writeDynamic() to get the
On Wed, Jun 16, 2021 at 12:49 AM Tyson Hamilton wrote:
> Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or if this has been proposed before,
> but IMO it warrants a design doc or proposal.
>
I should have been more clear about
In general, Beam only deals with keys and grouping by key. I think
expanding this idea to some more abstract notion of a sharding function
could make sense.
For FileIO specifically, I wonder if you can use writeDynamic() to get the
behavior you are seeking.
Kenn
On Tue, Jun 15, 2021 at 3:49 PM
Adding sharding to the model may require a wider discussion than FileIO
alone. I'm not entirely sure how wide, or if this has been proposed before,
but IMO it warrants a design doc or proposal.
A couple of high-level questions I can think of are:
- What runners support sharding?
* There will
I would like to extend FileIO with the possibility to specify a custom sharding
function:
https://issues.apache.org/jira/browse/BEAM-12493
I have 2 use-cases for this:
1. I need to generate shards which are compatible with Hive bucketing
and therefore need to decide shard assignment based on
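For the Hive case, bucket assignment for string keys can be sketched like this - under the assumption that the table uses Hive's default bucketing hash, which for strings matches Java's `String.hashCode` polynomial (this is a hedged illustration, not code from the MR):

```java
public class HiveBucket {
  // Assumed Hive-style bucket id for a string key: the Java-style
  // polynomial hash (h = 31*h + c), masked non-negative, mod numBuckets.
  // Files sharded this way line up with Hive's bucketed-table layout.
  static int bucketFor(String key, int numBuckets) {
    int h = 0;
    for (int i = 0; i < key.length(); i++) {
      h = 31 * h + key.charAt(i);
    }
    return (h & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    System.out.println(bucketFor("user-42", 16));
  }
}
```

A custom sharding function would let FileIO route each element to the file for `bucketFor(key, numBuckets)` instead of a runner-chosen shard.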