I think both concepts likely need to co-exist:

As described in the execution model [1], bundling is a runner-specific
choice about how to execute a pipeline. This affects how frequently it may
need to checkpoint during processing, how much communication overhead there
is between workers, the scope of retries, etc. To allow runners to innovate
and explore various options in this space, the semantics of a pipeline
shouldn't depend on bundling.
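
To make that concrete, here is a minimal plain-Java sketch (not Beam code;
the class and method names are hypothetical) of a writer that flushes once
per bundle. The same ten elements can arrive as ten bundles of one element
or as one bundle of ten, two extremes a runner may legitimately choose. The
number and size of the flushes change accordingly, while the set of
elements written does not.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: simulates runner bundling in plain Java. Nothing here is a Beam API.
class BundleFlushSketch {

    // Splits the input into consecutive bundles of at most bundleSize elements,
    // standing in for whatever bundling a runner happens to choose.
    static List<List<Integer>> bundles(List<Integer> elements, int bundleSize) {
        List<List<Integer>> result = new ArrayList<>();
        for (int start = 0; start < elements.size(); start += bundleSize) {
            int end = Math.min(start + bundleSize, elements.size());
            result.add(new ArrayList<>(elements.subList(start, end)));
        }
        return result;
    }

    // A writer that buffers within a bundle and flushes once when the bundle ends.
    // Returns the size of each flush, i.e. of each hypothetical write request.
    static List<Integer> flushSizes(List<List<Integer>> bundles) {
        List<Integer> sizes = new ArrayList<>();
        for (List<Integer> bundle : bundles) {
            List<Integer> buffer = new ArrayList<>(bundle); // elements buffered during the bundle
            sizes.add(buffer.size());                        // single flush at end of bundle
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            input.add(i);
        }
        System.out.println(flushSizes(bundles(input, 1)));  // one flush per element
        System.out.println(flushSizes(bundles(input, 10))); // one flush for everything
    }
}
```

A writer whose correctness depended on either of those flush patterns would
only work on some runners.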

For cases where a pipeline wants to process N values together, a transform
(or annotation) to explicitly indicate that the semantics of the pipeline
require processing batches of elements makes sense. This is the intended
use of GroupIntoBatches, and allows the two concepts to be kept distinct --
bundles are a runner choice and GroupIntoBatches is a semantic choice.
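
As a rough illustration of the distinction, the plain-Java sketch below
batches elements explicitly and carries its buffer across bundle
boundaries, so the resulting batches are the same however the input is
bundled. It is only a simulation under simplifying assumptions: the names
are hypothetical, elements arrive in a fixed order, and keys and windows
are ignored. It is not how GroupIntoBatches itself is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, not the GroupIntoBatches implementation.
class ExplicitBatchingSketch {

    // Groups elements into batches of batchSize (the last batch may be smaller).
    // The buffer survives bundle boundaries, so the runner's bundling does not
    // influence where batches start and end.
    static List<List<String>> batches(List<List<String>> bundles, int batchSize) {
        List<List<String>> out = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (List<String> bundle : bundles) {
            for (String element : bundle) {
                buffer.add(element);
                if (buffer.size() == batchSize) {
                    out.add(buffer);
                    buffer = new ArrayList<>();
                }
            }
            // Deliberately no flush here: the end of a bundle is not a batch boundary.
        }
        if (!buffer.isEmpty()) {
            out.add(buffer); // remaining elements form a final, smaller batch
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> tinyBundles = Arrays.asList(
            Arrays.asList("a"), Arrays.asList("b"), Arrays.asList("c"),
            Arrays.asList("d"), Arrays.asList("e"));
        List<List<String>> oneBundle =
            Arrays.asList(Arrays.asList("a", "b", "c", "d", "e"));
        // Both calls yield the same batches despite very different bundling.
        System.out.println(batches(tinyBundles, 2));
        System.out.println(batches(oneBundle, 2));
    }
}
```

Here the batch size is part of what the pipeline asks for, while the bundle
layout remains free for the runner to pick.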

[1] https://beam.apache.org/documentation/execution-model/#bundling-and-persistence

On Thu, Nov 30, 2017 at 9:46 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> @Ben: would all IO be rewritten to use that and the bundle concept
> dropped from the API to avoid any ambiguity and misleading usage like
> in current IOs?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-30 18:43 GMT+01:00 Ben Chambers <bchamb...@google.com>:
> > Beam includes a GroupIntoBatches transform (see
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java)
> > which I believe was intended to be used as part of such a portable IO. It
> > can be used to request that elements are divided into batches of some
> size
> > which can then be used for further processing.
> >
> > On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >>
> >> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> >> > Very strong -1 from me:
> >> > - Having a pipeline-global parameter is bad because it will apply to
> all
> >> > transforms, with no ability to control it for individual transforms.
> >> > This
> >> > can go especially poorly because it means that when I write a
> transform,
> >> > I
> >> > don't know whether a user will set this parameter in their pipeline
> to a
> >> > value that's perhaps good for the user's transform but really bad for
> my
> >> > transform; and the user will likely blame my transform for poor
> >> > performance.
> >> > A parameter like this should be set on exactly the thing it applies
> to:
> >> > e.g.
> >> > on the particular IO; and it should be set by the IO itself, not by a
> >> > user
> >> > in pipeline options, because the IO author likely knows better than a
> >> > user
> >> > what is a good value.
> >>
> >> This is true, and it is worse today since the user can't tune it and
> >> the IO doesn't handle it either. It is up to the runner, and none
> >> implements it in an IO-friendly way (check Flink and Spark,
> >> which do the exact opposite: bundle=1 vs bundle=dataset/partitions).
> >>
> >> Also note it is a "max" and not an exact value in the proposal.
> >>
> >> > - The parameter will not achieve what many IOs want, either. In some
> >> > cases,
> >> > you want to limit the number of bytes you write. In some cases, you
> want
> >> > to
> >> > limit the number of values within a key that you write. In some cases,
> >> > it's
> >> > something else - it isn't always elements.
> >>
> >> Elements is the only thing users can really tune since you can't
> >> assume the content.
> >>
> >> > - The parameter will address none of the issues that I think you raised
> >> > in the thread above: it doesn't give deterministic replay, nor any kind of
> >> > fault tolerance.
> >>
> >> Right, it only partially solves the first issue popping up: the
> >> chunking. However I think it is a quick win.
> >>
> >> > - Having a parameter like this *at all* goes against Beam's "no knobs"
> >> > philosophy - for all the usual reasons: 1) it encourages users to
> waste
> >> > time
> >> > looking in the wrong places when doing performance tuning: tuning
> >> > parameters
> >> > is almost never the best way to improve performance; 2) when users can
> >> > set a
> >> > tuning parameter, in my experience it is almost always set wrong, or
> >> > perhaps
> >> > it was once set right but then nobody updates it when the use case or
> >> > implementation changes; and we can end up in a situation where the
> >> > pipeline
> >> > is performing poorly because of the parameter but the runner isn't
> >> > allowed
> >> > to choose a better value. (in experience with legacy data processing
> >> > systems
> >> > in Google, like MapReduce, that support plenty of tuning parameters, a
> >> > very
> >> > common advice to someone complaining about a poorly performing job is
> >> > "have
> >> > you tried removing all your parameters?")
> >>
> >> I would be fine with that but what is the alternative?
> >>
> >> > - I still fail to understand the exact issue we're talking about, and
> >> > I've
> >> > made a number of suggestions as to how this understanding could be
> >> > achieved:
> >> > show code that demonstrates the issue; and show how the code could be
> >> > improved by a hypothetical API.
> >>
> >> First immediately blocking issue is how to batch records reliably and
> >> *portably* (the biggest beam added-value IMHO).
> >> Since bundles are "flush" trigger for most IO it means ensuring the
> >> bundle size is somehow controllable or at least not set to a very
> >> small value OOTB.
> >>
> >> An alternative to this proposal can be to let an IO give a hint about
> >> its desired bundle size. Would work as well for that particular issue.
> >> Does it sound better?
> >>
> >> >
> >> > On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré <j...@nanthrax.net
> >
> >> > wrote:
> >> >>
> >> >> It sounds reasonable to me.
> >> >>
> >> >> And agree for Spark, I would like to merge Spark 2 update first.
> >> >>
> >> >> Regards
> >> >> JB
> >> >>
> >> >> On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
> >> >> > Guys,
> >> >> >
> >> >> > what about moving getMaxBundleSize from flink options to pipeline
> >> >> > options. I think all runners can support it right? Spark code needs
> >> >> > the merge of the v2 before being able to be implemented probably
> but
> >> >> > I
> >> >> > don't see any blocker.
> >> >> >
> >> >> > wdyt?
> >> >> >
> >> >> > Romain Manni-Bucau
> >> >> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >
> >> >> >
> >> >> > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <
> rmannibu...@gmail.com>:
> >> >> >> @Eugene: "workaround" as specific to the IO each time and
> therefore
> >> >> >> still highlight a lack in the core.
> >> >> >>
> >> >> >> Other comments inline
> >> >> >>
> >> >> >>
> >> >> >> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
> >> >> >> <rober...@google.com.invalid>:
> >> >> >>> There is a possible fourth issue that we don't handle well:
> >> >> >>> efficiency. For
> >> >> >>> very large bundles, it may be advantageous to avoid replaying a
> >> >> >>> bunch
> >> >> >>> of
> >> >> >>> idempotent operations if there were a way to record what ones
> we're
> >> >> >>> sure
> >> >> >>> went through. Not sure if that's the issue here (though one could possibly
> >> >> >>> do this with SDFs, by preemptively returning periodically before an
> >> >> >>> element (or portion thereof) is done).
> >> >> >>
> >> >> >> +1, also lead to the IO handling its own chunking/bundles and
> >> >> >> therefore solves all issues at once.
> >> >> >>
> >> >> >>>
> >> >> >>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
> >> >> >>> kirpic...@google.com.invalid> wrote:
> >> >> >>>
> >> >> >>>> I disagree that the usage of document id in ES is a
> "workaround" -
> >> >> >>>> it
> >> >> >>>> does
> >> >> >>>> not address any *accidental *complexity
> >> >> >>>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from
> >> >> >>>> shortcomings
> >> >> >>>> of Beam, it addresses the *essential* complexity that a
> >> >> >>>> distributed
> >> >> >>>> system
> >> >> >>>> forces one to take it as a fact of nature that the same write
> >> >> >>>> (mutation) will happen multiple times, so if you want a mutation
> >> >> >>>> to
> >> >> >>>> happen
> >> >> >>>> "as-if" it happened exactly once, the mutation itself must be
> >> >> >>>> idempotent
> >> >> >>>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id
> >> >> >>>> (upsert
> >> >> >>>> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic
> example
> >> >> >>>> of
> >> >> >>>> an
> >> >> >>>> idempotent mutation, and it's very good that Elasticsearch
> >> >> >>>> provides
> >> >> >>>> it - if
> >> >> >>>> it didn't, no matter how good of an API Beam had, achieving
> >> >> >>>> exactly-once
> >> >> >>>> writes would be theoretically impossible. Are we in agreement on
> >> >> >>>> this
> >> >> >>>> so
> >> >> >>>> far?
> >> >> >>>>
> >> >> >>>> Next: you seem to be discussing 3 issues together, all of which
> >> >> >>>> are
> >> >> >>>> valid
> >> >> >>>> issues, but they seem unrelated to me:
> >> >> >>>> 1. Exactly-once mutation
> >> >> >>>> 2. Batching multiple mutations into one RPC.
> >> >> >>>> 3. Backpressure
> >> >> >>>>
> >> >> >>>> #1: was considered above. The system the IO is talking to has to
> >> >> >>>> support
> >> >> >>>> idempotent mutations, in an IO-specific way, and the IO has to
> >> >> >>>> take
> >> >> >>>> advantage of them, in the IO-specific way - end of story.
> >> >> >>
> >> >> >> Agree but don't forget the original point was about "chunks" and
> not
> >> >> >> individual records.
> >> >> >>
> >> >> >>>>
> >> >> >>>> #2: a batch of idempotent operations is also idempotent, so this
> >> >> >>>> doesn't
> >> >> >>>> add anything new semantically. Syntactically - Beam already
> allows
> >> >> >>>> you to
> >> >> >>>> write your own batching by notifying you of permitted batch
> >> >> >>>> boundaries
> >> >> >>>> (Start/FinishBundle). Sure, it could do more, but from my
> >> >> >>>> experience
> >> >> >>>> the
> >> >> >>>> batching in IOs I've seen is one of the easiest and least
> >> >> >>>> error-prone
> >> >> >>>> parts, so I don't see something worth an extended discussion
> here.
> >> >> >>
> >> >> >> "Beam already allows you to
> >> >> >>   write your own batching by notifying you of permitted batch
> >> >> >> boundaries
> >> >> >>   (Start/FinishBundle)"
> >> >> >>
> >> >> >> Is wrong since the bundle is potentially the whole PCollection
> >> >> >> (spark)
> >> >> >> so this is not even an option until you use the SDF (back to the
> >> >> >> same
> >> >> >> point).
> >> >> >> Once again the API looks fine but no implementation makes it true.
> >> >> >> It
> >> >> >> would be easy to change it in spark, flink can be ok since it
> >> >> >> targets
> >> >> >> more the streaming case, not sure of others, any idea?
> >> >> >>
> >> >> >>
> >> >> >>>>
> >> >> >>>> #3: handling backpressure is a complex problem with multiple
> >> >> >>>> facets:
> >> >> >>>> 1) how
> >> >> >>>> do you know you're being throttled, and by how much are you
> >> >> >>>> exceeding
> >> >> >>>> the
> >> >> >>>> external system's capacity?
> >> >> >>
> >> >> >> This is the whole point of backpressure, the system sends it back
> to
> >> >> >> you (header like or status technic in general)
> >> >> >>
> >> >> >>>> 2) how do you communicate this signal to the
> >> >> >>>> runner?
> >> >> >>
> >> >> >> You are a client so you get the meta in the response - whatever
> >> >> >> techno.
> >> >> >>
> >> >> >>>> 3) what does the runner do in response?
> >> >> >>
> >> >> >> The runner does nothing, but the IO adapts its handling as mentioned before
> >> >> >> (wait and retry, skip, ... depending on the config)
> >> >> >>
> >> >> >>>> 4) how do you wait until
> >> >> >>>> it's ok to try again?
> >> >> >>
> >> >> >> This is one point to probably enhance in beam but waiting in the
> >> >> >> processing is an option if the source has some buffering otherwise
> >> >> >> it
> >> >> >> requires to have a buffer fallback and max size if the wait mode
> is
> >> >> >> activated.
> >> >> >>
> >> >> >>>> You seem to be advocating for solving one facet of this problem,
> >> >> >>>> which is:
> >> >> >>>> you want it to be possible to signal to the runner "I'm being
> >> >> >>>> throttled,
> >> >> >>>> please end the bundle", right? If so - I think this (ending the
> >> >> >>>> bundle) is
> >> >> >>>> unnecessary: the DoFn can simply do an exponential back-off
> sleep
> >> >> >>>> loop.
> >> >> >>
> >> >> >> Agree, I never said the runner should know, but GBK+output doesn't work
> >> >> >> because you don't own the GBK.
> >> >> >>
> >> >> >>>> This is e.g. what DatastoreIO does
> >> >> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
> >> >> >>>> and
> >> >> >>>> this is in general how most systems I've seen handle
> backpressure.
> >> >> >>>> Is
> >> >> >>>> there
> >> >> >>>> something I'm missing? In particular, is there any compelling
> >> >> >>>> reason
> >> >> >>>> why
> >> >> >>>> you think it'd be beneficial e.g. for DatastoreIO to commit the
> >> >> >>>> results of
> >> >> >>>> the bundle so far before processing other elements?
> >> >> >>
> >> >> >> It was more about validating a subset of the whole bundle early, to
> >> >> >> avoid reprocessing it if the bundle fails later.
> >> >> >>
> >> >> >>
> >> >> >> So to summarize I see 2 outcomes:
> >> >> >>
> >> >> >> 1. impl SDF in all runners
> >> >> >> 2. make the bundle size upper bounded - through a pipeline option
> -
> >> >> >> in
> >> >> >> all runners, not sure this one is doable everywhere since I mainly
> >> >> >> checked spark case
> >> >> >>
> >> >> >>>>
> >> >> >>>> Again, it might be that I'm still misunderstanding what you're
> >> >> >>>> trying
> >> >> >>>> to
> >> >> >>>> say. One of the things it would help to clarify would be -
> exactly
> >> >> >>>> what do
> >> >> >>>> you mean by "how batch frameworks solved that for years": can
> you
> >> >> >>>> point at
> >> >> >>>> an existing API in some other framework that achieves what you
> >> >> >>>> want?
> >> >> >>>>
> >> >> >>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
> >> >> >>>> <rmannibu...@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Eugene, point - and issue with a single sample - is you can
> >> >> >>>>> always
> >> >> >>>>> find
> >> >> >>>>> *workarounds* on a case by case basis as the id one with ES but
> >> >> >>>>> beam
> >> >> >>>> doesnt
> >> >> >>>>> solve the problem as a framework.
> >> >> >>>>>
> >> >> >>>>>  From my past, I clearly dont see how batch frameworks solved
> >> >> >>>>> that
> >> >> >>>>> for
> >> >> >>>> years
> >> >> >>>>> and beam is not able to do it - keep in mind it is the same
> kind
> >> >> >>>>> of
> >> >> >>>> techno,
> >> >> >>>>> it just uses different sources and bigger clusters so no real
> >> >> >>>>> reason
> >> >> >>>>> to
> >> >> >>>> not
> >> >> >>>>> have the same feature quality. The only potential reason i see
> is
> >> >> >>>>> there
> >> >> >>>> is
> >> >> >>>>> no tracking of the state into the cluster - e2e. But i dont see
> >> >> >>>>> why
> >> >> >>>>> there
> >> >> >>>>> wouldnt be. Do I miss something here?
> >> >> >>>>>
> >> >> >>>>> An example could be a GitHub crawler computing stats on all GitHub
> >> >> >>>>> repos, based on a REST client. You will need to handle the rate limit
> >> >> >>>>> and will likely want to "commit" each time you reach it, probably with
> >> >> >>>>> some buffering strategy with a max size before really waiting. How do
> >> >> >>>>> you do it with a GBK independent of your DoFn? You are not able to
> >> >> >>>>> compose the fns correctly :(.
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> Le 18 nov. 2017 20:48, "Eugene Kirpichov"
> >> >> >>>>> <kirpic...@google.com.invalid>
> >> >> >>>> a
> >> >> >>>>> écrit :
> >> >> >>>>>
> >> >> >>>>> After giving this thread my best attempt at understanding
> exactly
> >> >> >>>>> what is
> >> >> >>>>> the problem and the proposed solution, I'm afraid I still fail
> to
> >> >> >>>>> understand both. To reiterate, I think the only way to make
> >> >> >>>>> progress
> >> >> >>>>> here
> >> >> >>>>> is to be more concrete: (quote) take some IO that you think
> could
> >> >> >>>>> be
> >> >> >>>> easier
> >> >> >>>>> to write with your proposed API, give the contents of a
> >> >> >>>>> hypothetical
> >> >> >>>>> PCollection being written to this IO, give the code of a
> >> >> >>>>> hypothetical
> >> >> >>>> DoFn
> >> >> >>>>> implementing the write using your API, and explain what you'd
> >> >> >>>>> expect
> >> >> >>>>> to
> >> >> >>>>> happen at runtime. I'm going to re-engage in this thread after
> >> >> >>>>> such
> >> >> >>>>> an
> >> >> >>>>> example is given.
> >> >> >>>>>
> >> >> >>>>> On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
> >> >> >>>>> <rmannibu...@gmail.com>
> >> >> >>>>> wrote:
> >> >> >>>>>
> >> >> >>>>>> First, bundle retry is unusable with some runners like Spark, where the
> >> >> >>>>>> bundle size is the collection size / number of workers. This means a user
> >> >> >>>>>> can't use the bundle API or feature reliably and portably, which is Beam's
> >> >> >>>>>> promise. Aligning chunking and bundles would guarantee that, but it may not
> >> >> >>>>>> be desired; that is why I thought it could be another feature.
> >> >> >>>>>>
> >> >> >>>>>> GBK works until the IO knows about that and both concepts are
> >> >> >>>>>> not
> >> >> >>>> always
> >> >> >>>>>> orthogonal - backpressure like systems is a trivial common
> >> >> >>>>>> example.
> >> >> >>>> This
> >> >> >>>>>> means the IO (dofn) must be able to do it itself at some
> point.
> >> >> >>>>>>
> >> >> >>>>>> Also note the GBK works only if the IO can take a list which
> is
> >> >> >>>>>> never
> >> >> >>>> the
> >> >> >>>>>> case today.
> >> >> >>>>>>
> >> >> >>>>>> Big questions for me are: is SDF the way to go, since it provides the
> >> >> >>>>>> needed API but is not yet supported? What about existing IOs? Should Beam
> >> >> >>>>>> provide an automatic wrapping of DoFns for that pre-aggregated support and
> >> >> >>>>>> simulate bundles to the actual IO impl to keep the existing API?
> >> >> >>>>>>
> >> >> >>>>>>
> >> >> >>>>>> Le 17 nov. 2017 19:20, "Raghu Angadi"
> >> >> >>>>>> <rang...@google.com.invalid>
> >> >> >>>>>> a
> >> >> >>>>>> écrit :
> >> >> >>>>>>
> >> >> >>>>>> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
> >> >> >>>>> rmannibu...@gmail.com
> >> >> >>>>>>>
> >> >> >>>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> Yep, just take ES IO, if a part of a bundle fails you are in
> an
> >> >> >>>>>>> unmanaged state. This is the case for all O (of IO ;)). Issue
> >> >> >>>>>>> is
> >> >> >>>>>>> not
> >> >> >>>>>>> much about "1" (the code it takes) but more the fact it
> doesn't
> >> >> >>>>>>> integrate with runner features and retries potentially: what
> >> >> >>>>>>> happens
> >> >> >>>>>>> if a bundle has a failure? => undefined today. 2. I'm fine
> with
> >> >> >>>>>>> it
> >> >> >>>>>>> while we know exactly what happens when we restart after a
> >> >> >>>>>>> bundle
> >> >> >>>>>>> failure. With ES the timestamp can be used for instance.
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>>> This deterministic batching can be achieved even now with an
> >> >> >>>>>> extra
> >> >> >>>>>> GroupByKey (and if you want ordering on top of that, will need
> >> >> >>>>>> another
> >> >> >>>>>> GBK). Don't know if that is too costly in your case. I would
> >> >> >>>>>> need
> >> >> >>>>>> bit
> >> >> >>>>> more
> >> >> >>>>>> details on handling ES IO write retries to see it could be
> >> >> >>>>>> simplified.
> >> >> >>>>> Note
> >> >> >>>>>> that retries occur with or without any failures in your DoFn.
> >> >> >>>>>>
> >> >> >>>>>> The biggest negative with GBK approach is that it doesn't
> >> >> >>>>>> provide
> >> >> >>>>>> same
> >> >> >>>>>> guarantees on Flink.
> >> >> >>>>>>
> >> >> >>>>>> I don't see how GroupIntoBatches in Beam provides specific guarantees
> >> >> >>>>>> on deterministic batches.
> >> >> >>>>>>
> >> >> >>>>>> Thinking about it the SDF is really a way to do it since the
> SDF
> >> >> >>>>>> will
> >> >> >>>>>>> manage the bulking and associated with the runner "retry" it
> >> >> >>>>>>> seems
> >> >> >>>>>>> it
> >> >> >>>>>>> covers the needs.
> >> >> >>>>>>>
> >> >> >>>>>>> Romain Manni-Bucau
> >> >> >>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
> >> >> >>>>> <kirpic...@google.com.invalid
> >> >> >>>>>>> :
> >> >> >>>>>>>> I must admit I'm still failing to understand the problem, so
> >> >> >>>>>>>> let's
> >> >> >>>>> step
> >> >> >>>>>>>> back even further.
> >> >> >>>>>>>>
> >> >> >>>>>>>> Could you give an example of an IO that is currently
> difficult
> >> >> >>>>>>>> to
> >> >> >>>>>>> implement
> >> >> >>>>>>>> specifically because of lack of the feature you're talking
> >> >> >>>>>>>> about?
> >> >> >>>>>>>>
> >> >> >>>>>>>> I'm asking because I've reviewed almost all Beam IOs and
> don't
> >> >> >>>> recall
> >> >> >>>>>>>> seeing a similar problem. Sure, a lot of IOs do batching
> >> >> >>>>>>>> within a
> >> >> >>>>>> bundle,
> >> >> >>>>>>>> but 1) it doesn't take up much code (granted, it would be
> even
> >> >> >>>> easier
> >> >> >>>>>> if
> >> >> >>>>>>>> Beam did it for us) and 2) I don't remember any of them
> >> >> >>>>>>>> requiring
> >> >> >>>> the
> >> >> >>>>>>>> batches to be deterministic, and I'm having a hard time
> >> >> >>>>>>>> imagining
> >> >> >>>>> what
> >> >> >>>>>>> kind
> >> >> >>>>>>>> of storage system would be able to deduplicate if batches
> were
> >> >> >>>>>>>> deterministic but wouldn't be able to deduplicate if they
> >> >> >>>>>>>> weren't.
> >> >> >>>>>>>>
> >> >> >>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
> >> >> >>>>>>> rmannibu...@gmail.com>
> >> >> >>>>>>>> wrote:
> >> >> >>>>>>>>
> >> >> >>>>>>>>> Ok, let me try to step back and summarize what we have
> today
> >> >> >>>>>>>>> and
> >> >> >>>>> what
> >> >> >>>>>> I
> >> >> >>>>>>>>> miss:
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> 1. we can handle chunking in beam through group in batch
> (or
> >> >> >>>>>> equivalent)
> >> >> >>>>>>>>> but:
> >> >> >>>>>>>>>     > it is not built-in into the transforms (IO) and it is
> >> >> >>>>> controlled
> >> >> >>>>>>>>> from outside the transforms so no way for a transform to do
> >> >> >>>>>>>>> it
> >> >> >>>>>>>>> properly without handling itself a composition and links
> >> >> >>>>>>>>> between
> >> >> >>>>>>>>> multiple dofns to have notifications and potentially react
> >> >> >>>> properly
> >> >> >>>>> or
> >> >> >>>>>>>>> handle backpressure from its backend
> >> >> >>>>>>>>> 2. there is no restart feature because there is no real
> state
> >> >> >>>>> handling
> >> >> >>>>>>>>> at the moment. this sounds fully delegated to the runner
> but
> >> >> >>>>>>>>> I
> >> >> >>>>>>>>> was
> >> >> >>>>>>>>> hoping to have more guarantees from the used API to be able
> >> >> >>>>>>>>> to
> >> >> >>>>> restart
> >> >> >>>>>>>>> a pipeline (mainly batch since it can be irrelevant or
> >> >> >>>>>>>>> delegates
> >> >> >>>> to
> >> >> >>>>>>>>> the backend for streams) and handle only uncommitted records
> >> >> >>>>>>>>> so
> >> >> >>>> it
> >> >> >>>>>>>>> requires some persistence outside the main IO storages to
> do
> >> >> >>>>>>>>> it
> >> >> >>>>>>>>> properly
> >> >> >>>>>>>>>     > note this is somehow similar to the monitoring topic
> >> >> >>>>>>>>> which
> >> >> >>>> miss
> >> >> >>>>>>>>> persistence ATM so it can end up to beam to have a
> pluggable
> >> >> >>>> storage
> >> >> >>>>>>>>> for a few concerns
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Short term I would be happy with 1 solved properly, long
> term
> >> >> >>>>>>>>> I
> >> >> >>>> hope
> >> >> >>>>> 2
> >> >> >>>>>>>>> will be tackled without workarounds requiring custom
> wrapping
> >> >> >>>>>>>>> of
> >> >> >>>> IO
> >> >> >>>>> to
> >> >> >>>>>>>>> use a custom state persistence.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
> >> >> >>>>>>>>> <j...@nanthrax.net>:
> >> >> >>>>>>>>>> Thanks for the explanation. Agree, we might talk about
> >> >> >>>>>>>>>> different
> >> >> >>>>>>> things
> >> >> >>>>>>>>>> using the same wording.
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> I'm also struggling to understand the use case (for a
> >> >> >>>>>>>>>> generic
> >> >> >>>>> DoFn).
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Regards
> >> >> >>>>>>>>>> JB
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> To avoid spending a lot of time pursuing a false path,
> I'd
> >> >> >>>>>>>>>>> like
> >> >> >>>>> to
> >> >> >>>>>>> say
> >> >> >>>>>>>>>>> straight up that SDF is definitely not going to help
> here,
> >> >> >>>>> despite
> >> >> >>>>>>> the
> >> >> >>>>>>>>>>> fact
> >> >> >>>>>>>>>>> that its API includes the term "checkpoint". In SDF, the
> >> >> >>>>>> "checkpoint"
> >> >> >>>>>>>>>>> captures the state of processing within a single element.
> >> >> >>>>>>>>>>> If
> >> >> >>>>> you're
> >> >> >>>>>>>>>>> applying an SDF to 1000 elements, it will, like any other
> >> >> >>>>>>>>>>> DoFn,
> >> >> >>>>> be
> >> >> >>>>>>>>> applied
> >> >> >>>>>>>>>>> to each of them independently and in parallel, and you'll
> >> >> >>>>>>>>>>> have
> >> >> >>>>> 1000
> >> >> >>>>>>>>>>> checkpoints capturing the state of processing each of
> these
> >> >> >>>>>> elements,
> >> >> >>>>>>>>>>> which
> >> >> >>>>>>>>>>> is probably not what you want.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> I'm afraid I still don't understand what kind of
> checkpoint
> >> >> >>>>>>>>>>> you
> >> >> >>>>>>> need, if
> >> >> >>>>>>>>>>> it
> >> >> >>>>>>>>>>> is not just deterministic grouping into batches.
> >> >> >>>>>>>>>>> "Checkpoint"
> >> >> >>>> is
> >> >> >>>>> a
> >> >> >>>>>>> very
> >> >> >>>>>>>>>>> broad term and it's very possible that everybody in this
> >> >> >>>>>>>>>>> thread
> >> >> >>>>> is
> >> >> >>>>>>>>> talking
> >> >> >>>>>>>>>>> about different things when saying it. So it would help
> if
> >> >> >>>>>>>>>>> you
> >> >> >>>>>> could
> >> >> >>>>>>>>> give
> >> >> >>>>>>>>>>> a
> >> >> >>>>>>>>>>> more concrete example: for example, take some IO that you
> >> >> >>>>>>>>>>> think
> >> >> >>>>>>> could be
> >> >> >>>>>>>>>>> easier to write with your proposed API, give the contents
> >> >> >>>>>>>>>>> of a
> >> >> >>>>>>>>>>> hypothetical
> >> >> >>>>>>>>>>> PCollection being written to this IO, give the code of a
> >> >> >>>>>> hypothetical
> >> >> >>>>>>>>> DoFn
> >> >> >>>>>>>>>>> implementing the write using your API, and explain what
> >> >> >>>>>>>>>>> you'd
> >> >> >>>>>> expect
> >> >> >>>>>>> to
> >> >> >>>>>>>>>>> happen at runtime.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
> >> >> >>>>>>>>>>> <rmannibu...@gmail.com>
> >> >> >>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>>> @Eugene: yes and the other alternative of Reuven too but
> >> >> >>>>>>>>>>>> it
> >> >> >>>>>>>>>>>> is
> >> >> >>>>>> still
> >> >> >>>>>>>>>>>> 1. relying on timers, 2. not really checkpointed
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> In other words it seems all solutions are to create a
> >> >> >>>>>>>>>>>> chunk
> >> >> >>>>>>>>>>>> of
> >> >> >>>>>> size
> >> >> >>>>>>> 1
> >> >> >>>>>>>>>>>> and replayable to fake the lack of chunking in the
> >> >> >>>>>>>>>>>> framework.
> >> >> >>>>> This
> >> >> >>>>>>>>>>>> always implies a chunk handling outside the component
> >> >> >>>> (typically
> >> >> >>>>>>>>>>>> before for an output). My point is I think IO need it in
> >> >> >>>>>>>>>>>> their
> >> >> >>>>> own
> >> >> >>>>>>>>>>>> "internal" or at least control it themselves since the
> >> >> >>>>>>>>>>>> chunk
> >> >> >>>>> size
> >> >> >>>>>> is
> >> >> >>>>>>>>>>>> part of the IO handling most of the time.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> I think JB spoke of the same "group before" trick using
> >> >> >>>>>> restrictions
> >> >> >>>>>>>>>>>> which can work I have to admit if SDF are implemented by
> >> >> >>>>> runners.
> >> >> >>>>>> Is
> >> >> >>>>>>>>>>>> there a roadmap/status on that? Last time I checked SDF
> >> >> >>>>>>>>>>>> was a
> >> >> >>>>>> great
> >> >> >>>>>>>>>>>> API without support :(.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
> >> >> >>>>>>>>>>>> <kirpic...@google.com.invalid>:
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are
> >> >> >>>>>>>>>>>>> unrelated,
> >> >> >>>>> and
> >> >> >>>>>>> the
> >> >> >>>>>>>>>>>>> post
> >> >> >>>>>>>>>>>>> doesn't mention the word. Did you mean something else,
> >> >> >>>>>>>>>>>>> e.g.
> >> >> >>>>>>>>> restriction
> >> >> >>>>>>>>>>>>> perhaps? Either way I don't think SDFs are the solution
> >> >> >>>>>>>>>>>>> here;
> >> >> >>>>>> SDFs
> >> >> >>>>>>>>> have
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> to
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> do with the ability to split the processing of *a
> single
> >> >> >>>>> element*
> >> >> >>>>>>> over
> >> >> >>>>>>>>>>>>> multiple calls, whereas Romain I think is asking for
> >> >> >>>> repeatable
> >> >> >>>>>>>>> grouping
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> *multiple* elements.
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> Romain - does
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> do what
> >> >> >>>>>>>>>>>>> you want?
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
> >> >> >>>>>>>>> j...@nanthrax.net>
> >> >> >>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn,
> no
> >> >> >>>>>>>>>>>>>> ?
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> Regards
> >> >> >>>>>>>>>>>>>> JB
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> it gives the fn/transform the ability to save a state - it can get
> >> >> >>>>>>>>>>>>>>> back on "restart" / whatever unit we can use, probably runner
> >> >> >>>>>>>>>>>>>>> dependent? Without that you need to rewrite all IO usage with
> >> >> >>>>>>>>>>>>>>> something like the previous pattern which makes the IO not self
> >> >> >>>>>>>>>>>>>>> sufficient and kind of makes the entry cost and usage of beam way
> >> >> >>>>>>>>>>>>>>> further.
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> In my mind it is exactly what jbatch/spring-batch uses but adapted
> >> >> >>>>>>>>>>>>>>> to beam (stream in particular) case.
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Romain,
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are the semantics,
> >> >> >>>>>>>>>>>>>>>> what does it accomplish?
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Reuven
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Yes, what I proposed earlier was:
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> I. checkpoint marker:
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> @AnyBeamAnnotation
> >> >> >>>>>>>>>>>>>>>>> @CheckpointAfter
> >> >> >>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx);
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> III. (I like this one less)
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> // in the dofn
> >> >> >>>>>>>>>>>>>>>>> @CheckpointTester
> >> >> >>>>>>>>>>>>>>>>> public boolean shouldCheckpoint();
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> How would you define it (rough API is fine)? Without more details,
> >> >> >>>>>>>>>>>>>>>>>> it is not easy to see wider applicability and feasibility in
> >> >> >>>>>>>>>>>>>>>>>> runners.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> This is a fair summary of the current state but also where beam
> >> >> >>>>>>>>>>>>>>>>>>> can have a very strong added value and make big data great and
> >> >> >>>>>>>>>>>>>>>>>>> smooth.
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> Instead of this replay feature isn't checkpointing willable? In
> >> >> >>>>>>>>>>>>>>>>>>> particular with SDF no?
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> On Nov 16, 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Core issue here is that there is no explicit concept of
> >> >> >>>>>>>>>>>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
> >> >> >>>>>>>>>>>>>>>>>>>> 'getCheckpointMark' but that refers to the checkpoint on
> >> >> >>>>>>>>>>>>>>>>>>>> external source). Runners do checkpoint internally as
> >> >> >>>>>>>>>>>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely
> >> >> >>>>>>>>>>>>>>>>>>>> different from Dataflow's and Spark's.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a
> >> >> >>>>>>>>>>>>>>>>>>>> checkpoint by design.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> If you are looking to achieve some guarantees with a sink/DoFn,
> >> >> >>>>>>>>>>>>>>>>>>>> I think it is better to start with the requirements. I worked
> >> >> >>>>>>>>>>>>>>>>>>>> on exactly-once sink for Kafka (see KafkaIO.write().withEOS()),
> >> >> >>>>>>>>>>>>>>>>>>>> where we essentially reshard the elements and assign sequence
> >> >> >>>>>>>>>>>>>>>>>>>> numbers to elements within each shard. Duplicates in replays
> >> >> >>>>>>>>>>>>>>>>>>>> are avoided based on these sequence numbers. DoFn state API is
> >> >> >>>>>>>>>>>>>>>>>>>> used to buffer out-of-order replays. The implementation
> >> >> >>>>>>>>>>>>>>>>>>>> strategy works in Dataflow but not in Flink which has a
> >> >> >>>>>>>>>>>>>>>>>>>> horizontal checkpoint. KafkaIO checks for compatibility.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Hi guys,
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The subject is a bit provocative but the topic is real and
> >> >> >>>>>>>>>>>>>>>>>>>>> coming again and again with the beam usage: how a dofn can
> >> >> >>>>>>>>>>>>>>>>>>>>> handle some "chunking".
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The need is to be able to commit each N records but with N not
> >> >> >>>>>>>>>>>>>>>>>>>>> too big.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The natural API for that in beam is the bundle one but bundles
> >> >> >>>>>>>>>>>>>>>>>>>>> are not reliable since they can be very small (flink) - we can
> >> >> >>>>>>>>>>>>>>>>>>>>> say it is "ok" even if it has some perf impacts - or too big
> >> >> >>>>>>>>>>>>>>>>>>>>> (spark does full size / #workers).
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
> >> >> >>>>>>>>>>>>>>>>>>>>> does an eager flush. The issue is that then the checkpoint is
> >> >> >>>>>>>>>>>>>>>>>>>>> not respected and you can process multiple times the same
> >> >> >>>>>>>>>>>>>>>>>>>>> records.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Any plan to make this API reliable and controllable from a beam
> >> >> >>>>>>>>>>>>>>>>>>>>> point of view (at least in a max manner)?
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >> >> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> --
> >> >> >>>>>>>>>>>>>> Jean-Baptiste Onofré
> >> >> >>>>>>>>>>>>>> jbono...@apache.org
> >> >> >>>>>>>>>>>>>> http://blog.nanthrax.net
> >> >> >>>>>>>>>>>>>> Talend - http://www.talend.com
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> --
> >> >> >>>>>>>>>> Jean-Baptiste Onofré
> >> >> >>>>>>>>>> jbono...@apache.org
> >> >> >>>>>>>>>> http://blog.nanthrax.net
> >> >> >>>>>>>>>> Talend - http://www.talend.com
> >> >> >>>>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>>
> >> >> >>>>
> >> >>
> >> >> --
> >> >> Jean-Baptiste Onofré
> >> >> jbono...@apache.org
> >> >> http://blog.nanthrax.net
> >> >> Talend - http://www.talend.com
> >>
> >
>
>
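
For reference, the sequence-number idea from the quoted thread (number the
elements within each shard, skip replayed duplicates, buffer out-of-order
arrivals) can be sketched in plain Java with no Beam dependency. This is only
an illustration under my own assumptions, not how KafkaIO's withEOS() is
implemented: the class SequencedShardWriter, its methods, and the in-memory
list standing in for the external sink are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: per-shard sequence numbers used to drop replays. */
final class SequencedShardWriter {
    // Next sequence number each shard expects to commit.
    private final Map<Integer, Long> nextSeq = new HashMap<>();
    // Records that arrived ahead of their turn, keyed by sequence number.
    private final Map<Integer, TreeMap<Long, String>> pending = new HashMap<>();
    // Stand-in for the external sink (assumption: a real sink would be remote).
    private final List<String> committed = new ArrayList<>();

    void accept(int shard, long seq, String value) {
        long expected = nextSeq.getOrDefault(shard, 0L);
        if (seq < expected) {
            return; // already committed, so this is a replayed duplicate
        }
        TreeMap<Long, String> buffer =
            pending.computeIfAbsent(shard, k -> new TreeMap<>());
        // A replay of a record that is still buffered overwrites itself.
        buffer.put(seq, value);
        // Commit the contiguous run that starts at the expected number.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            committed.add(buffer.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(shard, expected);
    }

    List<String> committed() {
        return committed;
    }
}
```

Feeding one shard the sequence 0, 2, 0 (replay), 1, 2 (replay) commits each
record once, in order. In a real pipeline the two maps would have to live in
runner-managed state to survive retries, which is where the runner-specific
checkpoint model discussed in the thread comes in.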
