"First immediately blocking issue is how to batch records reliably and
*portably* (the biggest beam added-value IMHO).
Since bundles are "flush" trigger for most IO it means ensuring the
bundle size is somehow controllable or at least not set to a very
small value OOTB."

Please cite an existing IO that currently suffers from this issue: I'm not
aware of any.

On Thu, Nov 30, 2017, 9:46 AM Romain Manni-Bucau <[email protected]>
wrote:

> @Ben: would all IO be rewritten to use that and the bundle concept
> dropped from the API to avoid any ambiguity and misleading usage like
> in current IOs?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-30 18:43 GMT+01:00 Ben Chambers <[email protected]>:
> > Beam includes a GroupIntoBatches transform (see
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java)
> > which I believe was intended to be used as part of such a portable IO. It
> > can be used to request that elements are divided into batches of some
> > size, which can then be used for further processing.
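
To make the grouping behaviour concrete, here is a minimal plain-Java sketch of per-key batching. It is not Beam's GroupIntoBatches implementation; in Beam itself one would apply GroupIntoBatches.ofSize(...) to a keyed PCollection. The class name KeyedBatcherSketch and its methods are hypothetical, and String keys and values are an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key batching. This is NOT Beam's GroupIntoBatches
// implementation, only a sketch of the grouping behaviour it asks for.
class KeyedBatcherSketch {
    private final int batchSize;
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final List<List<String>> emitted = new ArrayList<>();

    KeyedBatcherSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer a value under its key; emit the batch once it reaches batchSize.
    void add(String key, String value) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Emit whatever is left, for example when the input is exhausted.
    void flushRemaining() {
        for (List<String> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                emitted.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    // Batches emitted so far, in emission order.
    List<List<String>> emitted() {
        return emitted;
    }
}
```

The point of the sketch is that the batch size is chosen by the code doing the write, not by whatever bundle size the runner happens to pick.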
> >
> > On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau <
> [email protected]>
> > wrote:
> >>
> >> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov <[email protected]>:
> >> > Very strong -1 from me:
> >> > - Having a pipeline-global parameter is bad because it will apply to
> >> > all transforms, with no ability to control it for individual
> >> > transforms. This can go especially poorly because it means that when
> >> > I write a transform, I don't know whether a user will set this
> >> > parameter in their pipeline to a value that's perhaps good for the
> >> > user's transform but really bad for my transform; and the user will
> >> > likely blame my transform for poor performance. A parameter like this
> >> > should be set on exactly the thing it applies to: e.g. on the
> >> > particular IO; and it should be set by the IO itself, not by a user
> >> > in pipeline options, because the IO author likely knows better than a
> >> > user what is a good value.
> >>
> >> This is true, and it is worse today: the user can't tune it and the
> >> IO doesn't handle it either. It is up to the runner, and none
> >> implements it in an IO-friendly way (check Flink and Spark, which do
> >> the exact opposite: bundle=1 vs bundle=dataset/partitions).
> >>
> >> Also note it is a "max" and not an exact value in the proposal.
> >>
> >> > - The parameter will not achieve what many IOs want, either. In some
> >> > cases, you want to limit the number of bytes you write. In some cases,
> >> > you want to limit the number of values within a key that you write. In
> >> > some cases, it's something else - it isn't always elements.
> >>
> >> Elements is the only thing users can really tune since you can't
> >> assume the content.
> >>
> >> > - The parameter will address none of the issues that I think you
> >> > raised in the thread above: it doesn't give deterministic replay,
> >> > nor any kind of fault tolerance.
> >>
> >> Right, it only partially solves the first issue popping up: the
> >> chunking. However I think it is a quick win.
> >>
> >> > - Having a parameter like this *at all* goes against Beam's "no knobs"
> >> > philosophy - for all the usual reasons: 1) it encourages users to
> >> > waste time looking in the wrong places when doing performance tuning:
> >> > tuning parameters is almost never the best way to improve
> >> > performance; 2) when users can set a tuning parameter, in my
> >> > experience it is almost always set wrong, or perhaps it was once set
> >> > right but then nobody updates it when the use case or implementation
> >> > changes; and we can end up in a situation where the pipeline is
> >> > performing poorly because of the parameter but the runner isn't
> >> > allowed to choose a better value. (In my experience with legacy data
> >> > processing systems in Google, like MapReduce, that support plenty of
> >> > tuning parameters, a very common piece of advice to someone
> >> > complaining about a poorly performing job is "have you tried removing
> >> > all your parameters?")
> >>
> >> I would be fine with that but what is the alternative?
> >>
> >> > - I still fail to understand the exact issue we're talking about, and
> >> > I've made a number of suggestions as to how this understanding could
> >> > be achieved: show code that demonstrates the issue; and show how the
> >> > code could be improved by a hypothetical API.
> >>
> >> First immediately blocking issue is how to batch records reliably and
> >> *portably* (the biggest beam added-value IMHO).
> >> Since bundles are "flush" trigger for most IO it means ensuring the
> >> bundle size is somehow controllable or at least not set to a very
> >> small value OOTB.
> >>
> >> An alternative to this proposal could be to let an IO give a hint about
> >> its desired bundle size. That would work as well for this particular
> >> issue. Does that sound better?
> >>
> >> >
> >> > On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré <[email protected]
> >
> >> > wrote:
> >> >>
> >> >> It sounds reasonable to me.
> >> >>
> >> >> And agree for Spark, I would like to merge Spark 2 update first.
> >> >>
> >> >> Regards
> >> >> JB
> >> >>
> >> >> On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
> >> >> > Guys,
> >> >> >
> >> >> > what about moving getMaxBundleSize from the Flink options to the
> >> >> > pipeline options? I think all runners can support it, right? The
> >> >> > Spark code probably needs the merge of the v2 before this can be
> >> >> > implemented, but I don't see any blocker.
> >> >> >
> >> >> > wdyt?
> >> >> >
> >> >> > Romain Manni-Bucau
> >> >> >
> >> >> >
> >> >> > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <
> [email protected]>:
> >> >> >> @Eugene: "workaround" because it is specific to the IO each time
> >> >> >> and therefore still highlights a gap in the core.
> >> >> >>
> >> >> >> Other comments inline
> >> >> >>
> >> >> >>
> >> >> >> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
> >> >> >> <[email protected]>:
> >> >> >>> There is a possible fourth issue that we don't handle well:
> >> >> >>> efficiency. For very large bundles, it may be advantageous to
> >> >> >>> avoid replaying a bunch of idempotent operations if there were a
> >> >> >>> way to record which ones we're sure went through. Not sure if
> >> >> >>> that's the issue here (though one could possibly do this with
> >> >> >>> SDFs: one can preemptively return periodically before an element
> >> >> >>> (or portion thereof) is done).
> >> >> >>
> >> >> >> +1, this also leads to the IO handling its own chunking/bundles
> >> >> >> and therefore solves all issues at once.
> >> >> >>
> >> >> >>>
> >> >> >>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
> >> >> >>> [email protected]> wrote:
> >> >> >>>
> >> >> >>>> I disagree that the usage of document id in ES is a
> >> >> >>>> "workaround" - it does not address any *accidental* complexity
> >> >> >>>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from
> >> >> >>>> shortcomings of Beam, it addresses the *essential* complexity
> >> >> >>>> of a distributed system: one has to take it as a fact of nature
> >> >> >>>> that the same write (mutation) will happen multiple times, so if
> >> >> >>>> you want a mutation to happen "as-if" it happened exactly once,
> >> >> >>>> the mutation itself must be idempotent
> >> >> >>>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id
> >> >> >>>> (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic
> >> >> >>>> example of an idempotent mutation, and it's very good that
> >> >> >>>> Elasticsearch provides it - if it didn't, no matter how good an
> >> >> >>>> API Beam had, achieving exactly-once writes would be
> >> >> >>>> theoretically impossible. Are we in agreement on this so far?
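
As an illustration of the idempotence argument above, here is a minimal sketch with a toy in-memory store. It assumes nothing about Elasticsearch or Beam; UpsertSketch, upsert and append are hypothetical names used only to contrast a write keyed by id with a plain append when the same write is replayed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory "store": a stand-in for an external system, not a real client.
class UpsertSketch {
    final Map<String, String> byId = new HashMap<>();  // idempotent target
    final List<String> appended = new ArrayList<>();   // non-idempotent target

    // Insert-with-id: writing the same (id, doc) again leaves the same state.
    void upsert(String id, String doc) {
        byId.put(id, doc);
    }

    // Plain append: every replay adds a duplicate.
    void append(String doc) {
        appended.add(doc);
    }
}
```

Replaying the same write three times, as a retried bundle might, leaves one document in byId but three entries in appended, which is why the id-based write tolerates retries and the append does not.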
> >> >> >>>>
> >> >> >>>> Next: you seem to be discussing 3 issues together, all of which
> >> >> >>>> are
> >> >> >>>> valid
> >> >> >>>> issues, but they seem unrelated to me:
> >> >> >>>> 1. Exactly-once mutation
> >> >> >>>> 2. Batching multiple mutations into one RPC.
> >> >> >>>> 3. Backpressure
> >> >> >>>>
> >> >> >>>> #1: was considered above. The system the IO is talking to has to
> >> >> >>>> support
> >> >> >>>> idempotent mutations, in an IO-specific way, and the IO has to
> >> >> >>>> take
> >> >> >>>> advantage of them, in the IO-specific way - end of story.
> >> >> >>
> >> >> >> Agree, but don't forget the original point was about "chunks" and
> >> >> >> not individual records.
> >> >> >>
> >> >> >>>>
> >> >> >>>> #2: a batch of idempotent operations is also idempotent, so this
> >> >> >>>> doesn't add anything new semantically. Syntactically - Beam
> >> >> >>>> already allows you to write your own batching by notifying you of
> >> >> >>>> permitted batch boundaries (Start/FinishBundle). Sure, it could do
> >> >> >>>> more, but from my experience the batching in IOs I've seen is one
> >> >> >>>> of the easiest and least error-prone parts, so I don't see
> >> >> >>>> something worth an extended discussion here.
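
For reference, the batching pattern being described can be sketched in plain Java as below. This is a model under stated assumptions, not code from any Beam IO: the three methods only mirror the start-bundle / process-element / finish-bundle callbacks of a DoFn, the "sink" is a list of flushed batches, and BundleBatchingSketch and maxBatchSize are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a batching writer. Nothing here depends on Beam.
class BundleBatchingSketch {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>();

    BundleBatchingSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Called when the runner starts a bundle: begin with an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Called per element: buffer it and flush once the cap is reached, so
    // the batch size does not depend on how large the bundle is.
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called when the runner ends the bundle: flush the remainder.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(buffer)); // stand-in for one bulk RPC
        buffer.clear();
    }
}
```

With a cap of 3 and a 7-element bundle this produces batches of 3, 3 and 1. A bundle of a single element still produces a batch of 1, which is the small-bundle case raised elsewhere in this thread.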
> >> >> >>
> >> >> >> "Beam already allows you to
> >> >> >>   write your own batching by notifying you of permitted batch
> >> >> >> boundaries
> >> >> >>   (Start/FinishBundle)"
> >> >> >>
> >> >> >> is wrong since the bundle is potentially the whole PCollection
> >> >> >> (Spark), so this is not even an option unless you use the SDF
> >> >> >> (back to the same point).
> >> >> >> Once again the API looks fine but no implementation makes it true.
> >> >> >> It would be easy to change in Spark; Flink can be OK since it
> >> >> >> targets the streaming case more. Not sure about the others, any
> >> >> >> idea?
> >> >> >>
> >> >> >>
> >> >> >>>>
> >> >> >>>> #3: handling backpressure is a complex problem with multiple
> >> >> >>>> facets:
> >> >> >>>> 1) how
> >> >> >>>> do you know you're being throttled, and by how much are you
> >> >> >>>> exceeding
> >> >> >>>> the
> >> >> >>>> external system's capacity?
> >> >> >>
> >> >> >> This is the whole point of backpressure: the system sends it back
> >> >> >> to you (through a header or a status, in general).
> >> >> >>
> >> >> >>>> 2) how do you communicate this signal to the
> >> >> >>>> runner?
> >> >> >>
> >> >> >> You are a client, so you get the metadata in the response,
> >> >> >> whatever the technology.
> >> >> >>
> >> >> >>>> 3) what does the runner do in response?
> >> >> >>
> >> >> >> The runner does nothing, but the IO adapts its handling as
> >> >> >> mentioned before (wait and retry, skip, ... depending on the config)
> >> >> >>
> >> >> >>>> 4) how do you wait until
> >> >> >>>> it's ok to try again?
> >> >> >>
> >> >> >> This is one point Beam could probably improve, but waiting during
> >> >> >> processing is an option if the source has some buffering;
> >> >> >> otherwise it requires a fallback buffer with a max size when the
> >> >> >> wait mode is activated.
> >> >> >>
> >> >> >>>> You seem to be advocating for solving one facet of this problem,
> >> >> >>>> which is: you want it to be possible to signal to the runner "I'm
> >> >> >>>> being throttled, please end the bundle", right? If so - I think
> >> >> >>>> this (ending the bundle) is unnecessary: the DoFn can simply do an
> >> >> >>>> exponential back-off sleep loop.
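
A minimal sketch of such an exponential back-off sleep loop, in plain Java. It is not taken from DatastoreIO or any other Beam IO; BackoffSketch, retryWithBackoff and the delay parameters are hypothetical, and the attempt is modelled as a BooleanSupplier that returns true on success.

```java
import java.util.function.BooleanSupplier;

class BackoffSketch {
    // Calls attempt until it returns true or maxAttempts is reached,
    // doubling the sleep between attempts (capped at maxDelayMillis).
    // Returns true on success, false if all attempts failed or the
    // thread was interrupted while sleeping.
    static boolean retryWithBackoff(BooleanSupplier attempt, int maxAttempts,
                                    long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
        return false;
    }
}
```

The loop stays entirely inside the element processing, so the runner does not need to know about throttling. Real implementations usually add random jitter to the delay; that is left out here to keep the sketch short.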
> >> >> >>
> >> >> >> Agree, I never said the runner should know, but GBK+output doesn't
> >> >> >> work because you don't own the GBK.
> >> >> >>
> >> >> >>>> This is e.g. what DatastoreIO does
> >> >> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
> >> >> >>>> and this is in general how most systems I've seen handle
> >> >> >>>> backpressure. Is there something I'm missing? In particular, is
> >> >> >>>> there any compelling reason why you think it'd be beneficial e.g.
> >> >> >>>> for DatastoreIO to commit the results of the bundle so far before
> >> >> >>>> processing other elements?
> >> >> >>
> >> >> >> It was more about ensuring you validate a subset of the whole
> >> >> >> bundle early and avoid reprocessing it if the bundle fails later.
> >> >> >>
> >> >> >>
> >> >> >> So to summarize I see 2 outcomes:
> >> >> >>
> >> >> >> 1. implement SDF in all runners
> >> >> >> 2. make the bundle size upper bounded - through a pipeline option -
> >> >> >> in all runners; not sure this one is doable everywhere since I
> >> >> >> mainly checked the Spark case
> >> >> >>
> >> >> >>>>
> >> >> >>>> Again, it might be that I'm still misunderstanding what you're
> >> >> >>>> trying
> >> >> >>>> to
> >> >> >>>> say. One of the things it would help to clarify would be -
> exactly
> >> >> >>>> what do
> >> >> >>>> you mean by "how batch frameworks solved that for years": can
> you
> >> >> >>>> point at
> >> >> >>>> an existing API in some other framework that achieves what you
> >> >> >>>> want?
> >> >> >>>>
> >> >> >>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
> >> >> >>>> <[email protected]>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Eugene, the point - and the issue with a single sample - is that
> >> >> >>>>> you can always find *workarounds* on a case by case basis, like
> >> >> >>>>> the id one with ES, but Beam doesn't solve the problem as a
> >> >> >>>>> framework.
> >> >> >>>>>
> >> >> >>>>> From my past experience, I clearly don't see how batch frameworks
> >> >> >>>>> solved that for years and Beam is not able to do it - keep in
> >> >> >>>>> mind it is the same kind of technology, it just uses different
> >> >> >>>>> sources and bigger clusters, so there is no real reason not to
> >> >> >>>>> have the same feature quality. The only potential reason I see is
> >> >> >>>>> that there is no end-to-end tracking of the state in the cluster.
> >> >> >>>>> But I don't see why there wouldn't be. Am I missing something
> >> >> >>>>> here?
> >> >> >>>>>
> >> >> >>>>> An example could be: take a GitHub crawler computing stats on
> >> >> >>>>> all GitHub repos, based on a REST client for example. You will
> >> >> >>>>> need to handle the rate limit and likely want to "commit" each
> >> >> >>>>> time you reach a rate limit, likely with some buffering strategy
> >> >> >>>>> with a max size before really waiting. How do you do it with a
> >> >> >>>>> GBK independent of your DoFn? You are not able to compose the
> >> >> >>>>> fns correctly :(.
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> Le 18 nov. 2017 20:48, "Eugene Kirpichov"
> >> >> >>>>> <[email protected]>
> >> >> >>>> a
> >> >> >>>>> écrit :
> >> >> >>>>>
> >> >> >>>>> After giving this thread my best attempt at understanding exactly
> >> >> >>>>> what is the problem and the proposed solution, I'm afraid I still
> >> >> >>>>> fail to understand both. To reiterate, I think the only way to
> >> >> >>>>> make progress here is to be more concrete: (quote) take some IO
> >> >> >>>>> that you think could be easier to write with your proposed API,
> >> >> >>>>> give the contents of a hypothetical PCollection being written to
> >> >> >>>>> this IO, give the code of a hypothetical DoFn implementing the
> >> >> >>>>> write using your API, and explain what you'd expect to happen at
> >> >> >>>>> runtime. I'm going to re-engage in this thread after such an
> >> >> >>>>> example is given.
> >> >> >>>>>
> >> >> >>>>> On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
> >> >> >>>>> <[email protected]>
> >> >> >>>>> wrote:
> >> >> >>>>>
> >> >> >>>>>> First, bundle retry is unusable with some runners like Spark,
> >> >> >>>>>> where the bundle size is the collection size / number of
> >> >> >>>>>> workers. This means a user can't use the bundle API or feature
> >> >> >>>>>> reliably and portably - which is Beam's promise. Aligning
> >> >> >>>>>> chunking and bundles would guarantee that, but it may not be
> >> >> >>>>>> desired, which is why I thought it could be another feature.
> >> >> >>>>>>
> >> >> >>>>>> GBK works until the IO knows about that, and both concepts are
> >> >> >>>>>> not always orthogonal - backpressure-like systems are a trivial
> >> >> >>>>>> common example. This means the IO (DoFn) must be able to do it
> >> >> >>>>>> itself at some point.
> >> >> >>>>>>
> >> >> >>>>>> Also note the GBK works only if the IO can take a list, which is
> >> >> >>>>>> never the case today.
> >> >> >>>>>>
> >> >> >>>>>> Big questions for me are: is SDF the way to go, since it provides
> >> >> >>>>>> the needed API but is not yet supported? What about existing IOs?
> >> >> >>>>>> Should Beam provide an auto-wrapping of DoFns for that
> >> >> >>>>>> pre-aggregated support and simulate bundles to the actual IO impl
> >> >> >>>>>> to keep the existing API?
> >> >> >>>>>>
> >> >> >>>>>>
> >> >> >>>>>> Le 17 nov. 2017 19:20, "Raghu Angadi"
> >> >> >>>>>> <[email protected]>
> >> >> >>>>>> a
> >> >> >>>>>> écrit :
> >> >> >>>>>>
> >> >> >>>>>> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
> >> >> >>>>> [email protected]
> >> >> >>>>>>>
> >> >> >>>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> Yep, just take ES IO, if a part of a bundle fails you are in an
> >> >> >>>>>>> unmanaged state. This is the case for all O (of IO ;)). Issue is
> >> >> >>>>>>> not much about "1" (the code it takes) but more the fact it
> >> >> >>>>>>> doesn't integrate with runner features and retries potentially:
> >> >> >>>>>>> what happens if a bundle has a failure? => undefined today. 2.
> >> >> >>>>>>> I'm fine with it while we know exactly what happens when we
> >> >> >>>>>>> restart after a bundle failure. With ES the timestamp can be used
> >> >> >>>>>>> for instance.
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>>> This deterministic batching can be achieved even now with an
> >> >> >>>>>> extra GroupByKey (and if you want ordering on top of that, you
> >> >> >>>>>> will need another GBK). Don't know if that is too costly in your
> >> >> >>>>>> case. I would need a bit more detail on handling ES IO write
> >> >> >>>>>> retries to see if it could be simplified. Note that retries
> >> >> >>>>>> occur with or without any failures in your DoFn.
> >> >> >>>>>>
> >> >> >>>>>> The biggest negative with the GBK approach is that it doesn't
> >> >> >>>>>> provide the same guarantees on Flink.
> >> >> >>>>>>
> >> >> >>>>>> I don't see how GroupIntoBatches in Beam provides specific
> >> >> >>>>>> guarantees on deterministic batches.
> >> >> >>>>>>
> >> >> >>>>>>> Thinking about it, the SDF is really a way to do it since the
> >> >> >>>>>>> SDF will manage the bulking and, associated with the runner
> >> >> >>>>>>> "retry", it seems to cover the needs.
> >> >> >>>>>>>
> >> >> >>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
> >> >> >>>>> <[email protected]
> >> >> >>>>>>> :
> >> >> >>>>>>>> I must admit I'm still failing to understand the problem, so
> >> >> >>>>>>>> let's step back even further.
> >> >> >>>>>>>>
> >> >> >>>>>>>> Could you give an example of an IO that is currently difficult
> >> >> >>>>>>>> to implement specifically because of lack of the feature you're
> >> >> >>>>>>>> talking about?
> >> >> >>>>>>>>
> >> >> >>>>>>>> I'm asking because I've reviewed almost all Beam IOs and don't
> >> >> >>>>>>>> recall seeing a similar problem. Sure, a lot of IOs do batching
> >> >> >>>>>>>> within a bundle, but 1) it doesn't take up much code (granted,
> >> >> >>>>>>>> it would be even easier if Beam did it for us) and 2) I don't
> >> >> >>>>>>>> remember any of them requiring the batches to be deterministic,
> >> >> >>>>>>>> and I'm having a hard time imagining what kind of storage
> >> >> >>>>>>>> system would be able to deduplicate if batches were
> >> >> >>>>>>>> deterministic but wouldn't be able to deduplicate if they
> >> >> >>>>>>>> weren't.
> >> >> >>>>>>>>
> >> >> >>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
> >> >> >>>>>>> [email protected]>
> >> >> >>>>>>>> wrote:
> >> >> >>>>>>>>
> >> >> >>>>>>>>> Ok, let me try to step back and summarize what we have today
> >> >> >>>>>>>>> and what I miss:
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> 1. we can handle chunking in Beam through group into batches
> >> >> >>>>>>>>> (or equivalent) but:
> >> >> >>>>>>>>>     > it is not built into the transforms (IO) and it is
> >> >> >>>>>>>>> controlled from outside the transforms, so there is no way for
> >> >> >>>>>>>>> a transform to do it properly without itself handling a
> >> >> >>>>>>>>> composition and links between multiple DoFns to get
> >> >> >>>>>>>>> notifications and potentially react properly or handle
> >> >> >>>>>>>>> backpressure from its backend
> >> >> >>>>>>>>> 2. there is no restart feature because there is no real state
> >> >> >>>>>>>>> handling at the moment. This sounds fully delegated to the
> >> >> >>>>>>>>> runner, but I was hoping to have more guarantees from the API
> >> >> >>>>>>>>> used, to be able to restart a pipeline (mainly batch, since it
> >> >> >>>>>>>>> can be irrelevant or delegated to the backend for streams) and
> >> >> >>>>>>>>> handle only the uncommitted records, so it requires some
> >> >> >>>>>>>>> persistence outside the main IO storages to do it properly
> >> >> >>>>>>>>>     > note this is somewhat similar to the monitoring topic,
> >> >> >>>>>>>>> which lacks persistence ATM, so Beam can end up having a
> >> >> >>>>>>>>> pluggable storage for a few concerns
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Short term I would be happy with 1 solved properly; long term
> >> >> >>>>>>>>> I hope 2 will be tackled without workarounds requiring custom
> >> >> >>>>>>>>> wrapping of IO to use a custom state persistence.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
> >> >> >>>>>>>>> <[email protected]>:
> >> >> >>>>>>>>>> Thanks for the explanation. Agree, we might talk about
> >> >> >>>>>>>>>> different things using the same wording.
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> I'm also struggling to understand the use case (for a generic
> >> >> >>>>>>>>>> DoFn).
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Regards
> >> >> >>>>>>>>>> JB
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> To avoid spending a lot of time pursuing a false path, I'd
> >> >> >>>>>>>>>>> like to say straight up that SDF is definitely not going to
> >> >> >>>>>>>>>>> help here, despite the fact that its API includes the term
> >> >> >>>>>>>>>>> "checkpoint". In SDF, the "checkpoint" captures the state of
> >> >> >>>>>>>>>>> processing within a single element. If you're applying an SDF
> >> >> >>>>>>>>>>> to 1000 elements, it will, like any other DoFn, be applied to
> >> >> >>>>>>>>>>> each of them independently and in parallel, and you'll have
> >> >> >>>>>>>>>>> 1000 checkpoints capturing the state of processing each of
> >> >> >>>>>>>>>>> these elements, which is probably not what you want.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> I'm afraid I still don't understand what kind of checkpoint
> >> >> >>>>>>>>>>> you need, if it is not just deterministic grouping into
> >> >> >>>>>>>>>>> batches. "Checkpoint" is a very broad term and it's very
> >> >> >>>>>>>>>>> possible that everybody in this thread is talking about
> >> >> >>>>>>>>>>> different things when saying it. So it would help if you could
> >> >> >>>>>>>>>>> give a more concrete example: for example, take some IO that
> >> >> >>>>>>>>>>> you think could be easier to write with your proposed API,
> >> >> >>>>>>>>>>> give the contents of a hypothetical PCollection being written
> >> >> >>>>>>>>>>> to this IO, give the code of a hypothetical DoFn implementing
> >> >> >>>>>>>>>>> the write using your API, and explain what you'd expect to
> >> >> >>>>>>>>>>> happen at runtime.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
> >> >> >>>>>>>>>>> <[email protected]>
> >> >> >>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>>> @Eugene: yes, and the other alternative from Reuven too, but
> >> >> >>>>>>>>>>>> it is still 1. relying on timers, 2. not really checkpointed
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> In other words, it seems all solutions create a replayable
> >> >> >>>>>>>>>>>> chunk of size 1 to fake the lack of chunking in the
> >> >> >>>>>>>>>>>> framework. This always implies chunk handling outside the
> >> >> >>>>>>>>>>>> component (typically before it, for an output). My point is
> >> >> >>>>>>>>>>>> that I think IOs need it internally, or at least need to
> >> >> >>>>>>>>>>>> control it themselves, since the chunk size is part of the IO
> >> >> >>>>>>>>>>>> handling most of the time.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> I think JB spoke of the same "group before" trick using
> >> >> >>>>>>>>>>>> restrictions, which can work, I have to admit, if SDFs are
> >> >> >>>>>>>>>>>> implemented by runners. Is there a roadmap/status on that?
> >> >> >>>>>>>>>>>> Last time I checked, SDF was a great API without support :(.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
> >> >> >>>>>>>>>>>> <[email protected]>:
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are unrelated,
> >> >> >>>>>>>>>>>>> and the post doesn't mention the word. Did you mean
> >> >> >>>>>>>>>>>>> something else, e.g. restriction perhaps? Either way I
> >> >> >>>>>>>>>>>>> don't think SDFs are the solution here; SDFs have to do
> >> >> >>>>>>>>>>>>> with the ability to split the processing of *a single
> >> >> >>>>>>>>>>>>> element* over multiple calls, whereas Romain I think is
> >> >> >>>>>>>>>>>>> asking for repeatable grouping of *multiple* elements.
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> Romain - does
> >> >> >>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> >> >> >>>>>>>>>>>>> do what you want?
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
> >> >> >>>>>>>>> [email protected]>
> >> >> >>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> Regards
> >> >> >>>>>>>>>>>>>> JB
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> it gives the fn/transform the ability to save a state -
> >> >> >>>>>>>>>>>>>>> which it can get back on "restart" / whatever unit we can
> >> >> >>>>>>>>>>>>>>> use, probably runner dependent? Without that you need to
> >> >> >>>>>>>>>>>>>>> rewrite all IO usage with something like the previous
> >> >> >>>>>>>>>>>>>>> pattern, which makes the IO not self-sufficient and kind
> >> >> >>>>>>>>>>>>>>> of pushes the entry cost and usage of Beam way further.
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> In my mind it is exactly what jbatch/spring-batch uses,
> >> >> >>>>>>>>>>>>>>> but adapted to the Beam (stream in particular) case.
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
> >> >> >>>>> <[email protected]
> >> >> >>>>>>> :
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Romain,
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are
> >> >> >>>>>>>>>>>>>>>> the
> >> >> >>>>>>> semantics,
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> what
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> does it accomplish?
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Reuven
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau
> <
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> [email protected]>
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Yes, what I proposed earlier was:
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> I. checkpoint marker:
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> @AnyBeamAnnotation
> >> >> >>>>>>>>>>>>>>>>> @CheckpointAfter
> >> >> >>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx);
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new
> >> >> >>>>>>>>> MyFn()).withCheckpointAlgorithm(new
> >> >> >>>>>>>>>>>>>>>>> CountingAlgo()))
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> III. (I like this one less)
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> // in the dofn
> >> >> >>>>>>>>>>>>>>>>> @CheckpointTester
> >> >> >>>>>>>>>>>>>>>>> public boolean shouldCheckpoint();
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>> dofn
> >> >> >>>>>> per
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> element
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
> >> >> >>>>>>> <[email protected]
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> :
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> How would you define it (rough API is fine)?.
> >> >> >>>>>>>>>>>>>>>>>> Without
> >> >> >>>> more
> >> >> >>>>>>>>> details,
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> it is
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> not easy to see wider applicability and
> feasibility
> >> >> >>>>>>>>>>>>>>>>>> in
> >> >> >>>>>>> runners.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain
> Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>>> <
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> [email protected]>
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> This is a fair summary of the current state but
> >> >> >>>>>>>>>>>>>>>>>>> also
> >> >> >>>>> where
> >> >> >>>>>>> beam
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> can
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> have a
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> very strong added value and make big data great
> and
> >> >> >>>>> smooth.
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> Instead of this replay feature isn't checkpointing
> >> >> >>>>> willable?
> >> >> >>>>>>> In
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> particular
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> with SDF no?
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> On Nov 16, 2017 at 19:50, "Raghu Angadi" <[email protected]>
> >> >> >>>>>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Core issue here is that there is no explicit
> >> >> >>>>>>>>>>>>>>>>>>>> concept
> >> >> >>>> of
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> 'checkpoint'
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> in
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Beam (UnboundedSource has a method
> >> >> >>>>>>>>>>>>>>>>>>>> 'getCheckpointMark'
> >> >> >>>>> but
> >> >> >>>>>>> that
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> refers to
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> the checkpoint on external source). Runners do
> >> >> >>>> checkpoint
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> internally
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> as
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> implementation detail. Flink's checkpoint model
> is
> >> >> >>>>>> entirely
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> different
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> from
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Dataflow's and Spark's.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly
> >> >> >>>>>>>>>>>>>>>>>>>> talk
> >> >> >>>>> about
> >> >> >>>>>> a
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> checkpoint
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> by
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> design.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> If you are looking to achieve some guarantees
> with
> >> >> >>>>>>>>>>>>>>>>>>>> a
> >> >> >>>>>>>>> sink/DoFn, I
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> think
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> it
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> is better to start with the requirements. I
> worked
> >> >> >>>>>>>>>>>>>>>>>>>> on
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> exactly-once
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> sink
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we
> >> >> >>>>>> essentially
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> reshard
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> elements and assign sequence numbers to elements within
> >> >> >>>>>>> each
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> shard.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Duplicates in replays are avoided based on these
> >> >> >>>>> sequence
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> numbers.
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> DoFn
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> state API is used to buffer out-of-order
> replays.
> >> >> >>>>>>>>>>>>>>>>>>>> The
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> implementation
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> strategy works in Dataflow but not in Flink
> which
> >> >> >>>>>>>>>>>>>>>>>>>> has
> >> >> >>>> a
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> horizontal
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> checkpoint. KafkaIO checks for compatibility.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain
> >> >> >>>>>>>>>>>>>>>>>>>> Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>>>>> <
> >> >> >>>>>>>>>>>>>>>>>>>> [email protected]>
> >> >>
> >> >> >>>>>>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Hi guys,
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The subject is a bit provocative but the topic
> is
> >> >> >>>> real
> >> >> >>>>>> and
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> coming
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> again and again with the beam usage: how a dofn
> >> >> >>>>>>>>>>>>>>>>>>>>> can
> >> >> >>>>>> handle
> >> >> >>>>>>>>> some
> >> >> >>>>>>>>>>>>>>>>>>>>> "chunking".
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The need is to be able to commit each N records
> >> >> >>>>>>>>>>>>>>>>>>>>> but
> >> >> >>>>> with
> >> >> >>>>>> N
> >> >> >>>>>>> not
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> too
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> big.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The natural API for that in beam is the bundle
> >> >> >>>>>>>>>>>>>>>>>>>>> one
> >> >> >>>> but
> >> >> >>>>>>> bundles
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> are
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> not
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> reliable since they can be very small (flink) -
> >> >> >>>>>>>>>>>>>>>>>>>>> we
> >> >> >>>> can
> >> >> >>>>>> say
> >> >> >>>>>>> it
> >> >> >>>>>>>>> is
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> "ok"
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> even if it has some perf impacts - or too big
> >> >> >>>>>>>>>>>>>>>>>>>>> (spark
> >> >> >>>>> does
> >> >> >>>>>>> full
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> size
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> /
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> #workers).
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a
> >> >> >>>> maxSize
> >> >> >>>>>>> which
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> does
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> an
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> eager flush. The issue is that then the
> >> >> >>>>>>>>>>>>>>>>>>>>> checkpoint
> >> >> >>>>>>>>>>>>>>>>>>>>> is
> >> >> >>>>> not
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> respected
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> and you can process multiple times the same
> >> >> >>>>>>>>>>>>>>>>>>>>> records.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Any plan to make this API reliable and
> >> >> >>>>>>>>>>>>>>>>>>>>> controllable
> >> >> >>>>> from
> >> >> >>>>>> a
> >> >> >>>>>>>>> beam
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> point
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> of view (at least in a max manner)?
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >> >> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >> >> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github |
> >> >> >>>>>>>>>>>>>>>>>>>>> LinkedIn
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> --
> >> >> >>>>>>>>>>>>>> Jean-Baptiste Onofré
> >> >> >>>>>>>>>>>>>> [email protected]
> >> >> >>>>>>>>>>>>>> http://blog.nanthrax.net
> >> >> >>>>>>>>>>>>>> Talend - http://www.talend.com
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> --
> >> >> >>>>>>>>>> Jean-Baptiste Onofré
> >> >> >>>>>>>>>> [email protected]
> >> >> >>>>>>>>>> http://blog.nanthrax.net
> >> >> >>>>>>>>>> Talend - http://www.talend.com
> >> >> >>>>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>>
> >> >> >>>>
> >> >>
> >> >> --
> >> >> Jean-Baptiste Onofré
> >> >> [email protected]
> >> >> http://blog.nanthrax.net
> >> >> Talend - http://www.talend.com
> >>
> >
>
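
The exactly-once approach mentioned in the thread (assign sequence numbers to elements within each shard, drop replayed duplicates by sequence number, and buffer out-of-order arrivals) can be illustrated with a small sketch. This is not the KafkaIO implementation: the class name SequencedShardWriter and its methods are hypothetical, the state lives in plain in-memory fields rather than the Beam state API, and one instance is assumed to handle exactly one shard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical single-shard writer: dedups replays and restores order by sequence number. */
final class SequencedShardWriter<T> {
    private long nextSeq = 0;                                  // next sequence number to write
    private final TreeMap<Long, T> pending = new TreeMap<>();  // out-of-order arrivals, by sequence
    private final List<T> written = new ArrayList<>();         // stands in for the external sink

    /** Accepts one element with its per-shard sequence number (assumed to start at 0). */
    void receive(long seq, T value) {
        if (seq < nextSeq) {
            return; // already written once: a replayed duplicate, so drop it
        }
        pending.putIfAbsent(seq, value); // keep the first copy of a not-yet-written element
        // Write every element that is now contiguous with what was already written.
        while (!pending.isEmpty() && pending.firstKey() == nextSeq) {
            written.add(pending.pollFirstEntry().getValue());
            nextSeq++;
        }
    }

    /** Elements written so far, in sequence order. */
    List<T> written() {
        return written;
    }
}
```

In this sketch a replay of sequence 0 after it has been written is ignored, and an element that arrives ahead of its predecessor waits in the pending map until the gap is filled.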
>
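
The workaround described in the first message of the thread (buffer records, flush eagerly once a maximum size is reached, and flush the remainder when the bundle ends) might look roughly like the sketch below. It deliberately avoids the Beam API so that it stays self-contained: BatchingWriter, process and finishBundle are hypothetical names that only mimic the per-element and end-of-bundle hooks of a DoFn, and the Consumer stands in for the external bulk write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper, not a Beam class: flushes every maxSize records and at bundle end. */
final class BatchingWriter<T> {
    private final int maxSize;
    private final Consumer<List<T>> sink;              // stands in for the external bulk write
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int maxSize, Consumer<List<T>> sink) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.sink = sink;
    }

    /** Called once per record, like a per-element processing hook. */
    void process(T record) {
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush(); // eager flush: happens before the bundle has finished
        }
    }

    /** Called when the bundle ends, to write whatever is still buffered. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // hand over a copy so the buffer can be reused
        buffer.clear();
    }
}
```

The sketch also shows where the concern raised in the thread comes from: if a bundle fails and is retried after an eager flush, the records that were already flushed are written a second time unless the sink is idempotent or deduplicates.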
