Hmm,

ESIO: 
https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
JDBCIO: 
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
MongoIO: 
https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
etc...

They all use the same pattern.
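
For readers who do not want to open the three links, the shared pattern is roughly the sketch below. It is not the actual code of ElasticsearchIO, JdbcIO or MongoDbIO: `BulkSink`, `BundleBatchingWriter` and `BundleBatchingDemo` are hypothetical names, and the three methods only mirror the roles of @StartBundle/@ProcessElement/@FinishBundle so that the sketch runs without a Beam dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the backend call (ES bulk, JDBC batch, Mongo insertMany).
interface BulkSink<T> {
    void writeBatch(List<T> batch);
}

// Assumption: this mimics the lifecycle of a DoFn, it is not a real Beam DoFn.
final class BundleBatchingWriter<T> {
    private final int maxBatchSize;
    private final BulkSink<T> sink;
    private final List<T> buffer = new ArrayList<>();

    BundleBatchingWriter(int maxBatchSize, BulkSink<T> sink) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
    }

    // Role of @StartBundle: start from an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Role of @ProcessElement: buffer, and flush once the batch is full.
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Role of @FinishBundle: flush whatever is left, even a partial batch.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Copy so the sink never sees the buffer being cleared underneath it.
        sink.writeBatch(new ArrayList<>(buffer));
        buffer.clear();
    }
}

final class BundleBatchingDemo {
    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BundleBatchingWriter<String> writer = new BundleBatchingWriter<>(2, written::add);
        writer.startBundle();
        for (String doc : Arrays.asList("a", "b", "c", "d", "e")) {
            writer.processElement(doc);
        }
        writer.finishBundle();
        System.out.println(written); // prints [[a, b], [c, d], [e]]
    }
}
```

The last, possibly partial, batch is only written when the bundle ends, which is why the bundle size chosen by the runner matters for these IOs.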

From what you wrote - and technically I agree, but in the current state I
think my point is valid - you should drop bundle from the whole user
API and make it all @Internal, no?


Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-30 18:58 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> Agree, but maybe we can inform the runner if wanted no ?
>
> Honestly, from my side, I'm fine with the current situation as it's runner
> specific.
>
> Regards
> JB
>
> On 11/30/2017 06:12 PM, Reuven Lax wrote:
>>
>> I don't think it belongs in PipelineOptions, as bundle size is always a
>> runner thing.
>>
>> We could consider adding a new generic RunnerOptions, however I'm not
>> convinced all runners can actually support this.
>>
>> On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com
>> <mailto:rmannibu...@gmail.com>> wrote:
>>
>>     Guys,
>>
>>     what about moving getMaxBundleSize from flink options to pipeline
>>     options. I think all runners can support it right? Spark code needs
>>     the merge of the v2 before being able to be implemented probably but I
>>     don't see any blocker.
>>
>>     wdyt?
>>
>>     Romain Manni-Bucau
>>     @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>>     2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com
>>     <mailto:rmannibu...@gmail.com>>:
>>
>>      > @Eugene: "workaround" as in specific to the IO each time, which
>>      > therefore still highlights a gap in the core.
>>      >
>>      > Other comments inline
>>      >
>>      >
>>      > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
>>      >> There is a possible fourth issue that we don't handle well: efficiency. For
>>      >> very large bundles, it may be advantageous to avoid replaying a bunch of
>>      >> idempotent operations if there were a way to record which ones we're sure
>>      >> went through. Not sure if that's the issue here (though one could possibly
>>      >> do this with SDFs, by preemptively returning periodically before an
>>      >> element (or portion thereof) is done).
>>      >
>>      > +1, it also leads to the IO handling its own chunking/bundles, which
>>      > therefore solves all issues at once.
>>      >
>>      >>
>>      >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>>      >> kirpic...@google.com.invalid> wrote:
>>      >>
>>      >>> I disagree that the usage of document id in ES is a "workaround" - it does
>>      >>> not address any *accidental* complexity
>>      >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings
>>      >>> of Beam, it addresses the *essential* complexity that a distributed system
>>      >>> forces one to take it as a fact of nature that the same write
>>      >>> (mutation) will happen multiple times, so if you want a mutation to happen
>>      >>> "as-if" it happened exactly once, the mutation itself must be idempotent
>>      >>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert
>>      >>> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an
>>      >>> idempotent mutation, and it's very good that Elasticsearch provides it - if
>>      >>> it didn't, no matter how good of an API Beam had, achieving exactly-once
>>      >>> writes would be theoretically impossible. Are we in agreement on this so
>>      >>> far?
>>      >>>
>>      >>> Next: you seem to be discussing 3 issues together, all of which are valid
>>      >>> issues, but they seem unrelated to me:
>>      >>> 1. Exactly-once mutation
>>      >>> 2. Batching multiple mutations into one RPC.
>>      >>> 3. Backpressure
>>      >>>
>>      >>> #1: was considered above. The system the IO is talking to has to support
>>      >>> idempotent mutations, in an IO-specific way, and the IO has to take
>>      >>> advantage of them, in the IO-specific way - end of story.
>>      >
>>      > Agree but don't forget the original point was about "chunks" and not
>>      > individual records.
>>      >
>>      >>>
>>      >>> #2: a batch of idempotent operations is also idempotent, so this doesn't
>>      >>> add anything new semantically. Syntactically - Beam already allows you to
>>      >>> write your own batching by notifying you of permitted batch boundaries
>>      >>> (Start/FinishBundle). Sure, it could do more, but from my experience the
>>      >>> batching in IOs I've seen is one of the easiest and least error-prone
>>      >>> parts, so I don't see something worth an extended discussion here.
>>      >
>>      > "Beam already allows you to
>>      >  write your own batching by notifying you of permitted batch boundaries
>>      >  (Start/FinishBundle)"
>>      >
>>      > This is wrong since the bundle is potentially the whole PCollection (Spark),
>>      > so this is not even an option until you use the SDF (back to the same
>>      > point).
>>      > Once again the API looks fine but no implementation makes it true. It
>>      > would be easy to change it in Spark; Flink can be ok since it targets
>>      > the streaming case more; not sure about the others, any idea?
>>      >
>>      >
>>      >>>
>>      >>> #3: handling backpressure is a complex problem with multiple facets: 1) how
>>      >>> do you know you're being throttled, and by how much are you exceeding the
>>      >>> external system's capacity?
>>      >
>>      > This is the whole point of backpressure, the system sends it back to
>>      > you (via a header, a status code or a similar technique in general)
>>      >
>>      >>> 2) how do you communicate this signal to the
>>      >>> runner?
>>      >
>>      > You are a client so you get the metadata in the response - whatever the
>>      > technology.
>>      >
>>      >>> 3) what does the runner do in response?
>>      >
>>      > Runner nothing but the IO adapts its handling as mentioned before
>>      > (wait and retry, skip, ... depending on the config)
>>      >
>>      >>> 4) how do you wait until
>>      >>> it's ok to try again?
>>      >
>>      > This is one point to probably enhance in beam but waiting in the
>>      > processing is an option if the source has some buffering; otherwise it
>>      > requires a fallback buffer with a max size if the wait mode is
>>      > activated.
>>      >
>>      >>> You seem to be advocating for solving one facet of this problem, which is:
>>      >>> you want it to be possible to signal to the runner "I'm being throttled,
>>      >>> please end the bundle", right? If so - I think this (ending the bundle) is
>>      >>> unnecessary: the DoFn can simply do an exponential back-off sleep loop.
>>      >
>>      > Agree, never said the runner should know but GBK+output doesn't work
>>      > because you don't own the GBK.
>>      >
>>      >>> This is e.g. what DatastoreIO does
>>      >>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
>>      >>> and this is in general how most systems I've seen handle backpressure. Is there
>>      >>> something I'm missing? In particular, is there any compelling reason why
>>      >>> you think it'd be beneficial e.g. for DatastoreIO to commit the results of
>>      >>> the bundle so far before processing other elements?
>>      >
>>      > It was more about ensuring you validate a subset of the whole
>>      > bundle early and avoid reprocessing it if it fails later.
>>      >
>>      >
>>      > So to summarize I see 2 outcomes:
>>      >
>>      > 1. implement SDF in all runners
>>      > 2. make the bundle size upper bounded - through a pipeline option - in
>>      > all runners; not sure this one is doable everywhere since I mainly
>>      > checked the Spark case
>>      >
>>      >>>
>>      >>> Again, it might be that I'm still misunderstanding what you're trying to
>>      >>> say. One of the things it would help to clarify would be - exactly what do
>>      >>> you mean by "how batch frameworks solved that for years": can you point at
>>      >>> an existing API in some other framework that achieves what you want?
>>      >>>
>>      >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
>>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>
>>      >>> wrote:
>>      >>>
>>      >>> > Eugene, the point - and the issue with a single sample - is that you can
>>      >>> > always find *workarounds* on a case by case basis, like the id one with ES,
>>      >>> > but beam doesn't solve the problem as a framework.
>>      >>> >
>>      >>> > From my past experience, I clearly don't see how batch frameworks solved
>>      >>> > that for years and beam is not able to do it - keep in mind it is the same
>>      >>> > kind of technology, it just uses different sources and bigger clusters, so
>>      >>> > there is no real reason not to have the same feature quality. The only
>>      >>> > potential reason I see is that there is no end-to-end tracking of the state
>>      >>> > in the cluster. But I don't see why there wouldn't be. Am I missing
>>      >>> > something here?
>>      >>> >
>>      >>> > An example could be: take a GitHub crawler computing stats on all the
>>      >>> > GitHub repos, based on a REST client for example. You will need to
>>      >>> > handle the rate limit and likely want to "commit" each time you reach a
>>      >>> > rate limit, likely with some buffering strategy with a max size before
>>      >>> > really waiting. How do you do it with a GBK independent of your DoFn? You
>>      >>> > are not able to correctly compose the fns between them :(.
>>      >>> >
>>      >>> >
>>      >>> > On Nov 18, 2017 20:48, "Eugene Kirpichov" <kirpic...@google.com.invalid>
>>      >>> > wrote:
>>      >>> >
>>      >>> > After giving this thread my best attempt at understanding exactly what is
>>      >>> > the problem and the proposed solution, I'm afraid I still fail to
>>      >>> > understand both. To reiterate, I think the only way to make progress here
>>      >>> > is to be more concrete: (quote) take some IO that you think could be easier
>>      >>> > to write with your proposed API, give the contents of a hypothetical
>>      >>> > PCollection being written to this IO, give the code of a hypothetical DoFn
>>      >>> > implementing the write using your API, and explain what you'd expect to
>>      >>> > happen at runtime. I'm going to re-engage in this thread after such an
>>      >>> > example is given.
>>      >>> >
>>      >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
>>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>
>>      >>> > wrote:
>>      >>> >
>>      >>> > > First, bundle retry is unusable with some runners like Spark, where the
>>      >>> > > bundle size is the collection size / number of workers. This means a user
>>      >>> > > can't use the bundle API or feature reliably and portably - which is beam's
>>      >>> > > promise. Aligning chunking and bundles would guarantee that, but it may not
>>      >>> > > be desired, which is why I thought it could be another feature.
>>      >>> > >
>>      >>> > > GBK works until the IO knows about that, and both concepts are not always
>>      >>> > > orthogonal - backpressure-like systems are a trivial common example. This
>>      >>> > > means the IO (DoFn) must be able to do it itself at some point.
>>      >>> > >
>>      >>> > > Also note the GBK works only if the IO can take a list, which is never the
>>      >>> > > case today.
>>      >>> > >
>>      >>> > > Big questions for me are: is SDF the way to go since it provides the needed
>>      >>> > > API but is not yet supported? What about existing IOs? Should beam provide
>>      >>> > > an auto wrapping of DoFn for that pre-aggregated support and simulate
>>      >>> > > bundles to the actual IO impl to keep the existing API?
>>      >>> > >
>>      >>> > >
>>      >>> > > On Nov 17, 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid>
>>      >>> > > wrote:
>>      >>> > >
>>      >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
>>      >>> > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>
>>
>>      >>> > > >
>>      >>> > > wrote:
>>      >>> > >
>>      >>> > > > Yep, just take ES IO, if a part of a bundle fails you are in an
>>      >>> > > > unmanaged state. This is the case for all O (of IO ;)). Issue is not
>>      >>> > > > much about "1" (the code it takes) but more the fact it doesn't
>>      >>> > > > integrate with runner features and retries potentially: what happens
>>      >>> > > > if a bundle has a failure? => undefined today. 2. I'm fine with it
>>      >>> > > > while we know exactly what happens when we restart after a bundle
>>      >>> > > > failure. With ES the timestamp can be used for instance.
>>      >>> > > >
>>      >>> > >
>>      >>> > > This deterministic batching can be achieved even now with an extra
>>      >>> > > GroupByKey (and if you want ordering on top of that, will need another
>>      >>> > > GBK). Don't know if that is too costly in your case. I would need a bit more
>>      >>> > > details on handling ES IO write retries to see if it could be simplified. Note
>>      >>> > > that retries occur with or without any failures in your DoFn.
>>      >>> > >
>>      >>> > > The biggest negative with GBK approach is that it doesn't provide same
>>      >>> > > guarantees on Flink.
>>      >>> > >
>>      >>> > > I don't see how GroupIntoBatches in Beam provides specific guarantees on
>>      >>> > > deterministic batches.
>>      >>> > >
>>      >>> > > > Thinking about it the SDF is really a way to do it since the SDF will
>>      >>> > > > manage the bulking and associated with the runner "retry" it seems it
>>      >>> > > > covers the needs.
>>      >>> > > >
>>      >>> > > > Romain Manni-Bucau
>>      >>> > > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>      >>> > > >
>>      >>> > > >
>>      >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
>>      >>> > <kirpic...@google.com.invalid
>>      >>> > > >:
>>      >>> > > > > I must admit I'm still failing to understand the problem, so let's step
>>      >>> > > > > back even further.
>>      >>> > > > >
>>      >>> > > > > Could you give an example of an IO that is currently difficult to implement
>>      >>> > > > > specifically because of lack of the feature you're talking about?
>>      >>> > > > >
>>      >>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't recall
>>      >>> > > > > seeing a similar problem. Sure, a lot of IOs do batching within a bundle,
>>      >>> > > > > but 1) it doesn't take up much code (granted, it would be even easier if
>>      >>> > > > > Beam did it for us) and 2) I don't remember any of them requiring the
>>      >>> > > > > batches to be deterministic, and I'm having a hard time imagining what kind
>>      >>> > > > > of storage system would be able to deduplicate if batches were
>>      >>> > > > > deterministic but wouldn't be able to deduplicate if they weren't.
>>      >>> > > > >
>>      >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
>>      >>> > > > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>
>>      >>> > > > > wrote:
>>      >>> > > > >
>>      >>> > > > >> Ok, let me try to step back and summarize what we have today and what I
>>      >>> > > > >> miss:
>>      >>> > > > >>
>>      >>> > > > >> 1. we can handle chunking in beam through group in batch (or equivalent)
>>      >>> > > > >> but:
>>      >>> > > > >>    > it is not built into the transforms (IO) and it is controlled
>>      >>> > > > >> from outside the transforms, so there is no way for a transform to do it
>>      >>> > > > >> properly without itself handling a composition and links between
>>      >>> > > > >> multiple DoFns to have notifications and potentially react properly or
>>      >>> > > > >> handle backpressure from its backend
>>      >>> > > > >> 2. there is no restart feature because there is no real state handling
>>      >>> > > > >> at the moment. This sounds fully delegated to the runner but I was
>>      >>> > > > >> hoping to have more guarantees from the used API to be able to restart
>>      >>> > > > >> a pipeline (mainly batch since it can be irrelevant or delegated to
>>      >>> > > > >> the backend for streams) and handle only the uncommitted records, so it
>>      >>> > > > >> requires some persistence outside the main IO storages to do it
>>      >>> > > > >> properly
>>      >>> > > > >>    > note this is somewhat similar to the monitoring topic, which lacks
>>      >>> > > > >> persistence ATM, so it can end up with beam having a pluggable storage
>>      >>> > > > >> for a few concerns
>>      >>> > > > >>
>>      >>> > > > >>
>>      >>> > > > >> Short term I would be happy with 1 solved properly, long term I hope 2
>>      >>> > > > >> will be tackled without workarounds requiring custom wrapping of IO to
>>      >>> > > > >> use a custom state persistence.
>>      >>> > > > >>
>>      >>> > > > >>
>>      >>> > > > >>
>>      >>> > > > >> Romain Manni-Bucau
>>      >>> > > > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>      >>> > > > >>
>>      >>> > > > >>
>>      >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
>>     <j...@nanthrax.net <mailto:j...@nanthrax.net>>:
>>
>>      >>> > > > >> > Thanks for the explanation. Agree, we might talk about different things
>>      >>> > > > >> > using the same wording.
>>      >>> > > > >> >
>>      >>> > > > >> > I'm also struggling to understand the use case (for a generic DoFn).
>>      >>> > > > >> >
>>      >>> > > > >> > Regards
>>      >>> > > > >> > JB
>>      >>> > > > >> >
>>      >>> > > > >> >
>>      >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>      >>> > > > >> >>
>>      >>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd like to say
>>      >>> > > > >> >> straight up that SDF is definitely not going to help here, despite the fact
>>      >>> > > > >> >> that its API includes the term "checkpoint". In SDF, the "checkpoint"
>>      >>> > > > >> >> captures the state of processing within a single element. If you're
>>      >>> > > > >> >> applying an SDF to 1000 elements, it will, like any other DoFn, be applied
>>      >>> > > > >> >> to each of them independently and in parallel, and you'll have 1000
>>      >>> > > > >> >> checkpoints capturing the state of processing each of these elements, which
>>      >>> > > > >> >> is probably not what you want.
>>      >>> > > > >> >>
>>      >>> > > > >> >> I'm afraid I still don't understand what kind of checkpoint you need, if it
>>      >>> > > > >> >> is not just deterministic grouping into batches. "Checkpoint" is a very
>>      >>> > > > >> >> broad term and it's very possible that everybody in this thread is talking
>>      >>> > > > >> >> about different things when saying it. So it would help if you could give a
>>      >>> > > > >> >> more concrete example: for example, take some IO that you think could be
>>      >>> > > > >> >> easier to write with your proposed API, give the contents of a hypothetical
>>      >>> > > > >> >> PCollection being written to this IO, give the code of a hypothetical DoFn
>>      >>> > > > >> >> implementing the write using your API, and explain what you'd expect to
>>      >>> > > > >> >> happen at runtime.
>>      >>> > > > >> >>
>>      >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>>      >>> > > > >> >> <rmannibu...@gmail.com
>> <mailto:rmannibu...@gmail.com>>
>>
>>      >>> > > > >> >> wrote:
>>      >>> > > > >> >>
>>      >>> > > > >> >>> @Eugene: yes and the other alternative of Reuven too but it is still
>>      >>> > > > >> >>> 1. relying on timers, 2. not really checkpointed
>>      >>> > > > >> >>>
>>      >>> > > > >> >>> In other words it seems all solutions are to create a chunk of size 1
>>      >>> > > > >> >>> and replayable to fake the lack of chunking in the framework. This
>>      >>> > > > >> >>> always implies a chunk handling outside the component (typically
>>      >>> > > > >> >>> before for an output). My point is I think IO need it in their own
>>      >>> > > > >> >>> "internal" or at least control it themselves since the chunk size is
>>      >>> > > > >> >>> part of the IO handling most of the time.
>>      >>> > > > >> >>>
>>      >>> > > > >> >>> I think JB spoke of the same "group before" trick using restrictions
>>      >>> > > > >> >>> which can work I have to admit if SDF are implemented by runners. Is
>>      >>> > > > >> >>> there a roadmap/status on that? Last time I checked SDF was a great
>>      >>> > > > >> >>> API without support :(.
>>      >>> > > > >> >>>
>>      >>> > > > >> >>>
>>      >>> > > > >> >>>
>>      >>> > > > >> >>> Romain Manni-Bucau
>>      >>> > > > >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>      >>> > > > >> >>>
>>      >>> > > > >> >>>
>>      >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>>      >>> > > > >> >>> <kirpic...@google.com.invalid>:
>>      >>> > > > >> >>>>
>>      >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and the post
>>      >>> > > > >> >>>> doesn't mention the word. Did you mean something else, e.g. restriction
>>      >>> > > > >> >>>> perhaps? Either way I don't think SDFs are the solution here; SDFs have to
>>      >>> > > > >> >>>> do with the ability to split the processing of *a single element* over
>>      >>> > > > >> >>>> multiple calls, whereas Romain I think is asking for repeatable grouping of
>>      >>> > > > >> >>>> *multiple* elements.
>>      >>> > > > >> >>>>
>>      >>> > > > >> >>>> Romain - does
>>      >>> > > > >> >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>      >>> > > > >> >>>> do what you want?
>>      >>> > > > >> >>>>
>>      >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste
>> Onofré <
>>      >>> > > > >> j...@nanthrax.net <mailto:j...@nanthrax.net>>
>>      >>> > > > >> >>>> wrote:
>>      >>> > > > >> >>>>
>>      >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no ?
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>> Regards
>>      >>> > > > >> >>>>> JB
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>>      >>> > > > >> >>>>>>
>>      >>> > > > >> >>>>>> it gives the fn/transform the ability to save a state - it can get
>>      >>> > > > >> >>>>>> back on "restart" / whatever unit we can use, probably runner
>>      >>> > > > >> >>>>>> dependent? Without that you need to rewrite all IO usage with
>>      >>> > > > >> >>>>>> something like the previous pattern which makes the IO not self
>>      >>> > > > >> >>>>>> sufficient and kind of makes the entry cost and usage of beam way
>>      >>> > > > >> >>>>>> further.
>>      >>> > > > >> >>>>>>
>>      >>> > > > >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses but adapted to
>>      >>> > > > >> >>>>>> beam (stream in particular) case.
>>      >>> > > > >> >>>>>>
>>      >>> > > > >> >>>>>> Romain Manni-Bucau
>>      >>> > > > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github |
>> LinkedIn
>>      >>> > > > >> >>>>>>
>>      >>> > > > >> >>>>>>
>>      >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
>>      >>> > <re...@google.com.invalid
>>      >>> > > >:
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>> Romain,
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are the semantics, what
>>      >>> > > > >> >>>>>>> does it accomplish?
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>> Reuven
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain
>> Manni-Bucau <
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>> rmannibu...@gmail.com
>> <mailto:rmannibu...@gmail.com>>
>>
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>> wrote:
>>      >>> > > > >> >>>>>>>
>>      >>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> I. checkpoint marker:
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> @AnyBeamAnnotation
>>      >>> > > > >> >>>>>>>> @CheckpointAfter
>>      >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> III. (I like this one less)
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> // in the dofn
>>      >>> > > > >> >>>>>>>> @CheckpointTester
>>      >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> Romain Manni-Bucau
>>      >>> > > > >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github |
>> LinkedIn
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>>      >>> > > > <rang...@google.com.invalid
>>      >>> > > > >> >>>>
>>      >>> > > > >> >>>> :
>>      >>> > > > >> >>>>>>>>>
>>      >>> > > > >> >>>>>>>>> How would you define it (rough API is fine)? Without more details, it is
>>      >>> > > > >> >>>>>>>>> not easy to see wider applicability and feasibility in runners.
>>      >>> > > > >> >>>>>>>>>
>>      >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain
>> Manni-Bucau <
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>>>> rmannibu...@gmail.com
>> <mailto:rmannibu...@gmail.com>>
>>
>>      >>> > > > >> >>>>>>>>>
>>      >>> > > > >> >>>>>>>>> wrote:
>>      >>> > > > >> >>>>>>>>>
>>      >>> > > > >> >>>>>>>>>> This is a fair summary of the current state but also where beam can have a
>>      >>> > > > >> >>>>>>>>>> very strong added value and make big data great and smooth.
>>      >>> > > > >> >>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>> Instead of this replay feature, isn't checkpointing desirable? In particular
>>      >>> > > > >> >>>>>>>>>> with SDF, no?
>>      >>> > > > >> >>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>> On Nov 16, 2017 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:
>>      >>> > > > >> >>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>> Core issue here is that there is no explicit concept of 'checkpoint' in
>>      >>> > > > >> >>>>>>>>>>> Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
>>      >>> > > > >> >>>>>>>>>>> the checkpoint on external source). Runners do checkpoint internally as
>>      >>> > > > >> >>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely different from
>>      >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's.
>>      >>> > > > >> >>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a checkpoint by
>>      >>> > > > >> >>>>>>>>>>> design.
>>      >>> > > > >> >>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees with a sink/DoFn,
>>      >>> > > > >> >>>>>>>>>>> I think it is better to start with the requirements. I worked
>>      >>> > > > >> >>>>>>>>>>> on an exactly-once sink for Kafka (see
>>      >>> > > > >> >>>>>>>>>>> KafkaIO.write().withEOS()), where we essentially reshard the
>>      >>> > > > >> >>>>>>>>>>> elements and assign sequence numbers to elements within each
>>      >>> > > > >> >>>>>>>>>>> shard. Duplicates in replays are avoided based on these
>>      >>> > > > >> >>>>>>>>>>> sequence numbers. The DoFn state API is used to buffer
>>      >>> > > > >> >>>>>>>>>>> out-of-order replays. The implementation strategy works in
>>      >>> > > > >> >>>>>>>>>>> Dataflow but not in Flink, which has a horizontal checkpoint.
>>      >>> > > > >> >>>>>>>>>>> KafkaIO checks for compatibility.
>>      >>> > > > >> >>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau
>>      >>> > > > >> >>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>      >>> > > > >> >>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> Hi guys,
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative, but the topic is real and
>>      >>> > > > >> >>>>>>>>>>>> comes up again and again with Beam usage: how can a DoFn
>>      >>> > > > >> >>>>>>>>>>>> handle some "chunking"?
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> The need is to be able to commit every N records, with N not
>>      >>> > > > >> >>>>>>>>>>>> too big.
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> The natural API for that in Beam is the bundle one, but
>>      >>> > > > >> >>>>>>>>>>>> bundles are not reliable since they can be very small (Flink)
>>      >>> > > > >> >>>>>>>>>>>> - we can say it is "ok" even if it has some perf impact - or
>>      >>> > > > >> >>>>>>>>>>>> too big (Spark does full size / #workers).
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>      >>> > > > >> >>>>>>>>>>>> does an eager flush. The issue is that the checkpoint is then
>>      >>> > > > >> >>>>>>>>>>>> not respected and you can process the same records multiple
>>      >>> > > > >> >>>>>>>>>>>> times.
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>>      >>> > > > >> >>>>>>>>>>>> Beam point of view (at least by setting a maximum)?
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>> Thanks,
>>      >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>>      >>> > > > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>      >>> > > > >> >>>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>>
>>      >>> > > > >> >>>>>>>>>>
>>      >>> > > > >> >>>>>>>>
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>>> --
>>      >>> > > > >> >>>>> Jean-Baptiste Onofré
>>      >>> > > > >> >>>>> jbono...@apache.org <mailto:jbono...@apache.org>
>>      >>> > > > >> >>>>> http://blog.nanthrax.net
>>      >>> > > > >> >>>>> Talend - http://www.talend.com
>>      >>> > > > >> >>>>>
>>      >>> > > > >> >>>
>>      >>> > > > >> >>
>>      >>> > > > >> >
>>      >>> > > > >> > --
>>      >>> > > > >> > Jean-Baptiste Onofré
>>      >>> > > > >> > jbono...@apache.org <mailto:jbono...@apache.org>
>>      >>> > > > >> > http://blog.nanthrax.net
>>      >>> > > > >> > Talend - http://www.talend.com
>>      >>> > > > >>
>>      >>> > > >
>>      >>> > >
>>      >>> >
>>      >>>
>>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
