Agreed, but maybe we could inform the runner if wanted, no?

Honestly, from my side, I'm fine with the current situation, as it's
runner-specific.

Regards
JB

On 11/30/2017 06:12 PM, Reuven Lax wrote:
I don't think it belongs in PipelineOptions, as bundle size is always a runner thing.

We could consider adding a new generic RunnerOptions; however, I'm not convinced all runners can actually support this.

On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

    Guys,

    What about moving getMaxBundleSize from the Flink options to the pipeline
    options? I think all runners can support it, right? The Spark code
    probably needs the v2 merge before it can be implemented there, but I
    don't see any blocker.

    What do you think?

    Romain Manni-Bucau
    @rmannibucau |  Blog | Old Blog | Github | LinkedIn


    2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:
     > @Eugene: I say "workaround" because it is specific to each IO and
     > therefore still highlights a gap in the core.
     >
     > Other comments inline
     >
     >
     > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
     >> There is a possible fourth issue that we don't handle well: efficiency.
     >> For very large bundles, it may be advantageous to avoid replaying a bunch
     >> of idempotent operations if there were a way to record which ones we're
     >> sure went through. Not sure if that's the issue here (though one could
     >> possibly do this with SDFs: one can preemptively return periodically
     >> before an element, or a portion thereof, is done).
     >
     > +1. It also leads to the IO handling its own chunking/bundles, which
     > would solve all of these issues at once.
     >
     >>
     >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
     >> kirpic...@google.com.invalid> wrote:
     >>
     >>> I disagree that the usage of document id in ES is a "workaround" - it
     >>> does not address any *accidental* complexity
     >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings
     >>> of Beam; it addresses the *essential* complexity that a distributed
     >>> system forces one to take as a fact of nature: the same write
     >>> (mutation) will happen multiple times, so if you want a mutation to
     >>> happen "as-if" it happened exactly once, the mutation itself must be
     >>> idempotent <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id
     >>> (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example
     >>> of an idempotent mutation, and it's very good that Elasticsearch provides
     >>> it - if it didn't, no matter how good an API Beam had, achieving
     >>> exactly-once writes would be theoretically impossible. Are we in
     >>> agreement on this so far?
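
[Illustration of the idempotence point above. This is a minimal sketch in plain Java, assuming an in-memory list and map as stand-ins for the external store; the class and method names are hypothetical and it does not use the Elasticsearch client. It replays the same mutation twice, as a retried bundle would, and compares an append-style write with an insert-with-id.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for an external store (not an Elasticsearch client). */
class IdempotentWriteSketch {
    /** Append-style write: replaying it creates a duplicate entry. */
    static void append(List<String> store, String doc) {
        store.add(doc);
    }

    /** Insert-with-id (upsert): replaying it leaves the store unchanged. */
    static void upsert(Map<String, String> store, String id, String doc) {
        store.put(id, doc);
    }

    public static void main(String[] args) {
        List<String> appended = new ArrayList<>();
        Map<String, String> upserted = new HashMap<>();
        // Simulate a retried bundle: the same mutation is applied twice.
        for (int attempt = 0; attempt < 2; attempt++) {
            append(appended, "doc-a");
            upsert(upserted, "id-1", "doc-a");
        }
        System.out.println("entries after replayed append: " + appended.size());
        System.out.println("entries after replayed upsert: " + upserted.size());
    }
}
```

[The append path ends up with two entries and the upsert path with one, which is the "as-if exactly once" behavior described in the quoted paragraph.]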
     >>>
     >>> Next: you seem to be discussing 3 issues together, all of which are
     >>> valid issues, but they seem unrelated to me:
     >>> 1. Exactly-once mutation
     >>> 2. Batching multiple mutations into one RPC
     >>> 3. Backpressure
     >>>
     >>> #1: was considered above. The system the IO is talking to has to
     >>> support idempotent mutations, in an IO-specific way, and the IO has to
     >>> take advantage of them, in the IO-specific way - end of story.
     >
     > Agree but don't forget the original point was about "chunks" and not
     > individual records.
     >
     >>>
     >>> #2: a batch of idempotent operations is also idempotent, so this
     >>> doesn't add anything new semantically. Syntactically - Beam already
     >>> allows you to write your own batching by notifying you of permitted
     >>> batch boundaries (Start/FinishBundle). Sure, it could do more, but from
     >>> my experience the batching in IOs I've seen is one of the easiest and
     >>> least error-prone parts, so I don't see something worth an extended
     >>> discussion here.
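
[Illustration of batching between bundle boundaries. This is a rough sketch in plain Java, not a Beam DoFn: the class, its three lifecycle methods and the `flushed` list are hypothetical stand-ins for @StartBundle / @ProcessElement / @FinishBundle and for one RPC per batch. It also shows the eager flush at a maximum batch size that the thread attributes to the ES IO.]

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batching writer mirroring the bundle lifecycle; plain Java, not a Beam DoFn. */
class BundleBatcherSketch {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    /** Every flushed batch; each one stands in for a single RPC to the external system. */
    final List<List<String>> flushed = new ArrayList<>();

    BundleBatcherSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Start of a bundle: begin with an empty buffer. */
    void startBundle() {
        buffer.clear();
    }

    /** Buffer one element and flush eagerly once the batch is full. */
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** End of a bundle: flush whatever is left, even a partial batch. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

[With a maximum batch size of 2 and a bundle of 5 elements, this produces three batches of sizes 2, 2 and 1. Batches flushed before a bundle failure are written again when the bundle is retried, which is why the writes themselves need to be idempotent.]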
     >
     > "Beam already allows you to
     >  write your own batching by notifying you of permitted batch boundaries
     >  (Start/FinishBundle)"
     >
     > This is wrong, since the bundle is potentially the whole PCollection
     > (Spark), so this is not even an option until you use SDF (back to the
     > same point).
     > Once again, the API looks fine but no implementation makes it true. It
     > would be easy to change in Spark; Flink may be OK since it targets the
     > streaming case more. Not sure about the others, any idea?
     >
     >
     >>>
     >>> #3: handling backpressure is a complex problem with multiple facets:
     >>> 1) how do you know you're being throttled, and by how much are you
     >>> exceeding the external system's capacity?
     >
     > This is the whole point of backpressure: the system sends it back to
     > you (through a header or a status, in general).
     >
     >>> 2) how do you communicate this signal to the
     >>> runner?
     >
     > You are a client, so you get the metadata in the response, whatever the technology.
     >
     >>> 3) what does the runner do in response?
     >
     > The runner does nothing, but the IO adapts its handling as mentioned before
     > (wait and retry, skip, ... depending on the config).
     >
     >>> 4) how do you wait until
     >>> it's ok to try again?
     >
     > This is one point that probably needs enhancing in Beam, but waiting in
     > the processing is an option if the source has some buffering; otherwise
     > it requires a fallback buffer with a max size if the wait mode is
     > activated.
     >
     >>> You seem to be advocating for solving one facet of this problem, which
     >>> is: you want it to be possible to signal to the runner "I'm being
     >>> throttled, please end the bundle", right? If so - I think this (ending
     >>> the bundle) is unnecessary: the DoFn can simply do an exponential
     >>> back-off sleep loop.
     >
     > Agreed, I never said the runner should know, but GBK+output doesn't work
     > because you don't own the GBK.
     >
     >>> This is e.g. what DatastoreIO does
     >>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
     >>> and this is in general how most systems I've seen handle backpressure.
     >>> Is there something I'm missing? In particular, is there any compelling
     >>> reason why you think it'd be beneficial e.g. for DatastoreIO to commit
     >>> the results of the bundle so far before processing other elements?
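
[Illustration of the exponential back-off sleep loop mentioned above. This is a generic sketch under stated assumptions: the class, method and parameter names are hypothetical, the throttled call is modelled as a BooleanSupplier that returns true on success, and the code is not taken from DatastoreIO.]

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper; a generic back-off loop, not DatastoreIO's actual code. */
class BackoffSketch {
    /**
     * Calls {@code call} until it reports success or {@code maxAttempts} is reached,
     * sleeping between attempts and doubling the delay up to {@code maxDelayMillis}.
     *
     * @return the number of attempts made on success, or -1 if every attempt
     *         failed or the thread was interrupted while sleeping
     */
    static int callWithBackoff(BooleanSupplier call, long initialDelayMillis,
                               long maxDelayMillis, int maxAttempts) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (call.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return -1;
                }
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
        return -1;
    }
}
```

[The loop blocks inside element processing, so the bundle stays open while the external system recovers; nothing is signalled to the runner.]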
     >
     > It was more about validating a subset of the whole bundle early, to
     > avoid reprocessing it if the bundle fails later.
     >
     >
     > So to summarize I see 2 outcomes:
     >
     > 1. implement SDF in all runners
     > 2. make the bundle size upper-bounded - through a pipeline option - in
     > all runners; not sure this one is doable everywhere since I mainly
     > checked the Spark case
     >
     >>>
     >>> Again, it might be that I'm still misunderstanding what you're trying
     >>> to say. One of the things it would help to clarify would be - exactly
     >>> what do you mean by "how batch frameworks solved that for years": can
     >>> you point at an existing API in some other framework that achieves what
     >>> you want?
     >>>
     >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
    <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> wrote:
     >>>
     >>> > Eugene, the point - and the issue with a single sample - is that you
     >>> > can always find *workarounds* on a case-by-case basis, like the id one
     >>> > with ES, but Beam doesn't solve the problem as a framework.
     >>> >
     >>> > From my past experience, I clearly don't see how batch frameworks
     >>> > solved that for years and Beam is not able to do it - keep in mind it
     >>> > is the same kind of technology, it just uses different sources and
     >>> > bigger clusters, so there is no real reason not to have the same
     >>> > feature quality. The only potential reason I see is that there is no
     >>> > end-to-end tracking of the state in the cluster. But I don't see why
     >>> > there wouldn't be. Am I missing something here?
     >>> >
     >>> > An example could be: take a GitHub crawler computing stats on all
     >>> > GitHub repos, based on a REST client for example. You will need to
     >>> > handle the rate limit and will likely want to "commit" each time you
     >>> > reach a rate limit, likely with some buffering strategy with a max
     >>> > size before really waiting. How do you do it with a GBK independent of
     >>> > your DoFn? You are not able to compose the fns correctly :(.
     >>> >
     >>> >
     >>> > On Nov 18, 2017 at 20:48, "Eugene Kirpichov" <kirpic...@google.com.invalid>
     >>> > wrote:
     >>> >
     >>> > After giving this thread my best attempt at understanding exactly what
     >>> > the problem and the proposed solution are, I'm afraid I still fail to
     >>> > understand both. To reiterate, I think the only way to make progress
     >>> > here is to be more concrete: (quote) take some IO that you think could
     >>> > be easier to write with your proposed API, give the contents of a
     >>> > hypothetical PCollection being written to this IO, give the code of a
     >>> > hypothetical DoFn implementing the write using your API, and explain
     >>> > what you'd expect to happen at runtime. I'm going to re-engage in this
     >>> > thread after such an example is given.
     >>> >
     >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
    <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> > wrote:
     >>> >
     >>> > > First, bundle retry is unusable with some runners like Spark, where
     >>> > > the bundle size is the collection size / number of workers. This
     >>> > > means a user can't use the bundle API or feature reliably and
     >>> > > portably - which is Beam's promise. Aligning chunking and bundles
     >>> > > would guarantee that, but it may not be desired, which is why I
     >>> > > thought it could be another feature.
     >>> > >
     >>> > > GBK works until the IO knows about that, and both concepts are not
     >>> > > always orthogonal - backpressure-like systems are a trivial common
     >>> > > example. This means the IO (DoFn) must be able to do it itself at
     >>> > > some point.
     >>> > >
     >>> > > Also note that the GBK works only if the IO can take a list, which is
     >>> > > never the case today.
     >>> > >
     >>> > > The big questions for me are: is SDF the way to go, since it provides
     >>> > > the needed API but is not yet supported? What about existing IOs?
     >>> > > Should Beam provide an auto-wrapping of DoFns for that pre-aggregated
     >>> > > support and simulate bundles to the actual IO impl to keep the
     >>> > > existing API?
     >>> > >
     >>> > >
     >>> > > On Nov 17, 2017 at 19:20, "Raghu Angadi" <rang...@google.com.invalid>
     >>> > > wrote:
     >>> > >
     >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
     >>> > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>
     >>> > > >
     >>> > > wrote:
     >>> > >
     >>> > > > Yep, just take ES IO: if part of a bundle fails you are in an
     >>> > > > unmanaged state. This is the case for all O (of IO ;)). The issue
     >>> > > > is not so much "1" (the code it takes) but more the fact that it
     >>> > > > doesn't integrate with runner features and, potentially, retries:
     >>> > > > what happens if a bundle has a failure? => undefined today. 2. I'm
     >>> > > > fine with it as long as we know exactly what happens when we
     >>> > > > restart after a bundle failure. With ES the timestamp can be used,
     >>> > > > for instance.
     >>> > > >
     >>> > >
     >>> > > This deterministic batching can be achieved even now with an extra
     >>> > > GroupByKey (and if you want ordering on top of that, you will need
     >>> > > another GBK). I don't know if that is too costly in your case. I
     >>> > > would need a bit more detail on handling ES IO write retries to see
     >>> > > if it could be simplified. Note that retries occur with or without
     >>> > > any failures in your DoFn.
     >>> > >
     >>> > > The biggest negative with the GBK approach is that it doesn't provide
     >>> > > the same guarantees on Flink.
     >>> > >
     >>> > > I don't see how GroupIntoBatches in Beam provides specific guarantees
     >>> > > on deterministic batches.
     >>> > >
     >>> > > > Thinking about it, the SDF is really a way to do it, since the SDF
     >>> > > > will manage the bulking, and associated with the runner "retry" it
     >>> > > > seems to cover the needs.
     >>> > > >
     >>> > > > Romain Manni-Bucau
     >>> > > >
     >>> > > >
     >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
     >>> > <kirpic...@google.com.invalid
     >>> > > >:
     >>> > > > > I must admit I'm still failing to understand the problem, so
     >>> > > > > let's step back even further.
     >>> > > > >
     >>> > > > > Could you give an example of an IO that is currently difficult to
     >>> > > > > implement specifically because of the lack of the feature you're
     >>> > > > > talking about?
     >>> > > > >
     >>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't
     >>> > > > > recall seeing a similar problem. Sure, a lot of IOs do batching
     >>> > > > > within a bundle, but 1) it doesn't take up much code (granted, it
     >>> > > > > would be even easier if Beam did it for us) and 2) I don't
     >>> > > > > remember any of them requiring the batches to be deterministic,
     >>> > > > > and I'm having a hard time imagining what kind of storage system
     >>> > > > > would be able to deduplicate if batches were deterministic but
     >>> > > > > wouldn't be able to deduplicate if they weren't.
     >>> > > > >
     >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
     >>> > > > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> > > > > wrote:
     >>> > > > >
     >>> > > > >> Ok, let me try to step back and summarize what we have today
     >>> > > > >> and what I miss:
     >>> > > > >>
     >>> > > > >> 1. we can handle chunking in Beam through group into batches (or
     >>> > > > >> equivalent) but:
     >>> > > > >>    > it is not built into the transforms (IO) and it is
     >>> > > > >> controlled from outside the transforms, so there is no way for a
     >>> > > > >> transform to do it properly without itself handling a
     >>> > > > >> composition and links between multiple DoFns to get
     >>> > > > >> notifications and potentially react properly or handle
     >>> > > > >> backpressure from its backend
     >>> > > > >> 2. there is no restart feature because there is no real state
     >>> > > > >> handling at the moment. This sounds fully delegated to the
     >>> > > > >> runner, but I was hoping to have more guarantees from the API
     >>> > > > >> used, to be able to restart a pipeline (mainly batch, since it
     >>> > > > >> can be irrelevant or delegated to the backend for streams) and
     >>> > > > >> handle only uncommitted records, so it requires some persistence
     >>> > > > >> outside the main IO storages to do it properly
     >>> > > > >>    > note this is somewhat similar to the monitoring topic, which
     >>> > > > >> lacks persistence ATM, so it could end up with Beam having a
     >>> > > > >> pluggable storage for a few concerns
     >>> > > > >>
     >>> > > > >>
     >>> > > > >> Short term I would be happy with 1 solved properly; long term I
     >>> > > > >> hope 2 will be tackled without workarounds requiring custom
     >>> > > > >> wrapping of IOs to use a custom state persistence.
     >>> > > > >>
     >>> > > > >>
     >>> > > > >>
     >>> > > > >> Romain Manni-Bucau
     >>> > > > >>
     >>> > > > >>
     >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
    <j...@nanthrax.net <mailto:j...@nanthrax.net>>:
     >>> > > > >> > Thanks for the explanation. Agreed, we might be talking about
     >>> > > > >> > different things using the same wording.
     >>> > > > >> >
     >>> > > > >> > I'm also struggling to understand the use case (for a generic
     >>> > > > >> > DoFn).
     >>> > > > >> >
     >>> > > > >> > Regards
     >>> > > > >> > JB
     >>> > > > >> >
     >>> > > > >> >
     >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
     >>> > > > >> >>
     >>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd
     >>> > > > >> >> like to say straight up that SDF is definitely not going to
     >>> > > > >> >> help here, despite the fact that its API includes the term
     >>> > > > >> >> "checkpoint". In SDF, the "checkpoint" captures the state of
     >>> > > > >> >> processing within a single element. If you're applying an SDF
     >>> > > > >> >> to 1000 elements, it will, like any other DoFn, be applied to
     >>> > > > >> >> each of them independently and in parallel, and you'll have
     >>> > > > >> >> 1000 checkpoints capturing the state of processing each of
     >>> > > > >> >> these elements, which is probably not what you want.
     >>> > > > >> >>
     >>> > > > >> >> I'm afraid I still don't understand what kind of checkpoint
     >>> > > > >> >> you need, if it is not just deterministic grouping into
     >>> > > > >> >> batches. "Checkpoint" is a very broad term and it's very
     >>> > > > >> >> possible that everybody in this thread is talking about
     >>> > > > >> >> different things when saying it. So it would help if you could
     >>> > > > >> >> give a more concrete example: for example, take some IO that
     >>> > > > >> >> you think could be easier to write with your proposed API,
     >>> > > > >> >> give the contents of a hypothetical PCollection being written
     >>> > > > >> >> to this IO, give the code of a hypothetical DoFn implementing
     >>> > > > >> >> the write using your API, and explain what you'd expect to
     >>> > > > >> >> happen at runtime.
     >>> > > > >> >>
     >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
     >>> > > > >> >> <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> > > > >> >> wrote:
     >>> > > > >> >>
     >>> > > > >> >>> @Eugene: yes, and Reuven's other alternative too, but it is
     >>> > > > >> >>> still 1. relying on timers, 2. not really checkpointed
     >>> > > > >> >>>
     >>> > > > >> >>> In other words, it seems all solutions are to create a chunk
     >>> > > > >> >>> of size 1 and replayable, to fake the lack of chunking in the
     >>> > > > >> >>> framework. This always implies chunk handling outside the
     >>> > > > >> >>> component (typically before, for an output). My point is that
     >>> > > > >> >>> I think IOs need it in their own "internals", or at least need
     >>> > > > >> >>> to control it themselves, since the chunk size is part of the
     >>> > > > >> >>> IO handling most of the time.
     >>> > > > >> >>>
     >>> > > > >> >>> I think JB spoke of the same "group before" trick using
     >>> > > > >> >>> restrictions, which can work, I have to admit, if SDFs are
     >>> > > > >> >>> implemented by runners. Is there a roadmap/status on that?
     >>> > > > >> >>> Last time I checked, SDF was a great API without support :(.
     >>> > > > >> >>>
     >>> > > > >> >>>
     >>> > > > >> >>>
     >>> > > > >> >>> Romain Manni-Bucau
     >>> > > > >> >>>
     >>> > > > >> >>>
     >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
     >>> > > > >> >>> <kirpic...@google.com.invalid>:
     >>> > > > >> >>>>
     >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated,
     >>> > > > >> >>>> and the post doesn't mention the word. Did you mean
     >>> > > > >> >>>> something else, e.g. restriction perhaps? Either way I don't
     >>> > > > >> >>>> think SDFs are the solution here; SDFs have to do with the
     >>> > > > >> >>>> ability to split the processing of *a single element* over
     >>> > > > >> >>>> multiple calls, whereas Romain I think is asking for
     >>> > > > >> >>>> repeatable grouping of *multiple* elements.
     >>> > > > >> >>>>
     >>> > > > >> >>>> Romain - does
     >>> > > > >> >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
     >>> > > > >> >>>> do what you want?
     >>> > > > >> >>>>
     >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
     >>> > > > >> j...@nanthrax.net <mailto:j...@nanthrax.net>>
     >>> > > > >> >>>> wrote:
     >>> > > > >> >>>>
     >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
     >>> > > > >> >>>>>
     >>> > > > >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
     >>> > > > >> >>>>>
     >>> > > > >> >>>>> Regards
     >>> > > > >> >>>>> JB
     >>> > > > >> >>>>>
     >>> > > > >> >>>>>
     >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
     >>> > > > >> >>>>>>
     >>> > > > >> >>>>>> It gives the fn/transform the ability to save a state that it
     >>> > > > >> >>>>>> can get back on "restart" / whatever unit we can use,
     >>> > > > >> >>>>>> probably runner dependent? Without that you need to rewrite
     >>> > > > >> >>>>>> all IO usage with something like the previous pattern, which
     >>> > > > >> >>>>>> makes the IO not self-sufficient and pushes the entry cost
     >>> > > > >> >>>>>> and usage of Beam way further.
     >>> > > > >> >>>>>>
     >>> > > > >> >>>>>> In my mind it is exactly what JBatch/Spring Batch uses, but
     >>> > > > >> >>>>>> adapted to the Beam (stream in particular) case.
     >>> > > > >> >>>>>>
     >>> > > > >> >>>>>> Romain Manni-Bucau
     >>> > > > >> >>>>>>
     >>> > > > >> >>>>>>
     >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
     >>> > <re...@google.com.invalid
     >>> > > >:
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>> Romain,
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are the
     >>> > > > >> >>>>>>> semantics, what does it accomplish?
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>> Reuven
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
     >>> > > > >> >>>>>
     >>> > > > >> >>>>> rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>> wrote:
     >>> > > > >> >>>>>>>
     >>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> I. checkpoint marker:
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> @AnyBeamAnnotation
     >>> > > > >> >>>>>>>> @CheckpointAfter
     >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn())
     >>> > > > >> >>>>>>>>         .withCheckpointAlgorithm(new CountingAlgo()))
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> III. (I like this one less)
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> // in the dofn
     >>> > > > >> >>>>>>>> @CheckpointTester
     >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn,
     >>> > > > >> >>>>>>>> per element
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> Romain Manni-Bucau
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
     >>> > > > <rang...@google.com.invalid
     >>> > > > >> >>>>
     >>> > > > >> >>>> :
     >>> > > > >> >>>>>>>>>
     >>> > > > >> >>>>>>>>> How would you define it (a rough API is fine)? Without more
     >>> > > > >> >>>>>>>>> details, it is not easy to see wider applicability and
     >>> > > > >> >>>>>>>>> feasibility in runners.
     >>> > > > >> >>>>>>>>>
     >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau 
<
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>>>> rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
     >>> > > > >> >>>>>>>>>
     >>> > > > >> >>>>>>>>> wrote:
     >>> > > > >> >>>>>>>>>
     >>> > > > >> >>>>>>>>>> This is a fair summary of the current state, but also of
     >>> > > > >> >>>>>>>>>> where Beam can have a very strong added value and make big
     >>> > > > >> >>>>>>>>>> data great and smooth.
     >>> > > > >> >>>>>>>>>>
     >>> > > > >> >>>>>>>>>> Instead of this replay feature, isn't checkpointing
     >>> > > > >> >>>>>>>>>> desirable? In particular with SDF, no?
     >>> > > > >> >>>>>>>>>>
     >>> > > > >> >>>>>>>>>>
     >>> > > > >> >>>>>>>>>> On Nov 16, 2017 at 19:50, "Raghu Angadi"
     >>> > > > >> >>>>>>>>>> <rang...@google.com.invalid> wrote:
     >>> > > > >> >>>>>>>>>>
     >>> > > > >> >>>>>>>>>>> The core issue here is that there is no explicit concept of
     >>> > > > >> >>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
     >>> > > > >> >>>>>>>>>>> 'getCheckpointMark' but that refers to the checkpoint on
     >>> > > > >> >>>>>>>>>>> the external source). Runners do checkpoint internally as
     >>> > > > >> >>>>>>>>>>> an implementation detail. Flink's checkpoint model is
     >>> > > > >> >>>>>>>>>>> entirely different from Dataflow's and Spark's.
     >>> > > > >> >>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about
     >>> > > > >> >>>>>>>>>>> a checkpoint by design.
     >>> > > > >> >>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees with a
     >>> > > > >> >>>>>>>>>>> sink/DoFn, I think it is better to start with the
     >>> > > > >> >>>>>>>>>>> requirements. I worked on an exactly-once sink for Kafka
     >>> > > > >> >>>>>>>>>>> (see KafkaIO.write().withEOS()), where we essentially
     >>> > > > >> >>>>>>>>>>> reshard the elements and assign sequence numbers to
     >>> > > > >> >>>>>>>>>>> elements within each shard. Duplicates in replays are
     >>> > > > >> >>>>>>>>>>> avoided based on these sequence numbers. The DoFn state API
     >>> > > > >> >>>>>>>>>>> is used to buffer out-of-order replays. The implementation
     >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not in Flink, which has a
     >>> > > > >> >>>>>>>>>>> horizontal checkpoint. KafkaIO checks for compatibility.
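
[Illustration of the per-shard sequence-number idea described above. This is only a sketch of the general technique, assuming sequence numbers start at 0 within a shard; the class and method names are hypothetical, the buffer is a plain in-memory map rather than the DoFn state API, and none of this is KafkaIO's actual implementation.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical per-shard sequencer; not KafkaIO's actual implementation. */
class ShardSequencerSketch {
    /** Next sequence number that may be written for this shard. */
    private long nextExpected = 0;
    /** Records that arrived ahead of their turn, keyed by sequence number. */
    private final TreeMap<Long, String> pending = new TreeMap<>();

    /**
     * Offers one record with its sequence number.
     *
     * @return the records that become writable, in sequence order; empty if the
     *         record is a replayed duplicate or has to wait for earlier ones
     */
    List<String> accept(long sequence, String record) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected || pending.containsKey(sequence)) {
            return ready; // replayed duplicate: already written or already buffered
        }
        pending.put(sequence, record);
        // Drain the buffer for as long as it continues the sequence without gaps.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }
}
```

[A record replayed with a sequence number below the next expected one is dropped, and a record that arrives early waits in the buffer until the gap before it is filled.]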
     >>> > > > >> >>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau 
<
     >>> > > > >> >>>>>>>>>>> rmannibu...@gmail.com 
<mailto:rmannibu...@gmail.com>>
     >>> > > > >> >>>>>>>>>>> wrote:
     >>> > > > >> >>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> Hi guys,
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative but the topic is real and
     >>> > > > >> >>>>>>>>>>>> keeps coming up again and again with Beam usage: how can a
     >>> > > > >> >>>>>>>>>>>> DoFn handle some "chunking"?
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> The need is to be able to commit every N records, but with
     >>> > > > >> >>>>>>>>>>>> N not too big.
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> The natural API for that in Beam is the bundle one, but
     >>> > > > >> >>>>>>>>>>>> bundles are not reliable since they can be very small
     >>> > > > >> >>>>>>>>>>>> (Flink) - we can say it is "ok" even if it has some perf
     >>> > > > >> >>>>>>>>>>>> impact - or too big (Spark does full size / #workers).
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize
     >>> > > > >> >>>>>>>>>>>> which does an eager flush. The issue is that the checkpoint
     >>> > > > >> >>>>>>>>>>>> is then not respected and you can process the same records
     >>> > > > >> >>>>>>>>>>>> multiple times.
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a
     >>> > > > >> >>>>>>>>>>>> Beam point of view (at least with a maximum size)?
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>> Thanks,
     >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
     >>> > > > >> >>>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>>
     >>> > > > >> >>>>>>>>>>
     >>> > > > >> >>>>>>>>
     >>> > > > >> >>>>>
     >>> > > > >> >>>>> --
     >>> > > > >> >>>>> Jean-Baptiste Onofré
     >>> > > > >> >>>>> jbono...@apache.org <mailto:jbono...@apache.org>
     >>> > > > >> >>>>> http://blog.nanthrax.net
     >>> > > > >> >>>>> Talend - http://www.talend.com
     >>> > > > >> >>>>>
     >>> > > > >> >>>
     >>> > > > >> >>
     >>> > > > >> >
     >>> > > > >> > --
     >>> > > > >> > Jean-Baptiste Onofré
     >>> > > > >> > jbono...@apache.org <mailto:jbono...@apache.org>
     >>> > > > >> > http://blog.nanthrax.net
     >>> > > > >> > Talend - http://www.talend.com
     >>> > > > >>
     >>> > > >
     >>> > >
     >>> >
     >>>



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
