Re: FileIO with custom sharding function

2021-07-13 Thread Jozef Vilcek
I would like to bump this thread. So far we have not gotten very far. Allowing the user to specify a sharding function, and not only the desired number of shards, seems to be a very unsettling change. So far I do not have a good understanding of why this is so unacceptable. From the thread I sense some

Re: FileIO with custom sharding function

2021-07-07 Thread Jozef Vilcek
On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax wrote: > > > On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek wrote: > >> I don't think this has anything to do with external shuffle services. >> >> Arbitrarily recomputing data is fundamentally incompatible with Beam, >> since Beam does not restrict

Re: FileIO with custom sharding function

2021-07-07 Thread Jozef Vilcek
On Sat, Jul 3, 2021 at 12:55 PM Jan Lukavský wrote: > > I don't think this has anything to do with external shuffle services. > > > Sorry, for stepping into this discussion again, but I don't think this > statement is 100% correct. What Spark's checkpoint does is that it saves > intermediate

Re: FileIO with custom sharding function

2021-07-03 Thread Jozef Vilcek
I don't think this has anything to do with external shuffle services. Arbitrarily recomputing data is fundamentally incompatible with Beam, since Beam does not restrict transforms to being deterministic. The Spark runner works (at least it did last I checked) by checkpointing the RDD. Spark will
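For readers less familiar with the Spark mechanism being referenced, here is a minimal sketch in plain Spark's Java API (not Beam code; the checkpoint directory is a placeholder) of what "checkpointing the RDD" amounts to:

```java
// Plain Spark (not Beam) illustration of "checkpointing the RDD": materialize the
// data to reliable storage so that retries replay the same elements instead of
// recomputing a possibly non-deterministic lineage.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddCheckpointSketch {
  static JavaRDD<String> stabilize(JavaSparkContext sc, JavaRDD<String> rdd) {
    sc.setCheckpointDir("/tmp/checkpoints"); // placeholder checkpoint location
    rdd.checkpoint();                        // mark the RDD to be checkpointed
    rdd.count();                             // an action forces the checkpoint to be written
    return rdd;
  }
}
```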

Re: FileIO with custom sharding function

2021-06-29 Thread Jozef Vilcek
> How will @RequiresStableInput prevent this situation when running a batch > use case? > So this is handled by a combination of @RequiresStableInput and output file finalization. @RequiresStableInput (or Reshuffle for most runners) makes sure that the input provided for the write stage does not get
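A minimal sketch of the stable-input side of the mechanism described above; the DoFn and its writeToFinalLocation helper are hypothetical placeholders, not Beam's actual write/finalization code:

```java
// Hypothetical side-effecting write step asking the runner for stable input.
// It only shows where the @RequiresStableInput annotation goes and what
// guarantee it requests from the runner.
import org.apache.beam.sdk.transforms.DoFn;

public class StableInputWriteFn extends DoFn<String, Void> {

  @RequiresStableInput // the runner must checkpoint/stabilize input before invoking this DoFn
  @ProcessElement
  public void processElement(@Element String record) {
    // On retry, exactly the same element values are re-delivered, so the side
    // effect below cannot diverge between attempts.
    writeToFinalLocation(record);
  }

  private void writeToFinalLocation(String record) {
    // placeholder for the actual file write / finalization step
  }
}
```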

Re: FileIO with custom sharding function

2021-06-28 Thread Jozef Vilcek
Hi Cham, thanks for the feedback > Beam has a policy of no knobs (or keeping knobs to a minimum) to allow runners to optimize better. I think one concern might be that the addition of this option might be going against this. I agree that fewer knobs are better. But if assignment of a key is specific per

Re: FileIO with custom sharding function

2021-06-27 Thread Jozef Vilcek
Hi, how do we proceed with reviewing the MR proposed for this change? I sense there is a concern about exposing the existing sharding function in the API. But from the discussion here I do not have a clear picture of the arguments for not doing so. Only one argument was that dynamic destinations should be able to

Re: FileIO with custom sharding function

2021-06-23 Thread Jozef Vilcek
The difference in my opinion is in distinguishing between - as written in this thread - physical vs logical properties of the pipeline. I proposed to keep dynamic destinations (logical) and sharding (physical) separate at the API level, as they are at the implementation level. When I reason about using `by()`

Re: FileIO with custom sharding function

2021-06-22 Thread Reuven Lax
I'm not sure I understand your PR. How is this PR different than the by() method in FileIO? On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek wrote: > MR for review for this change is here: > https://github.com/apache/beam/pull/15051 > > On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek > wrote: > >> I

Re: FileIO with custom sharding function

2021-06-22 Thread Jozef Vilcek
MR for review for this change is here: https://github.com/apache/beam/pull/15051 On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek wrote: > I would like this thread to stay focused on sharding FileIO only. Possible > change to the model is an interesting topic but of a much different scope. > > Yes,

Re: FileIO with custom sharding function

2021-06-18 Thread Jozef Vilcek
I would like this thread to stay focused on sharding FileIO only. A possible change to the model is an interesting topic, but of a much different scope. Yes, I agree that sharding is mostly a physical rather than logical property of the pipeline. That is why it feels more natural to distinguish

Re: FileIO with custom sharding function

2021-06-17 Thread Robert Bradshaw
Sharding is typically a physical rather than logical property of the pipeline, and I'm not convinced it makes sense to add it to Beam in general. One can already use keys and GBK/Stateful DoFns if some kind of logical grouping is needed, and adding constraints like this can prevent opportunities
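A brief sketch of the already-available alternative mentioned here: impose the logical grouping yourself with explicit keys and a GroupByKey, then handle each group downstream. The 16-bucket modulo key is an arbitrary illustrative choice.

```java
// Sketch of "use keys and GBK" for a logical grouping: key each element into one
// of 16 buckets and group by that key.
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ExplicitGroupingSketch {
  static PCollection<KV<Integer, Iterable<String>>> groupIntoBuckets(PCollection<String> lines) {
    return lines
        .apply(WithKeys.of(
                (SerializableFunction<String, Integer>) s -> Math.floorMod(s.hashCode(), 16))
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupByKey.<Integer, String>create());
  }
}
```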

Re: FileIO with custom sharding function

2021-06-17 Thread je.ik
Alright, but what is worth emphasizing is that we are talking about batch workloads. The typical scenario is that the output is committed once the job finishes (e.g., by an atomic rename of a directory). Jan On 17. 6. 2021 at 17:59, Reuven Lax wrote: Yes - the problem is that Beam makes no guarantees of
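A tiny sketch of the commit-on-finish pattern mentioned above, using plain java.nio: the job writes under a temporary directory and a single rename publishes it. Paths are placeholders, and the atomic move assumes source and target live on the same filesystem.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class AtomicCommitSketch {
  /** Publishes a fully written temporary directory as the final output in one step. */
  static void commit(Path tempDir, Path finalDir) throws IOException {
    // Readers either see the complete output directory or nothing at all.
    Files.move(tempDir, finalDir, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    commit(Paths.get("/data/output/_temporary/attempt-1"), // placeholder path
           Paths.get("/data/output/final"));               // placeholder path
  }
}
```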

Re: FileIO with custom sharding function

2021-06-17 Thread Reuven Lax
Yes - the problem is that Beam makes no guarantees of determinism anywhere in the system. User DoFns might be non-deterministic, and we have no way to know (we've discussed providing users with an @IsDeterministic annotation, however empirically users often think their functions are deterministic when

Re: FileIO with custom sharding function

2021-06-17 Thread Jan Lukavský
Correct, by the external shuffle service I pretty much meant "offloading the contents of a shuffle phase out of the system". Looks like that is what Spark's checkpoint does as well. On the other hand (if I understand the concept correctly), that implies some performance penalty - the data

Re: FileIO with custom sharding function

2021-06-16 Thread Reuven Lax
I have some thoughts here, as Eugene Kirpichov and I spent a lot of time working through these semantics in the past. First - about the problem of duplicates: A "deterministic" sharding - e.g. hashCode based (though Java makes no guarantee that hashCode is stable across JVM instances, so this
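To make the parenthetical about hashCode concrete, here is a small standalone sketch (not Beam code; FNV-1a is used purely as an example of an explicitly specified hash) contrasting a hash that is stable across JVM instances with one that is not guaranteed to be:

```java
import java.nio.charset.StandardCharsets;

public final class StableShardAssigner {

  /** Deterministic across JVMs and retries: explicit FNV-1a hash over the key's bytes. */
  public static int stableShard(String key, int shardCount) {
    byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
    int hash = 0x811C9DC5;          // FNV-1a 32-bit offset basis
    for (byte b : bytes) {
      hash ^= (b & 0xFF);
      hash *= 0x01000193;           // FNV-1a 32-bit prime
    }
    return (hash & Integer.MAX_VALUE) % shardCount;
  }

  /** Not guaranteed stable: a default Object.hashCode() can differ between JVM instances. */
  public static int jvmDependentShard(Object key, int shardCount) {
    return (key.hashCode() & Integer.MAX_VALUE) % shardCount;
  }
}
```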

Re: FileIO with custom sharding function

2021-06-16 Thread Jan Lukavský
I think that the support for @RequiresStableInput is rather limited. AFAIK it is supported by streaming Flink (where it is not needed in this situation) and by Dataflow. Batch runners without an external shuffle service that works as in the case of Dataflow have IMO no way to implement it

Re: FileIO with custom sharding function

2021-06-16 Thread Kenneth Knowles
On Wed, Jun 16, 2021 at 4:44 AM Jozef Vilcek wrote: > > > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles wrote: > >> In general, Beam only deals with keys and grouping by key. I think >> expanding this idea to some more abstract notion of a sharding function >> could make sense. >> >> For

Re: FileIO with custom sharding function

2021-06-16 Thread Kenneth Knowles
On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský wrote: > Hi, > > maybe a little unrelated, but I think we definitely should not use random > assignment of shard keys (RandomShardingFunction), at least for bounded > workloads (seems to be fine for streaming workloads). Many batch runners > simply

Re: FileIO with custom sharding function

2021-06-16 Thread Jan Lukavský
Hi, maybe a little unrelated, but I think we definitely should not use random assignment of shard keys (RandomShardingFunction), at least for bounded workloads (it seems to be fine for streaming workloads). Many batch runners simply recompute the path in the computation DAG from the failed node

Re: FileIO with custom sharding function

2021-06-16 Thread Jozef Vilcek
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles wrote: > In general, Beam only deals with keys and grouping by key. I think > expanding this idea to some more abstract notion of a sharding function > could make sense. > > For FileIO specifically, I wonder if you can use writeDynamic() to get the

Re: FileIO with custom sharding function

2021-06-16 Thread Jozef Vilcek
On Wed, Jun 16, 2021 at 12:49 AM Tyson Hamilton wrote: > Adding sharding to the model may require a wider discussion than FileIO > alone. I'm not entirely sure how wide, or if this has been proposed before, > but IMO it warrants a design doc or proposal. > I should have been more clear about

Re: FileIO with custom sharding function

2021-06-15 Thread Kenneth Knowles
In general, Beam only deals with keys and grouping by key. I think expanding this idea to some more abstract notion of a sharding function could make sense. For FileIO specifically, I wonder if you can use writeDynamic() to get the behavior you are seeking. Kenn On Tue, Jun 15, 2021 at 3:49 PM
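For reference, a rough sketch of the writeDynamic() route being suggested, assuming the elements were already mapped to KV pairs of (bucket id, formatted line); the bucket-per-destination layout and the naming are illustrative assumptions, not part of the proposal:

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WriteDynamicSketch {
  /** Writes each (bucketId, line) pair into files grouped by bucketId. */
  static void writeBuckets(PCollection<KV<String, String>> keyedLines, String outputDir) {
    keyedLines.apply(
        FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)                                       // destination = bucket id
            .via(Contextful.fn(KV::getValue), TextIO.sink())      // write the formatted line
            .to(outputDir)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(bucket -> FileIO.Write.defaultNaming("bucket-" + bucket, ".txt"))
            .withNumShards(1));                                   // one shard per destination
  }
}
```

A limitation raised later in the thread is that this treats the bucket as a logical destination (it shows up in file naming and in the set of destinations) rather than as a purely physical sharding hint.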

Re: FileIO with custom sharding function

2021-06-15 Thread Tyson Hamilton
Adding sharding to the model may require a wider discussion than FileIO alone. I'm not entirely sure how wide, or if this has been proposed before, but IMO it warrants a design doc or proposal. A couple of high-level questions I can think of are: - What runners support sharding? * There will

FileIO with custom sharding function

2021-06-15 Thread Jozef Vilcek
I would like to extend FileIO with the possibility to specify a custom sharding function: https://issues.apache.org/jira/browse/BEAM-12493 I have two use cases for this: 1. I need to generate shards which are compatible with Hive bucketing and therefore need to decide shard assignment based on
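To make the first use case concrete, here is a standalone sketch of the kind of shard assignment being asked for. HiveBucketSharding and BucketKeyFn are illustrative names only (not the Beam API and not necessarily the shape of the proposed hook), and real Hive bucketing hashes primitive types by Hive's own rules rather than by plain hashCode().

```java
import java.io.Serializable;

public class HiveBucketSharding<T> implements Serializable {

  /** Extracts the bucketing key from an element; supplied by the pipeline author. */
  public interface BucketKeyFn<T> extends Serializable {
    Object keyOf(T element);
  }

  private final BucketKeyFn<T> bucketKeyFn;

  public HiveBucketSharding(BucketKeyFn<T> bucketKeyFn) {
    this.bucketKeyFn = bucketKeyFn;
  }

  /** Hive-style assignment: non-negative bucket-key hash modulo the shard count. */
  public int assignShard(T element, int shardCount) {
    int hash = bucketKeyFn.keyOf(element).hashCode(); // simplification of Hive's hashing rules
    return (hash & Integer.MAX_VALUE) % shardCount;
  }
}
```

The actual hook in the proposed change may have a different shape; this only illustrates the assignment logic the Hive-bucketing use case needs, as opposed to a random shard key.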