So is your main concern potential poor performance on runners that choose
to use a very small bundle size? (Currently an IO can trivially handle
bundles that are too large, simply by flushing when enough data accumulates,
which is what all IOs do. Working around unreasonably small bundles is
indeed much harder.)

If so, I think, rather than making a model change, we should understand why
those runners are choosing such a small bundle size, and potentially fix
them.
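
For reference, the flush-on-threshold pattern mentioned above can be sketched
roughly as follows. This is a minimal, self-contained illustration and not
the code of any actual Beam IO: the class BatchingWriter, its method names,
and the maxBatchSize parameter are all hypothetical stand-ins for a DoFn's
start-bundle, process-element, and finish-bundle hooks.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a DoFn-style writer (not Beam's API).
 * It buffers elements and flushes when the buffer reaches maxBatchSize,
 * and also flushes whatever is left when the bundle ends.
 */
class BatchingWriter {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    // Records the size of every flush so the behavior can be inspected.
    private final List<Integer> flushedSizes = new ArrayList<>();

    BatchingWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Analogous to a start-of-bundle hook: begin with an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Analogous to per-element processing: buffer, flush on threshold.
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Analogous to an end-of-bundle hook: nothing may stay buffered.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // A real IO would issue one bulk request to the backend here.
        flushedSizes.add(buffer.size());
        buffer.clear();
    }

    List<Integer> flushedSizes() {
        return flushedSizes;
    }

    public static void main(String[] args) {
        // One large bundle of 7 elements: the threshold caps each batch.
        BatchingWriter large = new BatchingWriter(3);
        large.startBundle();
        for (int i = 0; i < 7; i++) {
            large.processElement("e" + i);
        }
        large.finishBundle();
        System.out.println("large bundle flushes: " + large.flushedSizes());

        // Three bundles of 1 element each: every flush carries 1 element.
        BatchingWriter small = new BatchingWriter(3);
        for (int i = 0; i < 3; i++) {
            small.startBundle();
            small.processElement("e" + i);
            small.finishBundle();
        }
        System.out.println("small bundle flushes: " + small.flushedSizes());
    }
}
```

The sketch shows the asymmetry: a large bundle is cut into batches of at most
maxBatchSize by the writer itself, whereas with one-element bundles the
end-of-bundle flush forces a request per element and the writer cannot
amortize anything on its own.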

On Thu, Nov 30, 2017, 11:01 AM Romain Manni-Bucau <[email protected]>
wrote:

>
>
> On Nov 30, 2017 at 19:23, "Kenneth Knowles" <[email protected]> wrote:
>
> On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau <
> [email protected]> wrote:
>
>> Hmm,
>>
>> ESIO:
>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
>> JDBCIO:
>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
>> MongoIO:
>> https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
>> etc...
>>
>> They all use the same pattern.
>>
>
> This is actually correct - if you have some triggers set up to yield some
> low latency, then things need to actually be flushed here. If the runner
> provides a one-element bundle it could be because data volume has dipped.
> In this case, you pay per element instead of getting good amortization, but
> since data volume is low this is not so bad and anyhow the only way to
> yield the desired latency.
>
> Romain - just to echo some others, did you have a particular combination
> of runner + IO that you wanted to target for improvement? That would focus
> the discussion and we could think about what to change in the runner or IO
> or discover an issue that they cannot solve.
>
>
> I want to ensure EsIO will never flush a single element on any runner
> without a timer trigger, assuming data volume is continuous and the size
> is greater than 1.
>
> Rephrased, my concern is that the bundle, which is great
> infra/environment feedback, is today owned by Beam code, which defeats that
> purpose since Beam doesn't use the impl for that. This notion should
> be unified across runners (size + timeout are often the defaults to trigger
> an "end" and would work) or it should be hidden from the transform
> developers IMHO.
>
> Note the low latency point is not linked to bundle size but, as you
> mentioned, to triggers (timeout or event based), which means both worlds can
> work together in harmony and result in a valid bundle API (without any
> change, yeah) and exposure to the user.
>
>
>
> Kenn
>
>
>
>>
>> From what you wrote - and technically I agree but in current state my
>> point is valid I think - you should drop bundle from the whole user
>> API and make it all @Internal, no?
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-30 18:58 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
>> > Agree, but maybe we can inform the runner if wanted no ?
>> >
>> > Honestly, from my side, I'm fine with the current situation as it's
>> runner
>> > specific.
>> >
>> > Regards
>> > JB
>> >
>> > On 11/30/2017 06:12 PM, Reuven Lax wrote:
>> >>
>> >> I don't think it belongs in PipelineOptions, as bundle size is always a
>> >> runner thing.
>> >>
>> >> We could consider adding a new generic RunnerOptions, however I'm not
>> >> convinced all runners can actually support this.
>> >>
>> >> On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <
>> [email protected]
>> >> <mailto:[email protected]>> wrote:
>> >>
>> >>     Guys,
>> >>
>> >>     what about moving getMaxBundleSize from flink options to pipeline
>> >>     options. I think all runners can support it right? Spark code needs
>> >>     the merge of the v2 before being able to be implemented probably
>> but I
>> >>     don't see any blocker.
>> >>
>> >>     wdyt?
>> >>
>> >>     Romain Manni-Bucau
>> >>     @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>
>> >>
>> >>     2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <
>> [email protected]
>> >>     <mailto:[email protected]>>:
>> >>
>> >>      > @Eugene: "workaround" as specific to the IO each time and
>> therefore
>> >>      > still highlight a lack in the core.
>> >>      >
>> >>      > Other comments inline
>> >>      >
>> >>      >
>> >>      > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
>> >> <[email protected]>:
>> >>      >> There is a possible fourth issue that we don't handle well:
>> >> efficiency. For
>> >>      >> very large bundles, it may be advantageous to avoid replaying a
>> >> bunch of
>> >>      >> idempotent operations if there were a way to record what ones
>> >> we're sure
>> >>      >> went through. Not sure if that's the issue here (though one could
>> >>      >> possibly do this with SDFs: one can preemptively return
>> >>      >> periodically before an element (or portion thereof) is done).
>> >>      >
>> >>      > +1, it also leads to the IO handling its own chunking/bundles and
>> >>      > therefore solves all issues at once.
>> >>      >
>> >>      >>
>> >>      >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>> >>      >> [email protected]> wrote:
>> >>      >>
>> >>      >>> I disagree that the usage of document id in ES is a
>> "workaround"
>> >> - it does
>> >>      >>> not address any *accidental *complexity
>> >>      >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet
>> >>     <https://en.wikipedia.org/wiki/No_Silver_Bullet>> coming from
>> >> shortcomings
>> >>      >>> of Beam, it addresses the *essential* complexity that a
>> >> distributed system
>> >>      >>> forces one to take it as a fact of nature that the same write
>> >>      >>> (mutation) will happen multiple times, so if you want a
>> mutation
>> >> to happen
>> >>      >>> "as-if" it happened exactly once, the mutation itself must be
>> >> idempotent
>> >>      >>> <https://en.wikipedia.org/wiki/Idempotence
>> >>     <https://en.wikipedia.org/wiki/Idempotence>>. Insert-with-id
>> (upsert
>> >>      >>> <https://en.wikipedia.org/wiki/Merge_(SQL)
>> >>     <https://en.wikipedia.org/wiki/Merge_(SQL)>>) is a classic
>> example of
>> >> an
>> >>      >>> idempotent mutation, and it's very good that Elasticsearch
>> >> provides it - if
>> >>      >>> it didn't, no matter how good of an API Beam had, achieving
>> >> exactly-once
>> >>      >>> writes would be theoretically impossible. Are we in agreement
>> on
>> >> this so
>> >>      >>> far?
>> >>      >>>
>> >>      >>> Next: you seem to be discussing 3 issues together, all of
>> which
>> >> are valid
>> >>      >>> issues, but they seem unrelated to me:
>> >>      >>> 1. Exactly-once mutation
>> >>      >>> 2. Batching multiple mutations into one RPC.
>> >>      >>> 3. Backpressure
>> >>      >>>
>> >>      >>> #1: was considered above. The system the IO is talking to has
>> to
>> >> support
>> >>      >>> idempotent mutations, in an IO-specific way, and the IO has to
>> >> take
>> >>      >>> advantage of them, in the IO-specific way - end of story.
>> >>      >
>> >>      > Agree but don't forget the original point was about "chunks" and
>> >> not
>> >>      > individual records.
>> >>      >
>> >>      >>>
>> >>      >>> #2: a batch of idempotent operations is also idempotent, so
>> this
>> >> doesn't
>> >>      >>> add anything new semantically. Syntactically - Beam already
>> >> allows you to
>> >>      >>> write your own batching by notifying you of permitted batch
>> >> boundaries
>> >>      >>> (Start/FinishBundle). Sure, it could do more, but from my
>> >> experience the
>> >>      >>> batching in IOs I've seen is one of the easiest and least
>> >> error-prone
>> >>      >>> parts, so I don't see something worth an extended discussion
>> >> here.
>> >>      >
>> >>      > "Beam already allows you to
>> >>      >  write your own batching by notifying you of permitted batch
>> >> boundaries
>> >>      >  (Start/FinishBundle)"
>> >>      >
>> >>      > Is wrong since the bundle is potentially the whole PCollection
>> >> (spark)
>> >>      > so this is not even an option until you use the SDF (back to the
>> >> same
>> >>      > point).
>> >>      > Once again the API looks fine but no implementation makes it
>> true.
>> >> It
>> >>      > would be easy to change it in spark, flink can be ok since it
>> >> targets
>> >>      > more the streaming case, not sure of others, any idea?
>> >>      >
>> >>      >
>> >>      >>>
>> >>      >>> #3: handling backpressure is a complex problem with multiple
>> >> facets: 1) how
>> >>      >>> do you know you're being throttled, and by how much are you
>> >> exceeding the
>> >>      >>> external system's capacity?
>> >>      >
>> >>      > This is the whole point of backpressure: the system sends it back
>> >>      > to you (via a header or a status, in general).
>> >>      >
>> >>      >>> 2) how do you communicate this signal to the
>> >>      >>> runner?
>> >>      >
>> >>      > You are a client so you get the meta in the response - whatever
>> >> techno.
>> >>      >
>> >>      >>> 3) what does the runner do in response?
>> >>      >
>> >>      > Runner nothing but the IO adapts its handling as mentioned
>> before
>> >>      > (wait and retry, skip, ... depending the config)
>> >>      >
>> >>      >>> 4) how do you wait until
>> >>      >>> it's ok to try again?
>> >>      >
>> >>      > This is one point to probably enhance in beam but waiting in the
>> >>      > processing is an option if the source has some buffering
>> otherwise
>> >> it
>> >>      > requires to have a buffer fallback and max size if the wait
>> mode is
>> >>      > activated.
>> >>      >
>> >>      >>> You seem to be advocating for solving one facet of this
>> problem,
>> >> which is:
>> >>      >>> you want it to be possible to signal to the runner "I'm being
>> >> throttled,
>> >>      >>> please end the bundle", right? If so - I think this (ending
>> the
>> >> bundle) is
>> >>      >>> unnecessary: the DoFn can simply do an exponential back-off
>> sleep
>> >> loop.
>> >>      >
>> >>      > Agree, never said the runner should know but GBK+output doesn't
>> >>      > work because you don't own the GBK.
>> >>      >
>> >>      >>> This is e.g. what DatastoreIO does
>> >>      >>> <https://github.com/apache/beam/blob/master/sdks/java/io/
>> >>     <https://github.com/apache/beam/blob/master/sdks/java/io/>
>> >>      >>> google-cloud-platform/src/main/java/org/apache/beam/sdk/
>> >>      >>> io/gcp/datastore/DatastoreV1.java#L1318>
>> >>      >>> and
>> >>      >>> this is in general how most systems I've seen handle
>> >> backpressure. Is there
>> >>      >>> something I'm missing? In particular, is there any compelling
>> >> reason why
>> >>      >>> you think it'd be beneficial e.g. for DatastoreIO to commit
>> the
>> >> results of
>> >>      >>> the bundle so far before processing other elements?
>> >>      >
>> >>      > It was more about ensuring you validate early a subset of the
>> whole
>> >>      > bundle and avoid to reprocess it if it fails later.
>> >>      >
>> >>      >
>> >>      > So to summarize I see 2 outcomes:
>> >>      >
>> >>      > 1. impl SDF in all runners
>> >>      > 2. make the bundle size upper bounded - through a pipeline
>> option -
>> >> in
>> >>      > all runners, not sure this one is doable everywhere since I
>> mainly
>> >>      > checked spark case
>> >>      >
>> >>      >>>
>> >>      >>> Again, it might be that I'm still misunderstanding what you're
>> >> trying to
>> >>      >>> say. One of the things it would help to clarify would be -
>> >> exactly what do
>> >>      >>> you mean by "how batch frameworks solved that for years": can
>> you
>> >> point at
>> >>      >>> an existing API in some other framework that achieves what you
>> >> want?
>> >>      >>>
>> >>      >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
>> >>     <[email protected] <mailto:[email protected]>>
>> >>
>> >>      >>> wrote:
>> >>      >>>
>> >>      >>> > Eugene, the point - and the issue with a single sample - is you
>> >>      >>> > can always find *workarounds* on a case by case basis, as the id
>> >>      >>> > one with ES, but Beam doesn't solve the problem as a framework.
>> >>      >>> >
>> >>      >>> > From my past, I clearly dont see how batch frameworks solved
>> >> that for
>> >>      >>> years
>> >>      >>> > and beam is not able to do it - keep in mind it is the same
>> >> kind of
>> >>      >>> techno,
>> >>      >>> > it just uses different sources and bigger clusters so no
>> real
>> >> reason to
>> >>      >>> not
>> >>      >>> > have the same feature quality. The only potential reason i
>> see
>> >> is there
>> >>      >>> is
>> >>      >>> > no tracking of the state into the cluster - e2e. But i dont
>> see
>> >> why there
>> >>      >>> > wouldnt be. Do I miss something here?
>> >>      >>> >
>> >>      >>> > An example could be: take a github crawler computing stats
>> on
>> >> the whole
>> >>      >>> > github repos which is based on a rest client as example. You
>> >> will need to
>> >>      >>> > handle the rate limit and likely want to "commit" each time
>> you
>> >> reach a
>> >>      >>> > rate limit with likely some buffering strategy with a max
>> size
>> >> before
>> >>      >>> > really waiting. How do you do it with a GBK independent of
>> your
>> >> dofn? You
>> >>      >>> > are not able to compose correctly the fn between them :(.
>> >>      >>> >
>> >>      >>> >
>> >>      >>> > On Nov 18, 2017 at 20:48, "Eugene Kirpichov" <[email protected]> wrote:
>> >>      >>> >
>> >>      >>> > After giving this thread my best attempt at understanding
>> >> exactly what is
>> >>      >>> > the problem and the proposed solution, I'm afraid I still
>> fail
>> >> to
>> >>      >>> > understand both. To reiterate, I think the only way to make
>> >> progress here
>> >>      >>> > is to be more concrete: (quote) take some IO that you think
>> >> could be
>> >>      >>> easier
>> >>      >>> > to write with your proposed API, give the contents of a
>> >> hypothetical
>> >>      >>> > PCollection being written to this IO, give the code of a
>> >> hypothetical
>> >>      >>> DoFn
>> >>      >>> > implementing the write using your API, and explain what
>> you'd
>> >> expect to
>> >>      >>> > happen at runtime. I'm going to re-engage in this thread
>> after
>> >> such an
>> >>      >>> > example is given.
>> >>      >>> >
>> >>      >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
>> >>     <[email protected] <mailto:[email protected]>>
>> >>
>> >>      >>> > wrote:
>> >>      >>> >
>> >>      >>> > > First bundle retry is unusable with some runners like
>> spark
>> >> where the
>> >>      >>> > > bundle size is the collection size / number of work. This
>> >> means a user
>> >>      >>> > cant
>> >>      >>> > > use bundle API or feature reliably and portably - which is
>> >> beam
>> >>      >>> promise.
>> >>      >>> > > Aligning chunking and bundles would guarantee that but
>> can be
>> >> not
>> >>      >>> > desired,
>> >>      >>> > > that is why i thought it can be another feature.
>> >>      >>> > >
>> >>      >>> > > GBK works until the IO knows about that and both concepts
>> are
>> >> not
>> >>      >>> always
>> >>      >>> > > orthogonal - backpressure like systems is a trivial common
>> >> example.
>> >>      >>> This
>> >>      >>> > > means the IO (dofn) must be able to do it itself at some
>> >> point.
>> >>      >>> > >
>> >>      >>> > > Also note the GBK works only if the IO can take a list
>> which
>> >> is never
>> >>      >>> the
>> >>      >>> > > case today.
>> >>      >>> > >
>> >>      >>> > > Big questions for me are: is SDF the way to go since it
>> >> provides the
>> >>      >>> > needed
>> >>      >>> > > API but is not yet supported? What about existing IO?
>> Should
>> >> beam
>> >>      >>> provide
>> >>      >>> > > an auto wrapping of dofn for that pre-aggregated support
>> and
>> >> simulate
>> >>      >>> > > bundles to the actual IO impl to keep the existing API?
>> >>      >>> > >
>> >>      >>> > >
>> >>      >>> > > On Nov 17, 2017 at 19:20, "Raghu Angadi" <[email protected]> wrote:
>> >>      >>> > >
>> >>      >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
>> >>      >>> > [email protected] <mailto:[email protected]>
>> >>
>> >>      >>> > > >
>> >>      >>> > > wrote:
>> >>      >>> > >
>> >>      >>> > > > Yep, just take ES IO, if a part of a bundle fails you
>> are
>> >> in an
>> >>      >>> > > > unmanaged state. This is the case for all O (of IO ;)).
>> >> Issue is not
>> >>      >>> > > > much about "1" (the code it takes) but more the fact it
>> >> doesn't
>> >>      >>> > > > integrate with runner features and retries potentially:
>> >> what happens
>> >>      >>> > > > if a bundle has a failure? => undefined today. 2. I'm
>> fine
>> >> with it
>> >>      >>> > > > while we know exactly what happens when we restart
>> after a
>> >> bundle
>> >>      >>> > > > failure. With ES the timestamp can be used for instance.
>> >>      >>> > > >
>> >>      >>> > >
>> >>      >>> > > This deterministic batching can be achieved even now with
>> an
>> >> extra
>> >>      >>> > > GroupByKey (and if you want ordering on top of that, will
>> >> need another
>> >>      >>> > > GBK). Don't know if that is too costly in your case. I
>> would
>> >> need bit
>> >>      >>> > more
>> >>      >>> > > details on handling ES IO write retries to see it could be
>> >> simplified.
>> >>      >>> > Note
>> >>      >>> > > that retries occur with or without any failures in your
>> DoFn.
>> >>      >>> > >
>> >>      >>> > > The biggest negative with GBK approach is that it doesn't
>> >> provide same
>> >>      >>> > > guarantees on Flink.
>> >>      >>> > >
>> >>      >>> > > I don't see how GroupIntoBatches in Beam provides specific
>> >> guarantees
>> >>      >>> on
>> >>      >>> > > deterministic batches.
>> >>      >>> > >
>> >>      >>> > > Thinking about it the SDF is really a way to do it since
>> the
>> >> SDF will
>> >>      >>> > > > manage the bulking and associated with the runner
>> "retry"
>> >> it seems it
>> >>      >>> > > > covers the needs.
>> >>      >>> > > >
>> >>      >>> > > > Romain Manni-Bucau
>> >>      >>> > > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>      >>> > > >
>> >>      >>> > > >
>> >>      >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
>> >>      >>> > <[email protected]
>> >>      >>> > > >:
>> >>      >>> > > > > I must admit I'm still failing to understand the
>> problem,
>> >> so let's
>> >>      >>> > step
>> >>      >>> > > > > back even further.
>> >>      >>> > > > >
>> >>      >>> > > > > Could you give an example of an IO that is currently
>> >> difficult to
>> >>      >>> > > > implement
>> >>      >>> > > > > specifically because of lack of the feature you're
>> >> talking about?
>> >>      >>> > > > >
>> >>      >>> > > > > I'm asking because I've reviewed almost all Beam IOs
>> and
>> >> don't
>> >>      >>> recall
>> >>      >>> > > > > seeing a similar problem. Sure, a lot of IOs do
>> batching
>> >> within a
>> >>      >>> > > bundle,
>> >>      >>> > > > > but 1) it doesn't take up much code (granted, it
>> would be
>> >> even
>> >>      >>> easier
>> >>      >>> > > if
>> >>      >>> > > > > Beam did it for us) and 2) I don't remember any of
>> them
>> >> requiring
>> >>      >>> the
>> >>      >>> > > > > batches to be deterministic, and I'm having a hard
>> time
>> >> imagining
>> >>      >>> > what
>> >>      >>> > > > kind
>> >>      >>> > > > > of storage system would be able to deduplicate if
>> batches
>> >> were
>> >>      >>> > > > > deterministic but wouldn't be able to deduplicate if
>> they
>> >> weren't.
>> >>      >>> > > > >
>> >>      >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
>> >>      >>> > > > [email protected] <mailto:[email protected]>>
>> >>
>> >>      >>> > > > > wrote:
>> >>      >>> > > > >
>> >>      >>> > > > >> Ok, let me try to step back and summarize what we
>> have
>> >> today and
>> >>      >>> > what
>> >>      >>> > > I
>> >>      >>> > > > >> miss:
>> >>      >>> > > > >>
>> >>      >>> > > > >> 1. we can handle chunking in beam through group in
>> batch
>> >> (or
>> >>      >>> > > equivalent)
>> >>      >>> > > > >> but:
>> >>      >>> > > > >>    > it is not built-in into the transforms (IO) and
>> it
>> >> is
>> >>      >>> > controlled
>> >>      >>> > > > >> from outside the transforms so no way for a
>> transform to
>> >> do it
>> >>      >>> > > > >> properly without handling itself a composition and
>> links
>> >> between
>> >>      >>> > > > >> multiple dofns to have notifications and potentially
>> >> react
>> >>      >>> properly
>> >>      >>> > or
>> >>      >>> > > > >> handle backpressure from its backend
>> >>      >>> > > > >> 2. there is no restart feature because there is no
>> real
>> >> state
>> >>      >>> > handling
>> >>      >>> > > > >> at the moment. this sounds fully delegated to the
>> runner
>> >> but I was
>> >>      >>> > > > >> hoping to have more guarantees from the used API to
>> be
>> >> able to
>> >>      >>> > restart
>> >>      >>> > > > >> a pipeline (mainly batch since it can be irrelevant
>> or
>> >> delegates
>> >>      >>> to
>> >>      >>> > > > >> the backend for streams) and handle only not committed
>> >> records so
>> >>      >>> it
>> >>      >>> > > > >> requires some persistence outside the main IO
>> storages
>> >> to do it
>> >>      >>> > > > >> properly
>> >>      >>> > > > >>    > note this is somehow similar to the monitoring
>> >> topic which
>> >>      >>> miss
>> >>      >>> > > > >> persistence ATM so it can end up to beam to have a
>> >> pluggable
>> >>      >>> storage
>> >>      >>> > > > >> for a few concerns
>> >>      >>> > > > >>
>> >>      >>> > > > >>
>> >>      >>> > > > >> Short term I would be happy with 1 solved properly,
>> long
>> >> term I
>> >>      >>> hope
>> >>      >>> > 2
>> >>      >>> > > > >> will be tackled without workarounds requiring custom
>> >> wrapping of
>> >>      >>> IO
>> >>      >>> > to
>> >>      >>> > > > >> use a custom state persistence.
>> >>      >>> > > > >>
>> >>      >>> > > > >>
>> >>      >>> > > > >>
>> >>      >>> > > > >> Romain Manni-Bucau
>> >>      >>> > > > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>      >>> > > > >>
>> >>      >>> > > > >>
>> >>      >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
>> >>     <[email protected] <mailto:[email protected]>>:
>> >>
>> >>      >>> > > > >> > Thanks for the explanation. Agree, we might talk
>> about
>> >> different
>> >>      >>> > > > things
>> >>      >>> > > > >> > using the same wording.
>> >>      >>> > > > >> >
>> >>      >>> > > > >> > I'm also struggling to understand the use case
>> (for a
>> >> generic
>> >>      >>> > DoFn).
>> >>      >>> > > > >> >
>> >>      >>> > > > >> > Regards
>> >>      >>> > > > >> > JB
>> >>      >>> > > > >> >
>> >>      >>> > > > >> >
>> >>      >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>> >>      >>> > > > >> >>
>> >>      >>> > > > >> >> To avoid spending a lot of time pursuing a false
>> >> path, I'd like
>> >>      >>> > to
>> >>      >>> > > > say
>> >>      >>> > > > >> >> straight up that SDF is definitely not going to
>> help
>> >> here,
>> >>      >>> > despite
>> >>      >>> > > > the
>> >>      >>> > > > >> >> fact
>> >>      >>> > > > >> >> that its API includes the term "checkpoint". In
>> SDF,
>> >> the
>> >>      >>> > > "checkpoint"
>> >>      >>> > > > >> >> captures the state of processing within a single
>> >> element. If
>> >>      >>> > you're
>> >>      >>> > > > >> >> applying an SDF to 1000 elements, it will, like
>> any
>> >> other DoFn,
>> >>      >>> > be
>> >>      >>> > > > >> applied
>> >>      >>> > > > >> >> to each of them independently and in parallel, and
>> >> you'll have
>> >>      >>> > 1000
>> >>      >>> > > > >> >> checkpoints capturing the state of processing
>> each of
>> >> these
>> >>      >>> > > elements,
>> >>      >>> > > > >> >> which
>> >>      >>> > > > >> >> is probably not what you want.
>> >>      >>> > > > >> >>
>> >>      >>> > > > >> >> I'm afraid I still don't understand what kind of
>> >> checkpoint you
>> >>      >>> > > > need, if
>> >>      >>> > > > >> >> it
>> >>      >>> > > > >> >> is not just deterministic grouping into batches.
>> >> "Checkpoint"
>> >>      >>> is
>> >>      >>> > a
>> >>      >>> > > > very
>> >>      >>> > > > >> >> broad term and it's very possible that everybody
>> in
>> >> this thread
>> >>      >>> > is
>> >>      >>> > > > >> talking
>> >>      >>> > > > >> >> about different things when saying it. So it would
>> >> help if you
>> >>      >>> > > could
>> >>      >>> > > > >> give
>> >>      >>> > > > >> >> a
>> >>      >>> > > > >> >> more concrete example: for example, take some IO
>> that
>> >> you think
>> >>      >>> > > > could be
>> >>      >>> > > > >> >> easier to write with your proposed API, give the
>> >> contents of a
>> >>      >>> > > > >> >> hypothetical
>> >>      >>> > > > >> >> PCollection being written to this IO, give the
>> code
>> >> of a
>> >>      >>> > > hypothetical
>> >>      >>> > > > >> DoFn
>> >>      >>> > > > >> >> implementing the write using your API, and explain
>> >> what you'd
>> >>      >>> > > expect
>> >>      >>> > > > to
>> >>      >>> > > > >> >> happen at runtime.
>> >>      >>> > > > >> >>
>> >>      >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain
>> Manni-Bucau
>> >>      >>> > > > >> >> <[email protected]
>> >> <mailto:[email protected]>>
>> >>
>> >>      >>> > > > >> >> wrote:
>> >>      >>> > > > >> >>
>> >>      >>> > > > >> >>> @Eugene: yes and the other alternative of Reuven
>> too
>> >> but it is
>> >>      >>> > > still
>> >>      >>> > > > >> >>> 1. relying on timers, 2. not really checkpointed
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> In other words it seems all solutions are to
>> create
>> >> a chunk of
>> >>      >>> > > size
>> >>      >>> > > > 1
>> >>      >>> > > > >> >>> and replayable to fake the lack of chunking in
>> the
>> >> framework.
>> >>      >>> > This
>> >>      >>> > > > >> >>> always implies a chunk handling outside the
>> >> component
>> >>      >>> (typically
>> >>      >>> > > > >> >>> before for an output). My point is I think IO
>> need
>> >> it in their
>> >>      >>> > own
>> >>      >>> > > > >> >>> "internal" or at least control it themselves
>> since
>> >> the chunk
>> >>      >>> > size
>> >>      >>> > > is
>> >>      >>> > > > >> >>> part of the IO handling most of the time.
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> I think JB spoke of the same "group before" trick
>> >> using
>> >>      >>> > > restrictions
>> >>      >>> > > > >> >>> which can work I have to admit if SDF are
>> >> implemented by
>> >>      >>> > runners.
>> >>      >>> > > Is
>> >>      >>> > > > >> >>> there a roadmap/status on that? Last time I
>> checked
>> >> SDF was a
>> >>      >>> > > great
>> >>      >>> > > > >> >>> API without support :(.
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> Romain Manni-Bucau
>> >>      >>> > > > >> >>> @rmannibucau |  Blog | Old Blog | Github |
>> LinkedIn
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>> >>      >>> > > > >> >>> <[email protected]>:
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers
>> are
>> >> unrelated,
>> >>      >>> > and
>> >>      >>> > > > the
>> >>      >>> > > > >> >>>> post
>> >>      >>> > > > >> >>>> doesn't mention the word. Did you mean something
>> >> else, e.g.
>> >>      >>> > > > >> restriction
>> >>      >>> > > > >> >>>> perhaps? Either way I don't think SDFs are the
>> >> solution here;
>> >>      >>> > > SDFs
>> >>      >>> > > > >> have
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> to
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> do with the ability to split the processing of
>> *a
>> >> single
>> >>      >>> > element*
>> >>      >>> > > > over
>> >>      >>> > > > >> >>>> multiple calls, whereas Romain I think is asking
>> >> for
>> >>      >>> repeatable
>> >>      >>> > > > >> grouping
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> of
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> *multiple* elements.
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> Romain - does
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >>
>> https://github.com/apache/beam/blob/master/sdks/java/
>> >>     <https://github.com/apache/beam/blob/master/sdks/java/>
>> >>      >>> > > > core/src/main/java/org/apache/beam/sdk/transforms/
>> >>      >>> GroupIntoBatches.java
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> do what
>> >>      >>> > > > >> >>>> you want?
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste
>> >> Onofré <
>> >>      >>> > > > >> [email protected] <mailto:[email protected]>>
>> >>      >>> > > > >> >>>> wrote:
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable
>> >> DoFn, no ?
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>>
>> >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn
>> >>     <https://beam.apache.org/blog/2017/08/16/splittable-do-fn>.
>> >>
>> >>      >>> html
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> Regards
>> >>      >>> > > > >> >>>>> JB
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau
>> wrote:
>> >>      >>> > > > >> >>>>>>
>> >>      >>> > > > >> >>>>>> it gives the fn/transform the ability to save
>> a
>> >> state - it
>> >>      >>> > can
>> >>      >>> > > > get
>> >>      >>> > > > >> >>>>>> back on "restart" / whatever unit we can use,
>> >> probably
>> >>      >>> runner
>> >>      >>> > > > >> >>>>>> dependent? Without that you need to rewrite
>> all
>> >> IO usage
>> >>      >>> with
>> >>      >>> > > > >> >>>>>> something like the previous pattern which
>> makes
>> >> the IO not
>> >>      >>> > self
>> >>      >>> > > > >> >>>>>> sufficient and kind of makes the entry cost
>> and
>> >> usage of
>> >>      >>> beam
>> >>      >>> > > way
>> >>      >>> > > > >> >>>>>> further.
>> >>      >>> > > > >> >>>>>>
>> >>      >>> > > > >> >>>>>> In my mind it is exactly what
>> jbatch/spring-batch
>> >> uses but
>> >>      >>> > > > adapted
>> >>      >>> > > > >> to
>> >>      >>> > > > >> >>>>>> beam (stream in particular) case.
>> >>      >>> > > > >> >>>>>>
>> >>      >>> > > > >> >>>>>> Romain Manni-Bucau
>> >>      >>> > > > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github |
>> >> LinkedIn
>> >>      >>> > > > >> >>>>>>
>> >>      >>> > > > >> >>>>>>
>> >>      >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
>> >>      >>> > <[email protected]
>> >>      >>> > > >:
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> Romain,
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> Can you define what you mean by checkpoint?
>> What
>> >> are the
>> >>      >>> > > > semantics,
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> what
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> does it accomplish?
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> Reuven
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain
>> >> Manni-Bucau <
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> [email protected]
>> >> <mailto:[email protected]>>
>> >>
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>> wrote:
>> >>      >>> > > > >> >>>>>>>
>> >>      >>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> I. checkpoint marker:
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> @AnyBeamAnnotation
>> >>      >>> > > > >> >>>>>>>> @CheckpointAfter
>> >>      >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new
>> >>      >>> > > > >> MyFn()).withCheckpointAlgorithm(new
>> >>      >>> > > > >> >>>>>>>> CountingAlgo()))
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> III. (I like this one less)
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> // in the dofn
>> >>      >>> > > > >> >>>>>>>> @CheckpointTester
>> >>      >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable
>> getCheckpoint();
>> >> in the
>> >>      >>> dofn
>> >>      >>> > > per
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> element
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> Romain Manni-Bucau
>> >>      >>> > > > >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github |
>> >> LinkedIn
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>> >>      >>> > > > <[email protected]
>> >>      >>> > > > >> >>>>
>> >>      >>> > > > >> >>>> :
>> >>      >>> > > > >> >>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>> How would you define it (rough API is
>> fine)?
>> >> Without
>> >>      >>> more
>> >>      >>> > > > >> details,
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> it is
>> >>      >>> > > > >> >>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>> not easy to see wider applicability and
>> >> feasibility in
>> >>      >>> > > > runners.
>> >>      >>> > > > >> >>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain
>> >> Manni-Bucau <
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> [email protected]
>> >> <mailto:[email protected]>>
>> >>
>> >>      >>> > > > >> >>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>> wrote:
>> >>      >>> > > > >> >>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> This is a fair summary of the current
>> state
>> >> but also
>> >>      >>> > where
>> >>      >>> > > > beam
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> can
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> have a
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> very strong added value and make big data
>> >> great and
>> >>      >>> > smooth.
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> Instead of this replay feature isn't
>> >> checkpointing
>> >>      >>> > willable?
>> >>      >>> > > > In
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> particular
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> with SDF no?
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> On Nov 16, 2017 at 19:50, "Raghu Angadi"
>> >>      >>> > > > >> <[email protected]>
>> >>      >>> > > > >> >>>>>>>>>> wrote:
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> Core issue here is that there is no
>> explicit
>> >> concept
>> >>      >>> of
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> 'checkpoint'
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> in
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> Beam (UnboundedSource has a method
>> >> 'getCheckpointMark'
>> >>      >>> > but
>> >>      >>> > > > that
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> refers to
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> the checkpoint on external source).
>> Runners
>> >> do
>> >>      >>> checkpoint
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> internally
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> as
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> implementation detail. Flink's checkpoint
>> >> model is
>> >>      >>> > > entirely
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> different
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> from
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's.
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not
>> >> explicitly talk
>> >>      >>> > about
>> >>      >>> > > a
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> checkpoint
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> by
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> design.
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> If you are looking to achieve some
>> >> guarantees with a
>> >>      >>> > > > >> sink/DoFn, I
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> think
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> it
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> is better to start with the
>> requirements. I
>> >> worked on
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> exactly-once
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> sink
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>> for
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()),
>> where
>> >> we
>> >>      >>> > > essentially
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> reshard
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> the
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> elements and assign sequence numbers to elements
>> >>      >>> > > > >> >>>>>>>>>>> within each shard.
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> Duplicates in replays are avoided based
>> on
>> >> these
>> >>      >>> > sequence
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> numbers.
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> DoFn
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> state API is used to buffer out-of-order
>> >> replays. The
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> implementation
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not in
>> Flink
>> >> which has
>> >>      >>> a
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> horizontal
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> checkpoint. KafkaIO checks for
>> >> compatibility.
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain
>> >> Manni-Bucau <
>> >>      >>> > > > >> >>>>>>>>>>> [email protected]
>> >> <mailto:[email protected]>>
>> >>
>> >>      >>> > > > >> >>>>>>>>>>> wrote:
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> Hi guys,
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative but the
>> >> topic is
>> >>      >>> real
>> >>      >>> > > and
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> coming
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> again and again with the beam usage:
>> how a
>> >> dofn can
>> >>      >>> > > handle
>> >>      >>> > > > >> some
>> >>      >>> > > > >> >>>>>>>>>>>> "chunking".
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> The need is to be able to commit each N
>> >> records but
>> >>      >>> > with
>> >>      >>> > > N
>> >>      >>> > > > not
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> too
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> big.
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> The natural API for that in beam is the
>> >> bundle one
>> >>      >>> but
>> >>      >>> > > > bundles
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> are
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> not
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> reliable since they can be very small
>> >> (flink) - we
>> >>      >>> can
>> >>      >>> > > say
>> >>      >>> > > > it
>> >>      >>> > > > >> is
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> "ok"
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> even if it has some perf impacts - or
>> too
>> >> big (spark
>> >>      >>> > does
>> >>      >>> > > > full
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> size
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> /
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> #workers).
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES
>> >> I/O: a
>> >>      >>> maxSize
>> >>      >>> > > > which
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> does
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> an
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> eager flush. The issue is that then the
>> >> checkpoint is
>> >>      >>> > not
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>> respected
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> and you can process the same records multiple times.
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and
>> >> controllable
>> >>      >>> > from
>> >>      >>> > > a
>> >>      >>> > > > >> beam
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>>>> point
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> of view (at least in a max manner)?
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>> Thanks,
>> >>      >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>> >>      >>> > > > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog |
>> Github |
>> >> LinkedIn
>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>>>
>> >>      >>> > > > >> >>>>>>>>
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>>> --
>> >>      >>> > > > >> >>>>> Jean-Baptiste Onofré
>> >>      >>> > > > >> >>>>> [email protected] <mailto:
>> [email protected]>
>> >>      >>> > > > >> >>>>> http://blog.nanthrax.net
>> >>      >>> > > > >> >>>>> Talend - http://www.talend.com
>> >>      >>> > > > >> >>>>>
>> >>      >>> > > > >> >>>
>> >>      >>> > > > >> >>
>> >>      >>> > > > >> >
>> >>      >>> > > > >> > --
>> >>      >>> > > > >> > Jean-Baptiste Onofré
>> >>      >>> > > > >> > [email protected] <mailto:[email protected]>
>> >>      >>> > > > >> > http://blog.nanthrax.net
>> >>      >>> > > > >> > Talend - http://www.talend.com
>> >>      >>> > > > >>
>> >>      >>> > > >
>> >>      >>> > >
>> >>      >>> >
>> >>      >>>
>> >>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
>>
>
>
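
For readers following the quoted thread, the "maxSize eager flush" workaround
discussed for the ES I/O can be sketched roughly as below. This is only an
illustration under assumptions: BatchingWriter, processElement and
finishBundle are hypothetical names chosen to mirror the DoFn lifecycle, and
the sink is a plain Consumer standing in for a bulk request. It is not the
code of ElasticsearchIO or any other Beam IO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Beam API: buffers elements and
// hands them to a sink in batches of at most maxBatchSize.
final class BatchingWriter<T> {
  private final int maxBatchSize;
  private final Consumer<List<T>> sink; // stands in for one bulk request
  private final List<T> buffer = new ArrayList<>();

  BatchingWriter(int maxBatchSize, Consumer<List<T>> sink) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.maxBatchSize = maxBatchSize;
    this.sink = sink;
  }

  // Plays the role of @ProcessElement: flush eagerly once the buffer is full,
  // which is how an IO copes with bundles that are too large.
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  // Plays the role of @FinishBundle: whatever is left is written, even if it
  // is a single element. This is the small-bundle cost discussed above.
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    sink.accept(new ArrayList<>(buffer)); // copy so the sink owns its batch
    buffer.clear();
  }
}
```

With a maximum batch size of 3, a bundle of seven elements yields batches of
3, 3 and 1, while a runner that delivers one-element bundles yields one write
per element no matter how large the maximum is set.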

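The sequence-number idea quoted above (assign sequence numbers within each
shard, drop replayed duplicates, buffer out-of-order arrivals) can also be
sketched in a few lines. This is a simplified, in-memory illustration and not
KafkaIO's implementation: SequencedShardWriter and its fields are hypothetical
names, and in a real sink the next expected sequence number would have to be
persisted together with the written data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical per-shard writer; one instance handles a single shard.
final class SequencedShardWriter {
  private long nextSeq = 0; // next sequence number expected by the sink
  private final TreeMap<Long, String> pending = new TreeMap<>(); // out-of-order arrivals
  private final List<String> written = new ArrayList<>(); // stands in for the external sink

  void accept(long seq, String value) {
    if (seq < nextSeq) {
      return; // already written, so this is a replayed duplicate
    }
    pending.putIfAbsent(seq, value); // buffer; repeated arrivals collapse to one entry
    // Write every buffered record that is now contiguous with the output.
    while (!pending.isEmpty() && pending.firstKey() == nextSeq) {
      written.add(pending.pollFirstEntry().getValue());
      nextSeq++;
    }
  }

  List<String> written() {
    return written;
  }
}
```

Feeding the sequence 0, 2, 0, 1, 2 into one shard writes each of the three
records exactly once and in order, because the replayed 0 and 2 are dropped
and the early 2 waits in the buffer until 1 arrives.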