makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hi guys, The subject is a bit provocative but the topic is real and coming again and again with the beam usage: how a dofn can handle some "chunking". The need is to be able to commit each N records but with N not too big. The natural API for that in beam is the bundle one but bundles are not re

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Bundles are currently completely up to the runner, and different runners do them differently. In addition to Flink, the Dataflow runner creates smallish bundles when run in streaming mode, as the streaming-mode runner is optimizing for latency (so a bundle might be small simply because not enough t

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hi Reuven, how does it help since you will still "send to the transform" the data before the commit point of beam and therefore not be able to reprocess the same data in case of a failure between your eager flush and next commit point?

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré
Hi Romain, You are right: currently, the chunking is related to bundles. Today, the bundle size is the runner's responsibility. I think it's fine because only the runner knows an efficient bundle size. I'm afraid giving the "control" of the bundle size to the end user (via pipeline) can r

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Romain, Are you saying that you need the set of elements in the bundle to be deterministic, so on retry you get the same bundle back? Reuven

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré : > Hi Romain, > > You are right: currently, the chunking is related to bundles. Today, the > bundle size is under the runner responsibility. > > I think it's fine because only the runner know an efficient bundle size. I'm > afraid giving the "control"

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Romain, I think the @StableReplay semantic that Kenn proposed a month or so ago is what is needed here. Essentially it will ensure that the GroupByKey iterable is stable and checkpointed. So on replay, the GroupByKey is guaranteed to receive the exact same iterable as it did before. The annotatio

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré
Got it now. AFAIR, Kenn recently discussed an annotation related to that. I don't remember the annotation, but basically it was a link between a group of elements (batch) and the checkpoint. Regards JB

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré
Yes, @StableReplay, that's the annotation. Thanks.

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
It sounds like a good start. I'm not sure how a group by key (and not by size) can help control the checkpointing interval. I wonder if we shouldn't be able to have a CheckpointPolicy { boolean shouldCheckpoint() } used in the processing event loop. Default could be up to the runner but if set on the
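
To make the CheckpointPolicy idea above concrete, here is a minimal sketch in plain Java. Everything in it is hypothetical: CheckpointPolicy is the interface floated in the message, while CountingCheckpointPolicy and the toy loop in CheckpointLoopDemo are names invented for illustration. None of this is part of the Beam API, and the loop only stands in for whatever event loop a runner actually has.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface taken from the message above; NOT part of the Beam API.
interface CheckpointPolicy {
    boolean shouldCheckpoint();
}

// Hypothetical policy: requests a checkpoint after every `interval` calls.
final class CountingCheckpointPolicy implements CheckpointPolicy {
    private final int interval;
    private int seen;

    CountingCheckpointPolicy(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    @Override
    public boolean shouldCheckpoint() {
        seen++;
        if (seen == interval) {
            seen = 0; // start counting towards the next checkpoint
            return true;
        }
        return false;
    }
}

// Toy event loop standing in for a runner (an assumption, not real runner code).
final class CheckpointLoopDemo {
    // Returns the 1-based element positions after which a checkpoint was requested.
    static List<Integer> checkpointPositions(int elementCount, CheckpointPolicy policy) {
        List<Integer> positions = new ArrayList<>();
        for (int i = 1; i <= elementCount; i++) {
            // ... the element at position i would be processed here ...
            if (policy.shouldCheckpoint()) {
                positions.add(i);
            }
        }
        return positions;
    }
}
```

With an interval of 3 and 7 elements, the policy asks for a checkpoint after elements 3 and 6; the trailing element is left to whatever default the runner would apply.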

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
If you set @StableReplay before a ParDo, it forces a checkpoint before that ParDo.

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hmm, I didn't find the doc (if you have the link handy it would be appreciated), but "before" does not sound like enough; it should be "after" in case there was a "flush", no?

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
It's in the dev list archives, not sure if there's a doc yet. I'm not quite sure I understand what you mean by a "flush". Can you describe the problem you're trying to solve? Reuven

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
The overall goal is to ensure that every 100 elements at most, a "backend" (such as a datastore) flush/commit/push is done and is aligned with beam checkpoints. You can see it as bringing the "general" commit-interval notion to beam and kind of getting rid of the bundle notion, which is almost impossible to use today.

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré
And the control is given to the DoFn developer via annotations, right? So, bundle would be "hidden" and be internal to the runner (which makes sense I think) and we introduce "control" points for the DoFn developer that the runner will deal with. Correct? Regards JB

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Could be. Here are the options I have in mind:

I. checkpoint marker:
@AnyBeamAnnotation @CheckpointAfter public void someHook(SomeContext ctx);

II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less) // in the dofn
@CheckpointTester public bo

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Can we describe this at a higher level? I think what you want is the following. Please correct if I'm misunderstanding. Batches of 100 elements (is this a hard requirement, or do they have to be "approximately" 100 elements?) Once you see a batch, you're guaranteed to see the same batch on retrie

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 11:42 GMT+01:00 Reuven Lax : > Can we describe this at a higher level? > > I think what you want is the following. Please correct if I'm > misunderstanding. > > Batches of 100 elements (is this a hard requirement, or do they have to be > "approximately" 100 element?) Approximately is fi

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
so I think the following will do exactly that and can be easily factored into a reusable transform (modulo Java type boilerplate): pCollection.apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)) .apply(Window.into(new GlobalWindows()) .triggering(AfterWaterm
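
The visible part of the snippet above starts by giving every element a random key in [0, N) so that a later grouping step sees N shards. The sketch below reproduces only that first step in plain Java, without any Beam dependency; RandomShardingDemo and its shard method are hypothetical names, and the windowing and triggering that the truncated snippet goes on to configure are deliberately not reconstructed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Plain-Java illustration only; in a pipeline the keying is done by WithKeys
// and the grouping by the runner, not by a local map.
final class RandomShardingDemo {
    // Assigns each element a pseudo-random key in [0, shardCount) and groups by key.
    static <T> Map<Integer, List<T>> shard(List<T> elements, int shardCount, Random random) {
        Map<Integer, List<T>> shards = new TreeMap<>();
        for (T element : elements) {
            int key = random.nextInt(shardCount);
            shards.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return shards;
    }
}
```

Because the keys are random, the shard sizes are only roughly even, and which element lands in which shard differs from run to run unless the Random is seeded. That is relevant to the stable-replay question raised earlier in the thread.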

Re: makes bundle concept usable?

2017-11-15 Thread Kenneth Knowles
In case the connection is not clear to folks on this thread, I pinged the thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR at https://github.com/apache/beam/pull/4135.

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
@Reuven: it looks like a good workaround
@Ken: thanks a lot for the link!
@all:
1. do you think it is doable without windowing usage (to have something more reliable in terms of runners, since it will depend on fewer primitives)?
2. what about allowing the user to define when to checkpoint?
3. can we ge

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré
Hi Romain, 1. you always have the GlobalWindow at least. It's more related to trigger. 2. How would you define this? With annotation (on what in that case) or using checkpoint method? 3. Agree to have a core PTransform for that. Regards JB

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 15:23 GMT+01:00 Jean-Baptiste Onofré : > Hi Romain, > > 1. you always have the GlobalWindow at least. It's more related to trigger. Yep but having an implicit window and being able to rely on window features are 2 things, think some runners don't support it yet so can be something not r

Re: makes bundle concept usable?

2017-11-16 Thread Reuven Lax
On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau wrote: > @Reuven: it looks like a good workaround > @Ken: thks a lot for the link! > > @all: > > 1. do you think it is doable without windowing usage (to have > something more reliable in term of runner since it will depend on less > primitives?

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
2017-11-16 12:18 GMT+01:00 Reuven Lax : > On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau > wrote: > >> @Reuven: it looks like a good workaround >> @Ken: thks a lot for the link! >> >> @all: >> >> 1. do you think it is doable without windowing usage (to have >> something more reliable in term

Re: makes bundle concept usable?

2017-11-16 Thread Raghu Angadi
Core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to the checkpoint on external source). Runners do checkpoint internally as an implementation detail. Flink's checkpoint model is entirely different from Dataflo

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
This is a fair summary of the current state but also where beam can have a very strong added value and make big data great and smooth. Instead of this replay feature isn't checkpointing willable? In particular with SDF no?

Re: makes bundle concept usable?

2017-11-16 Thread Raghu Angadi
How would you define it (rough API is fine)? Without more details, it is not easy to see wider applicability and feasibility in runners.

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
Yes, what I proposed earlier was:

I. checkpoint marker:
@AnyBeamAnnotation @CheckpointAfter public void someHook(SomeContext ctx);

II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less) // in the dofn
@CheckpointTester public boolean sh

Re: makes bundle concept usable?

2017-11-16 Thread Reuven Lax
Romain, Can you define what you mean by checkpoint? What are the semantics, what does it accomplish? Reuven

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
it gives the fn/transform the ability to save a state - it can get back on "restart" / whatever unit we can use, probably runner dependent? Without that you need to rewrite all IO usage with something like the previous pattern which makes the IO not self sufficient and kind of makes the entry cost

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré
It sounds like the "Trigger" in the Splittable DoFn, no? https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html Regards JB

Re: makes bundle concept usable?

2017-11-16 Thread Eugene Kirpichov
JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the word. Did you mean something else, e.g. restriction perhaps? Either way I don't think SDFs are the solution here; SDFs have to do with the ability to split the processing of *a single element* over multipl

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré
Sorry, not trigger, I meant tracker (a bit early in the morning for me) ;) The tracker in the SDF controls the restriction/offset/etc. So I think it could be used to group elements, no? Regards JB

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
@Eugene: yes and the other alternative of Reuven too but it is still 1. relying on timers, 2. not really checkpointed In other words it seems all solutions are to create a chunk of size 1 and replayable to fake the lack of chunking in the framework. This always implies a chunk handling outside the

Re: makes bundle concept usable?

2017-11-16 Thread Eugene Kirpichov
To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element. If you're applying an SD

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré
Thanks for the explanation. Agree, we might talk about different things using the same wording. I'm also struggling to understand the use case (for a generic DoFn). Regards JB

Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
Ok, let me try to step back and summarize what we have today and what I miss: 1. we can handle chunking in beam through group in batch (or equivalent) but: > it is not built-in into the transforms (IO) and it is controlled from outside the transforms so no way for a transform to do it properly

Re: makes bundle concept usable?

2017-11-17 Thread Eugene Kirpichov
I must admit I'm still failing to understand the problem, so let's step back even further. Could you give an example of an IO that is currently difficult to implement specifically because of lack of the feature you're talking about? I'm asking because I've reviewed almost all Beam IOs and don't r

Re: makes bundle concept usable?

2017-11-17 Thread Romain Manni-Bucau
Yep, just take ES IO, if a part of a bundle fails you are in an unmanaged state. This is the case for all O (of IO ;)). Issue is not much about "1" (the code it takes) but more the fact it doesn't integrate with runner features and retries potentially: what happens if a bundle has a failure? => und

Re: makes bundle concept usable?

2017-11-17 Thread Eugene Kirpichov
The behavior if a bundle has a failure is well defined: the entire bundle is considered failed and processing of the bundle's elements will get retried. The level at which retries are performed is unspecified: a runner would be allowed to retry the bundle, or it would be allowed to split the remai

Re: makes bundle concept usable?

2017-11-17 Thread Eugene Kirpichov
In case of Elasticsearch: Elasticsearch takes a PCollection with JSON documents, which may contain a document id. ES will overwrite a document with the same id if it exists, so in case of retries inserting the same document multiple times will not lead to duplicates. I guess the solution is to simp
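
The argument above (a retried bundle does not create duplicates when writes overwrite by document id) can be illustrated with a small plain-Java sketch. It is only a model under stated assumptions: IdempotentWriteDemo is a hypothetical class, the in-memory map stands in for the external store, and the failAfter parameter exists purely to simulate a bundle that fails part-way through. It does not use the Elasticsearch client or ElasticsearchIO.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a sink whose writes overwrite documents sharing an id.
final class IdempotentWriteDemo {
    private final Map<String, String> store = new LinkedHashMap<>();
    private int writeCalls;

    // Upsert: a second write with the same id replaces the first one.
    void upsert(String id, String json) {
        writeCalls++;
        store.put(id, json);
    }

    // Writes a "bundle" of (id, json) pairs; throws after `failAfter`
    // successful writes to simulate a failure in the middle of the bundle.
    void writeBundle(List<Map.Entry<String, String>> docs, int failAfter) {
        int written = 0;
        for (Map.Entry<String, String> doc : docs) {
            if (written == failAfter) {
                throw new IllegalStateException("simulated failure");
            }
            upsert(doc.getKey(), doc.getValue());
            written++;
        }
    }

    int documentCount() {
        return store.size();
    }

    int writeCalls() {
        return writeCalls;
    }
}
```

If a three-document bundle fails after two writes and is then retried in full, the store ends up with three documents even though five writes were issued. The retry costs extra work but does not change the final state, which is the efficiency concern raised later in the thread rather than a correctness one.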

Re: makes bundle concept usable?

2017-11-17 Thread Raghu Angadi
On Thu, Nov 16, 2017 at 10:40 PM, Eugene Kirpichov < kirpic...@google.com.invalid> wrote: > > [...] So it would help if you could give a > more concrete example: for example, take some IO that you think could be > easier to write with your proposed API, give the contents of a hypothetical > PCollec

Re: makes bundle concept usable?

2017-11-17 Thread Raghu Angadi
On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau wrote: > Yep, just take ES IO, if a part of a bundle fails you are in an > unmanaged state. This is the case for all O (of IO ;)). Issue is not > much about "1" (the code it takes) but more the fact it doesn't > integrate with runner features an

Re: makes bundle concept usable?

2017-11-18 Thread Romain Manni-Bucau
First bundle retry is unusable with some runners like spark where the bundle size is the collection size / number of workers. This means a user can't use bundle API or feature reliably and portably - which is beam promise. Aligning chunking and bundles would guarantee that but can be not desired, that

Re: makes bundle concept usable?

2017-11-18 Thread Eugene Kirpichov
After giving this thread my best attempt at understanding exactly what is the problem and the proposed solution, I'm afraid I still fail to understand both. To reiterate, I think the only way to make progress here is to be more concrete: (quote) take some IO that you think could be easier to write

Re: makes bundle concept usable?

2017-11-18 Thread Romain Manni-Bucau
Eugene, point - and issue with a single sample - is you can always find *workarounds* on a case by case basis as the id one with ES but beam doesn't solve the problem as a framework. From my past, I clearly don't see how batch frameworks solved that for years and beam is not able to do it - keep in

Re: makes bundle concept usable?

2017-11-18 Thread Eugene Kirpichov
I disagree that the usage of document id in ES is a "workaround" - it does not address any *accidental* complexity coming from shortcomings of Beam, it addresses the *essential* complexity that a distributed system forces one to take it as a fact of

Re: makes bundle concept usable?

2017-11-18 Thread Robert Bradshaw
There is a possible fourth issue that we don't handle well: efficiency. For very large bundles, it may be advantageous to avoid replaying a bunch of idempotent operations if there were a way to record what ones we're sure went through. Not sure if that's the issue here (though one could possibly do

Re: makes bundle concept usable?

2017-11-18 Thread Romain Manni-Bucau
@Eugene: "workaround" as specific to the IO each time and therefore still highlight a lack in the core. Other comments inline 2017-11-19 7:40 GMT+01:00 Robert Bradshaw : > There is a possible fourth issue that we don't handle well: efficiency. For > very large bundles, it may be advantageous to

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
Guys, what about moving getMaxBundleSize from flink options to pipeline options? I think all runners can support it, right? Spark code needs the merge of the v2 before being able to be implemented probably but I don't see any blocker. wdyt?

Re: makes bundle concept usable?

2017-11-30 Thread Jean-Baptiste Onofré
It sounds reasonable to me. And agree for Spark, I would like to merge Spark 2 update first. Regards JB

Re: makes bundle concept usable?

2017-11-30 Thread Eugene Kirpichov
Very strong -1 from me: - Having a pipeline-global parameter is bad because it will apply to all transforms, with no ability to control it for individual transforms. This can go especially poorly because it means that when I write a transform, I don't know whether a user will set this parameter in

Re: makes bundle concept usable?

2017-11-30 Thread Reuven Lax
I don't think it belongs in PipelineOptions, as bundle size is always a runner thing. We could consider adding a new generic RunnerOptions, however I'm not convinced all runners can actually support this.

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
2017-11-30 18:11 GMT+01:00 Eugene Kirpichov : > Very strong -1 from me: > - Having a pipeline-global parameter is bad because it will apply to all > transforms, with no ability to control it for individual transforms. This > can go especially poorly because it means that when I write a transform, I

Re: makes bundle concept usable?

2017-11-30 Thread Ben Chambers
Beam includes a GroupIntoBatches transform (see https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java) which I believe was intended to be used as part of such a portable IO. It can be used to request that elements are divided in
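
As a rough mental model of the batching mentioned above, the following plain-Java sketch splits the values seen for one key into batches of at most a given size. It is not the Beam transform: BatchingDemo and intoBatches are hypothetical names, and the real GroupIntoBatches operates on keyed PCollections and is executed by the runner rather than on an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "divide the values for one key into batches of at most N".
final class BatchingDemo {
    static <T> List<List<T>> intoBatches(List<T> values, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < values.size(); start += batchSize) {
            int end = Math.min(start + batchSize, values.size());
            // Copy the slice so each batch is independent of the input list.
            batches.add(new ArrayList<>(values.subList(start, end)));
        }
        return batches;
    }
}
```

The last batch may be smaller than the requested size, which matches the "approximately 100 elements" requirement discussed earlier: the size is an upper bound, not an exact count.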

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
@Ben: would all IO be rewritten to use that and the bundle concept dropped from the API to avoid any ambiguity and misleading usage like in current IOs?

Re: makes bundle concept usable?

2017-11-30 Thread Eugene Kirpichov
"First immediately blocking issue is how to batch records reliably and *portably* (the biggest beam added-value IMHO). Since bundles are "flush" trigger for most IO it means ensuring the bundle size is somehow controllable or at least not set to a very small value OOTB." Please cite an existing IO

Re: makes bundle concept usable?

2017-11-30 Thread Kenneth Knowles
Bundles: they are not for user-controlled batching; they are for runner-controlled amortization across elements. Trying to use them in another way is a misunderstanding of the model. IOs: It is perfectly fine for an IO to use bundle boundaries. But that is just one tool that an IO can use to achie

Re: makes bundle concept usable?

2017-11-30 Thread Ben Chambers
I think both concepts likely need to co-exist: As described in the execution model [1] bundling is a runner-specific choice about how to execute a pipeline. This affects how frequently it may need to checkpoint during process, how much communication overhead there is between workers, the scope of

Re: makes bundle concept usable?

2017-11-30 Thread Jean-Baptiste Onofré
Agree, but maybe we can inform the runner if wanted, no? Honestly, from my side, I'm fine with the current situation as it's runner specific. Regards JB

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
Hmm, ESIO: https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847 JDBCIO: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592 MongoIO:

Re: makes bundle concept usable?

2017-11-30 Thread Kenneth Knowles
On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau wrote: > Hmm, > > ESIO: https://github.com/apache/beam/blob/master/sdks/java/io/ > elasticsearch/src/main/java/org/apache/beam/sdk/io/ > elasticsearch/ElasticsearchIO.java#L847 > JDBCIO: https://github.com/apache/beam/blob/master/sdks/java/io/

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
On Nov 30, 2017 19:23, "Kenneth Knowles" wrote: On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau wrote: > Hmm, > > ESIO: https://github.com/apache/beam/blob/master/sdks/java/io/elas > ticsearch/src/main/java/org/apache/beam/sdk/io/elasticsear > ch/ElasticsearchIO.java#L847 > JDBCIO: http

Re: makes bundle concept usable?

2017-11-30 Thread Eugene Kirpichov
So is your main concern potential poor performance on runners that choose to use a very small bundle size? (Currently an IO can trivially handle too large bundles, simply by flushing when enough data accumulates, which is what all IOs do - but indeed working around having unreasonably small bundles
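
The pattern described above, where a writer flushes once enough data has accumulated and flushes whatever is left when the bundle ends, might look roughly like this in plain Java. BufferingWriter is a hypothetical class: processElement and finishBundle only mimic the shape of a DoFn's lifecycle methods, and the list of flushed batches stands in for calls to a real backend.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer: caps the batch size itself instead of relying on bundle size.
final class BufferingWriter {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BufferingWriter(int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
    }

    // Called once per element; flushes eagerly when the buffer is full.
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called when the runner ends the bundle; flushes the remainder, if any.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(buffer)); // stand-in for a backend write
        buffer.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}
```

This handles bundles that are too large, but as the message notes it cannot help when the runner hands over very small bundles: finishBundle then forces a flush of a nearly empty buffer each time.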

Re: makes bundle concept usable?

2017-11-30 Thread Eugene Kirpichov
I mean: if these runners have some limitation that forces them into only supporting tiny bundles, there's a good chance that this limitation will also apply to whatever beam model API you propose as a fix, and they won't be able to implement it.

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
This is my short term concern yes. Note that the opposite is not sane either (too big) because it forces eager flushes in all IO (so instead of fixing it once in a single code location you impact everyone N times). However it is not as blocking as the small bundle size issue. Next concern is commit ha

Re: makes bundle concept usable?

2017-11-30 Thread Romain Manni-Bucau
2017-11-30 20:36 GMT+01:00 Kenneth Knowles : > On Thu, Nov 30, 2017 at 11:28 AM, Romain Manni-Bucau > wrote: >> >> This is my short term concern yes. Note that the opposite is not sane >> neither (too big) cause it forces eager flushes in all IO (so instead >> of fixing it once in a single code lo