I mean: if these runners have some limitation that forces them into only
supporting tiny bundles, there's a good chance that this limitation will
also apply to whatever beam model API you propose as a fix, and they won't
be able to implement it.

On Thu, Nov 30, 2017, 11:19 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> So is your main concern potential poor performance on runners that choose
> to use a very small bundle size? (Currently an IO can trivially handle too
> large bundles, simply by flushing when enough data accumulates, which is
> what all IOs do - but indeed working around having unreasonably small
> bundles is much harder)
>
> If so, I think, rather than making a model change, we should understand
> why those runners are choosing such a small bundle size, and potentially
> fix them.
>
> On Thu, Nov 30, 2017, 11:01 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>>
>>
>> Le 30 nov. 2017 19:23, "Kenneth Knowles" <k...@google.com> a écrit :
>>
>> On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Hmm,
>>>
>>> ESIO:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
>>> JDBCIO:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
>>> MongoIO:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
>>> etc...
>>>
>>> They all use the same pattern.
>>>
>>
>> This is actually correct - if you have some triggers set up to yield some
>> low latency, then things need to actually be flushed here. If the runner
>> provides a one-element bundle it could be because data volume has dipped.
>> In this case, you pay per element instead of getting good amortization, but
>> since data volume is low this is not so bad and anyhow the only way to
>> yield the desired latency.
>>
>> Romain - just to echo some others, did you have a particular combination
>> of runner + IO that you wanted to target for improvement? That would focus
>> the discussion and we could think about what to change in the runner or IO
>> or discover an issue that they cannot solve.
>>
>>
>> I want to ensure EsIO will never do a flush of 1 element on any runner
>> without a timertrigger - assuming data volume is continuous and with a size
>> > 1.
>>
>> Really rephrased, my concern is that bundle which is a great
>> infra/environment feedback is today owned by beam code which defeats that
>> great purpose since beam doesnt use the impl for that. This notion should
>> be unified (size + timeout are often the defaults to trigger an "end" and
>> would work) accross runners or it should be hidden from the transform
>> developers IMHO.
>>
>> Note the low latency point is not linked to bundle size but, as you
>> mentionned, triggers (timeout or event based) which means both worlds can
>> work together in harmony and outcome to a valid bundle api (without any
>> change, yeah) and exposure to the user.
>>
>>
>>
>> Kenn
>>
>>
>>
>>>
>>> From what you wrote - and technically I agree but in current state my
>>> point is valid I think - you should drop bundle from the whole user
>>> API and make it all @Internal, no?
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>
>>>
>>> 2017-11-30 18:58 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>> > Agree, but maybe we can inform the runner if wanted no ?
>>> >
>>> > Honestly, from my side, I'm fine with the current situation as it's
>>> runner
>>> > specific.
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 11/30/2017 06:12 PM, Reuven Lax wrote:
>>> >>
>>> >> I don't think it belongs in PIpelineOptions, as bundle size is always
>>> a
>>> >> runner thing.
>>> >>
>>> >> We could consider adding a new generic RunnerOptions, however I'm not
>>> >> convinced all runners can actually support this.
>>> >>
>>> >> On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com
>>> >> <mailto:rmannibu...@gmail.com>> wrote:
>>> >>
>>> >>     Guys,
>>> >>
>>> >>     what about moving getMaxBundleSize from flink options to pipeline
>>> >>     options. I think all runners can support it right? Spark code
>>> needs
>>> >>     the merge of the v2 before being able to be implemented probably
>>> but I
>>> >>     don't see any blocker.
>>> >>
>>> >>     wdyt?
>>> >>
>>> >>     Romain Manni-Bucau
>>> >>     @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>> >>
>>> >>
>>> >>     2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <
>>> rmannibu...@gmail.com
>>> >>     <mailto:rmannibu...@gmail.com>>:
>>> >>
>>> >>      > @Eugene: "workaround" as specific to the IO each time and
>>> therefore
>>> >>      > still highlight a lack in the core.
>>> >>      >
>>> >>      > Other comments inline
>>> >>      >
>>> >>      >
>>> >>      > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
>>> >> <rober...@google.com.invalid>:
>>> >>      >> There is a possible fourth issue that we don't handle well:
>>> >> efficiency. For
>>> >>      >> very large bundles, it may be advantageous to avoid replaying
>>> a
>>> >> bunch of
>>> >>      >> idempotent operations if there were a way to record what ones
>>> >> we're sure
>>> >>      >> went through. Not sure if that's the issue here (though one
>>> could
>>> >> possibly
>>> >>      >> do this with SDFs, one can preemptively returning periodically
>>> >> before an
>>> >>      >> element (or portion thereof) is done).
>>> >>      >
>>> >>      > +1, also lead to the IO handling its own chunking/bundles and
>>> >>      > therefore solves all issues at once.
>>> >>      >
>>> >>      >>
>>> >>      >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>>> >>      >> kirpic...@google.com.invalid> wrote:
>>> >>      >>
>>> >>      >>> I disagree that the usage of document id in ES is a
>>> "workaround"
>>> >> - it does
>>> >>      >>> not address any *accidental *complexity
>>> >>      >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet
>>> >>     <https://en.wikipedia.org/wiki/No_Silver_Bullet>> coming from
>>> >> shortcomings
>>> >>      >>> of Beam, it addresses the *essential* complexity that a
>>> >> distributed system
>>> >>      >>> forces one to take it as a fact of nature that the same write
>>> >>      >>> (mutation) will happen multiple times, so if you want a
>>> mutation
>>> >> to happen
>>> >>      >>> "as-if" it happened exactly once, the mutation itself must be
>>> >> idempotent
>>> >>      >>> <https://en.wikipedia.org/wiki/Idempotence
>>> >>     <https://en.wikipedia.org/wiki/Idempotence>>. Insert-with-id
>>> (upsert
>>> >>      >>> <https://en.wikipedia.org/wiki/Merge_(SQL)
>>> >>     <https://en.wikipedia.org/wiki/Merge_(SQL)>>) is a classic
>>> example of
>>> >> an
>>> >>      >>> idempotent mutation, and it's very good that Elasticsearch
>>> >> provides it - if
>>> >>      >>> it didn't, no matter how good of an API Beam had, achieving
>>> >> exactly-once
>>> >>      >>> writes would be theoretically impossible. Are we in
>>> agreement on
>>> >> this so
>>> >>      >>> far?
>>> >>      >>>
>>> >>      >>> Next: you seem to be discussing 3 issues together, all of
>>> which
>>> >> are valid
>>> >>      >>> issues, but they seem unrelated to me:
>>> >>      >>> 1. Exactly-once mutation
>>> >>      >>> 2. Batching multiple mutations into one RPC.
>>> >>      >>> 3. Backpressure
>>> >>      >>>
>>> >>      >>> #1: was considered above. The system the IO is talking to
>>> has to
>>> >> support
>>> >>      >>> idempotent mutations, in an IO-specific way, and the IO has
>>> to
>>> >> take
>>> >>      >>> advantage of them, in the IO-specific way - end of story.
>>> >>      >
>>> >>      > Agree but don't forget the original point was about "chunks"
>>> and
>>> >> not
>>> >>      > individual records.
>>> >>      >
>>> >>      >>>
>>> >>      >>> #2: a batch of idempotent operations is also idempotent, so
>>> this
>>> >> doesn't
>>> >>      >>> add anything new semantically. Syntactically - Beam already
>>> >> allows you to
>>> >>      >>> write your own batching by notifying you of permitted batch
>>> >> boundaries
>>> >>      >>> (Start/FinishBundle). Sure, it could do more, but from my
>>> >> experience the
>>> >>      >>> batching in IOs I've seen is one of the easiest and least
>>> >> error-prone
>>> >>      >>> parts, so I don't see something worth an extended discussion
>>> >> here.
>>> >>      >
>>> >>      > "Beam already allows you to
>>> >>      >  write your own batching by notifying you of permitted batch
>>> >> boundaries
>>> >>      >  (Start/FinishBundle)"
>>> >>      >
>>> >>      > Is wrong since the bundle is potentially the whole PCollection
>>> >> (spark)
>>> >>      > so this is not even an option until you use the SDF (back to
>>> the
>>> >> same
>>> >>      > point).
>>> >>      > Once again the API looks fine but no implementation makes it
>>> true.
>>> >> It
>>> >>      > would be easy to change it in spark, flink can be ok since it
>>> >> targets
>>> >>      > more the streaming case, not sure of others, any idea?
>>> >>      >
>>> >>      >
>>> >>      >>>
>>> >>      >>> #3: handling backpressure is a complex problem with multiple
>>> >> facets: 1) how
>>> >>      >>> do you know you're being throttled, and by how much are you
>>> >> exceeding the
>>> >>      >>> external system's capacity?
>>> >>      >
>>> >>      > This is the whole point of backpressure, the system sends it
>>> back
>>> >> to
>>> >>      > you (header like or status technic in general)
>>> >>      >
>>> >>      >>> 2) how do you communicate this signal to the
>>> >>      >>> runner?
>>> >>      >
>>> >>      > You are a client so you get the meta in the response - whatever
>>> >> techno.
>>> >>      >
>>> >>      >>> 3) what does the runner do in response?
>>> >>      >
>>> >>      > Runner nothing but the IO adapts its handling as mentionned
>>> before
>>> >>      > (wait and retry, skip, ... depending the config)
>>> >>      >
>>> >>      >>> 4) how do you wait until
>>> >>      >>> it's ok to try again?
>>> >>      >
>>> >>      > This is one point to probably enhance in beam but waiting in
>>> the
>>> >>      > processing is an option if the source has some buffering
>>> otherwise
>>> >> it
>>> >>      > requires to have a buffer fallback and max size if the wait
>>> mode is
>>> >>      > activated.
>>> >>      >
>>> >>      >>> You seem to be advocating for solving one facet of this
>>> problem,
>>> >> which is:
>>> >>      >>> you want it to be possible to signal to the runner "I'm being
>>> >> throttled,
>>> >>      >>> please end the bundle", right? If so - I think this (ending
>>> the
>>> >> bundle) is
>>> >>      >>> unnecessary: the DoFn can simply do an exponential back-off
>>> sleep
>>> >> loop.
>>> >>      >
>>> >>      > Agree, never said the runner should know but GBK+output doesnt
>>> work
>>> >>      > cause you dont own the GBK.
>>> >>      >
>>> >>      >>> This is e.g. what DatastoreIO does
>>> >>      >>> <https://github.com/apache/beam/blob/master/sdks/java/io/
>>> >>     <https://github.com/apache/beam/blob/master/sdks/java/io/>
>>> >>      >>> google-cloud-platform/src/main/java/org/apache/beam/sdk/
>>> >>      >>> io/gcp/datastore/DatastoreV1.java#L1318>
>>> >>      >>> and
>>> >>      >>> this is in general how most systems I've seen handle
>>> >> backpressure. Is there
>>> >>      >>> something I'm missing? In particular, is there any compelling
>>> >> reason why
>>> >>      >>> you think it'd be beneficial e.g. for DatastoreIO to commit
>>> the
>>> >> results of
>>> >>      >>> the bundle so far before processing other elements?
>>> >>      >
>>> >>      > It was more about ensuring you validate early a subset of the
>>> whole
>>> >>      > bundle and avoid to reprocess it if it fails later.
>>> >>      >
>>> >>      >
>>> >>      > So to summarize I see 2 outcomes:
>>> >>      >
>>> >>      > 1. impl SDF in all runners
>>> >>      > 2. make the bundle size upper bounded - through a pipeline
>>> option -
>>> >> in
>>> >>      > all runners, not sure this one is doable everywhere since I
>>> mainly
>>> >>      > checked spark case
>>> >>      >
>>> >>      >>>
>>> >>      >>> Again, it might be that I'm still misunderstanding what
>>> you're
>>> >> trying to
>>> >>      >>> say. One of the things it would help to clarify would be -
>>> >> exactly what do
>>> >>      >>> you mean by "how batch frameworks solved that for years":
>>> can you
>>> >> point at
>>> >>      >>> an existing API in some other framework that achieves what
>>> you
>>> >> want?
>>> >>      >>>
>>> >>      >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
>>> >>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> wrote:
>>> >>      >>>
>>> >>      >>> > Eugene, point - and issue with a single sample - is you can
>>> >> always find
>>> >>      >>> > *workarounds* on a case by case basis as the id one with
>>> ES but
>>> >> beam
>>> >>      >>> doesnt
>>> >>      >>> > solve the problem as a framework.
>>> >>      >>> >
>>> >>      >>> > From my past, I clearly dont see how batch frameworks
>>> solved
>>> >> that for
>>> >>      >>> years
>>> >>      >>> > and beam is not able to do it - keep in mind it is the same
>>> >> kind of
>>> >>      >>> techno,
>>> >>      >>> > it just uses different sources and bigger clusters so no
>>> real
>>> >> reason to
>>> >>      >>> not
>>> >>      >>> > have the same feature quality. The only potential reason i
>>> see
>>> >> is there
>>> >>      >>> is
>>> >>      >>> > no tracking of the state into the cluster - e2e. But i
>>> dont see
>>> >> why there
>>> >>      >>> > wouldnt be. Do I miss something here?
>>> >>      >>> >
>>> >>      >>> > An example could be: take a github crawler computing stats
>>> on
>>> >> the whole
>>> >>      >>> > girhub repos which is based on a rest client as example.
>>> You
>>> >> will need to
>>> >>      >>> > handle the rate limit and likely want to "commit" each
>>> time you
>>> >> reach a
>>> >>      >>> > rate limit with likely some buffering strategy with a max
>>> size
>>> >> before
>>> >>      >>> > really waiting. How do you do it with a GBK independent of
>>> your
>>> >> dofn? You
>>> >>      >>> > are not able to compose correctly the fn between them :(.
>>> >>      >>> >
>>> >>      >>> >
>>> >>      >>> > Le 18 nov. 2017 20:48, "Eugene Kirpichov"
>>> >> <kirpic...@google.com.invalid>
>>> >>      >>> a
>>> >>      >>> > écrit :
>>> >>      >>> >
>>> >>      >>> > After giving this thread my best attempt at understanding
>>> >> exactly what is
>>> >>      >>> > the problem and the proposed solution, I'm afraid I still
>>> fail
>>> >> to
>>> >>      >>> > understand both. To reiterate, I think the only way to make
>>> >> progress here
>>> >>      >>> > is to be more concrete: (quote) take some IO that you think
>>> >> could be
>>> >>      >>> easier
>>> >>      >>> > to write with your proposed API, give the contents of a
>>> >> hypothetical
>>> >>      >>> > PCollection being written to this IO, give the code of a
>>> >> hypothetical
>>> >>      >>> DoFn
>>> >>      >>> > implementing the write using your API, and explain what
>>> you'd
>>> >> expect to
>>> >>      >>> > happen at runtime. I'm going to re-engage in this thread
>>> after
>>> >> such an
>>> >>      >>> > example is given.
>>> >>      >>> >
>>> >>      >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
>>> >>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > wrote:
>>> >>      >>> >
>>> >>      >>> > > First bundle retry is unusable with dome runners like
>>> spark
>>> >> where the
>>> >>      >>> > > bundle size is the collection size / number of work. This
>>> >> means a user
>>> >>      >>> > cant
>>> >>      >>> > > use bundle API or feature reliably and portably - which
>>> is
>>> >> beam
>>> >>      >>> promise.
>>> >>      >>> > > Aligning chunking and bundles would guarantee that bit
>>> can be
>>> >> not
>>> >>      >>> > desired,
>>> >>      >>> > > that is why i thought it can be another feature.
>>> >>      >>> > >
>>> >>      >>> > > GBK works until the IO knows about that and both
>>> concepts are
>>> >> not
>>> >>      >>> always
>>> >>      >>> > > orthogonal - backpressure like systems is a trivial
>>> common
>>> >> example.
>>> >>      >>> This
>>> >>      >>> > > means the IO (dofn) must be able to do it itself at some
>>> >> point.
>>> >>      >>> > >
>>> >>      >>> > > Also note the GBK works only if the IO can take a list
>>> which
>>> >> is never
>>> >>      >>> the
>>> >>      >>> > > case today.
>>> >>      >>> > >
>>> >>      >>> > > Big questions for me are: is SDF the way to go since it
>>> >> provides the
>>> >>      >>> > needed
>>> >>      >>> > > API bit is not yet supported? What about existing IO?
>>> Should
>>> >> beam
>>> >>      >>> provide
>>> >>      >>> > > an auto wrapping of dofn for that pre-aggregated support
>>> and
>>> >> simulate
>>> >>      >>> > > bundles to the actual IO impl to keep the existing API?
>>> >>      >>> > >
>>> >>      >>> > >
>>> >>      >>> > > Le 17 nov. 2017 19:20, "Raghu Angadi"
>>> >> <rang...@google.com.invalid> a
>>> >>      >>> > > écrit :
>>> >>      >>> > >
>>> >>      >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
>>> >>      >>> > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>
>>> >>
>>> >>      >>> > > >
>>> >>      >>> > > wrote:
>>> >>      >>> > >
>>> >>      >>> > > > Yep, just take ES IO, if a part of a bundle fails you
>>> are
>>> >> in an
>>> >>      >>> > > > unmanaged state. This is the case for all O (of IO ;)).
>>> >> Issue is not
>>> >>      >>> > > > much about "1" (the code it takes) but more the fact it
>>> >> doesn't
>>> >>      >>> > > > integrate with runner features and retries potentially:
>>> >> what happens
>>> >>      >>> > > > if a bundle has a failure? => undefined today. 2. I'm
>>> fine
>>> >> with it
>>> >>      >>> > > > while we know exactly what happens when we restart
>>> after a
>>> >> bundle
>>> >>      >>> > > > failure. With ES the timestamp can be used for
>>> instance.
>>> >>      >>> > > >
>>> >>      >>> > >
>>> >>      >>> > > This deterministic batching can be achieved even now
>>> with an
>>> >> extra
>>> >>      >>> > > GroupByKey (and if you want ordering on top of that, will
>>> >> need another
>>> >>      >>> > > GBK). Don't know if that is too costly in your case. I
>>> would
>>> >> need bit
>>> >>      >>> > more
>>> >>      >>> > > details on handling ES IO write retries to see it could
>>> be
>>> >> simplified.
>>> >>      >>> > Note
>>> >>      >>> > > that retries occur with or without any failures in your
>>> DoFn.
>>> >>      >>> > >
>>> >>      >>> > > The biggest negative with GBK approach is that it doesn't
>>> >> provide same
>>> >>      >>> > > guarantees on Flink.
>>> >>      >>> > >
>>> >>      >>> > > I don't see how GroubIntoBatches in Beam provides
>>> specific
>>> >> guarantees
>>> >>      >>> on
>>> >>      >>> > > deterministic batches.
>>> >>      >>> > >
>>> >>      >>> > > Thinking about it the SDF is really a way to do it since
>>> the
>>> >> SDF will
>>> >>      >>> > > > manage the bulking and associated with the runner
>>> "retry"
>>> >> it seems it
>>> >>      >>> > > > covers the needs.
>>> >>      >>> > > >
>>> >>      >>> > > > Romain Manni-Bucau
>>> >>      >>> > > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>> >>      >>> > > >
>>> >>      >>> > > >
>>> >>      >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
>>> >>      >>> > <kirpic...@google.com.invalid
>>> >>      >>> > > >:
>>> >>      >>> > > > > I must admit I'm still failing to understand the
>>> problem,
>>> >> so let's
>>> >>      >>> > step
>>> >>      >>> > > > > back even further.
>>> >>      >>> > > > >
>>> >>      >>> > > > > Could you give an example of an IO that is currently
>>> >> difficult to
>>> >>      >>> > > > implement
>>> >>      >>> > > > > specifically because of lack of the feature you're
>>> >> talking about?
>>> >>      >>> > > > >
>>> >>      >>> > > > > I'm asking because I've reviewed almost all Beam IOs
>>> and
>>> >> don't
>>> >>      >>> recall
>>> >>      >>> > > > > seeing a similar problem. Sure, a lot of IOs do
>>> batching
>>> >> within a
>>> >>      >>> > > bundle,
>>> >>      >>> > > > > but 1) it doesn't take up much code (granted, it
>>> would be
>>> >> even
>>> >>      >>> easier
>>> >>      >>> > > if
>>> >>      >>> > > > > Beam did it for us) and 2) I don't remember any of
>>> them
>>> >> requiring
>>> >>      >>> the
>>> >>      >>> > > > > batches to be deterministic, and I'm having a hard
>>> time
>>> >> imagining
>>> >>      >>> > what
>>> >>      >>> > > > kind
>>> >>      >>> > > > > of storage system would be able to deduplicate if
>>> batches
>>> >> were
>>> >>      >>> > > > > deterministic but wouldn't be able to deduplicate if
>>> they
>>> >> weren't.
>>> >>      >>> > > > >
>>> >>      >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
>>> >>      >>> > > > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > > > > wrote:
>>> >>      >>> > > > >
>>> >>      >>> > > > >> Ok, let me try to step back and summarize what we
>>> have
>>> >> today and
>>> >>      >>> > what
>>> >>      >>> > > I
>>> >>      >>> > > > >> miss:
>>> >>      >>> > > > >>
>>> >>      >>> > > > >> 1. we can handle chunking in beam through group in
>>> batch
>>> >> (or
>>> >>      >>> > > equivalent)
>>> >>      >>> > > > >> but:
>>> >>      >>> > > > >>    > it is not built-in into the transforms (IO)
>>> and it
>>> >> is
>>> >>      >>> > controlled
>>> >>      >>> > > > >> from outside the transforms so no way for a
>>> transform to
>>> >> do it
>>> >>      >>> > > > >> properly without handling itself a composition and
>>> links
>>> >> between
>>> >>      >>> > > > >> multiple dofns to have notifications and potentially
>>> >> react
>>> >>      >>> properly
>>> >>      >>> > or
>>> >>      >>> > > > >> handle backpressure from its backend
>>> >>      >>> > > > >> 2. there is no restart feature because there is no
>>> real
>>> >> state
>>> >>      >>> > handling
>>> >>      >>> > > > >> at the moment. this sounds fully delegated to the
>>> runner
>>> >> but I was
>>> >>      >>> > > > >> hoping to have more guarantees from the used API to
>>> be
>>> >> able to
>>> >>      >>> > restart
>>> >>      >>> > > > >> a pipeline (mainly batch since it can be irrelevant
>>> or
>>> >> delegates
>>> >>      >>> to
>>> >>      >>> > > > >> the backend for streams) and handle only not
>>> commited
>>> >> records so
>>> >>      >>> it
>>> >>      >>> > > > >> requires some persistence outside the main IO
>>> storages
>>> >> to do it
>>> >>      >>> > > > >> properly
>>> >>      >>> > > > >>    > note this is somehow similar to the monitoring
>>> >> topic which
>>> >>      >>> miss
>>> >>      >>> > > > >> persistence ATM so it can end up to beam to have a
>>> >> pluggable
>>> >>      >>> storage
>>> >>      >>> > > > >> for a few concerns
>>> >>      >>> > > > >>
>>> >>      >>> > > > >>
>>> >>      >>> > > > >> Short term I would be happy with 1 solved properly,
>>> long
>>> >> term I
>>> >>      >>> hope
>>> >>      >>> > 2
>>> >>      >>> > > > >> will be tackled without workarounds requiring custom
>>> >> wrapping of
>>> >>      >>> IO
>>> >>      >>> > to
>>> >>      >>> > > > >> use a custom state persistence.
>>> >>      >>> > > > >>
>>> >>      >>> > > > >>
>>> >>      >>> > > > >>
>>> >>      >>> > > > >> Romain Manni-Bucau
>>> >>      >>> > > > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>> >>      >>> > > > >>
>>> >>      >>> > > > >>
>>> >>      >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
>>> >>     <j...@nanthrax.net <mailto:j...@nanthrax.net>>:
>>> >>
>>> >>      >>> > > > >> > Thanks for the explanation. Agree, we might talk
>>> about
>>> >> different
>>> >>      >>> > > > things
>>> >>      >>> > > > >> > using the same wording.
>>> >>      >>> > > > >> >
>>> >>      >>> > > > >> > I'm also struggling to understand the use case
>>> (for a
>>> >> generic
>>> >>      >>> > DoFn).
>>> >>      >>> > > > >> >
>>> >>      >>> > > > >> > Regards
>>> >>      >>> > > > >> > JB
>>> >>      >>> > > > >> >
>>> >>      >>> > > > >> >
>>> >>      >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>> >>      >>> > > > >> >>
>>> >>      >>> > > > >> >> To avoid spending a lot of time pursuing a false
>>> >> path, I'd like
>>> >>      >>> > to
>>> >>      >>> > > > say
>>> >>      >>> > > > >> >> straight up that SDF is definitely not going to
>>> help
>>> >> here,
>>> >>      >>> > despite
>>> >>      >>> > > > the
>>> >>      >>> > > > >> >> fact
>>> >>      >>> > > > >> >> that its API includes the term "checkpoint". In
>>> SDF,
>>> >> the
>>> >>      >>> > > "checkpoint"
>>> >>      >>> > > > >> >> captures the state of processing within a single
>>> >> element. If
>>> >>      >>> > you're
>>> >>      >>> > > > >> >> applying an SDF to 1000 elements, it will, like
>>> any
>>> >> other DoFn,
>>> >>      >>> > be
>>> >>      >>> > > > >> applied
>>> >>      >>> > > > >> >> to each of them independently and in parallel,
>>> and
>>> >> you'll have
>>> >>      >>> > 1000
>>> >>      >>> > > > >> >> checkpoints capturing the state of processing
>>> each of
>>> >> these
>>> >>      >>> > > elements,
>>> >>      >>> > > > >> >> which
>>> >>      >>> > > > >> >> is probably not what you want.
>>> >>      >>> > > > >> >>
>>> >>      >>> > > > >> >> I'm afraid I still don't understand what kind of
>>> >> checkpoint you
>>> >>      >>> > > > need, if
>>> >>      >>> > > > >> >> it
>>> >>      >>> > > > >> >> is not just deterministic grouping into batches.
>>> >> "Checkpoint"
>>> >>      >>> is
>>> >>      >>> > a
>>> >>      >>> > > > very
>>> >>      >>> > > > >> >> broad term and it's very possible that everybody
>>> in
>>> >> this thread
>>> >>      >>> > is
>>> >>      >>> > > > >> talking
>>> >>      >>> > > > >> >> about different things when saying it. So it
>>> would
>>> >> help if you
>>> >>      >>> > > could
>>> >>      >>> > > > >> give
>>> >>      >>> > > > >> >> a
>>> >>      >>> > > > >> >> more concrete example: for example, take some IO
>>> that
>>> >> you think
>>> >>      >>> > > > could be
>>> >>      >>> > > > >> >> easier to write with your proposed API, give the
>>> >> contents of a
>>> >>      >>> > > > >> >> hypothetical
>>> >>      >>> > > > >> >> PCollection being written to this IO, give the
>>> code
>>> >> of a
>>> >>      >>> > > hypothetical
>>> >>      >>> > > > >> DoFn
>>> >>      >>> > > > >> >> implementing the write using your API, and
>>> explain
>>> >> what you'd
>>> >>      >>> > > expect
>>> >>      >>> > > > to
>>> >>      >>> > > > >> >> happen at runtime.
>>> >>      >>> > > > >> >>
>>> >>      >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain
>>> Manni-Bucau
>>> >>      >>> > > > >> >> <rmannibu...@gmail.com
>>> >> <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > > > >> >> wrote:
>>> >>      >>> > > > >> >>
>>> >>      >>> > > > >> >>> @Eugene: yes and the other alternative of
>>> Reuven too
>>> >> but it is
>>> >>      >>> > > still
>>> >>      >>> > > > >> >>> 1. relying on timers, 2. not really checkpointed
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> In other words it seems all solutions are to
>>> create
>>> >> a chunk of
>>> >>      >>> > > size
>>> >>      >>> > > > 1
>>> >>      >>> > > > >> >>> and replayable to fake the lack of chunking in
>>> the
>>> >> framework.
>>> >>      >>> > This
>>> >>      >>> > > > >> >>> always implies a chunk handling outside the
>>> >> component
>>> >>      >>> (typically
>>> >>      >>> > > > >> >>> before for an output). My point is I think IO
>>> need
>>> >> it in their
>>> >>      >>> > own
>>> >>      >>> > > > >> >>> "internal" or at least control it themselves
>>> since
>>> >> the chunk
>>> >>      >>> > size
>>> >>      >>> > > is
>>> >>      >>> > > > >> >>> part of the IO handling most of the time.
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> I think JB spoke of the same "group before"
>>> trick
>>> >> using
>>> >>      >>> > > restrictions
>>> >>      >>> > > > >> >>> which can work I have to admit if SDF are
>>> >> implemented by
>>> >>      >>> > runners.
>>> >>      >>> > > Is
>>> >>      >>> > > > >> >>> there a roadmap/status on that? Last time I
>>> checked
>>> >> SDF was a
>>> >>      >>> > > great
>>> >>      >>> > > > >> >>> API without support :(.
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> Romain Manni-Bucau
>>> >>      >>> > > > >> >>> @rmannibucau |  Blog | Old Blog | Github |
>>> LinkedIn
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>>> >>      >>> > > > >> >>> <kirpic...@google.com.invalid>:
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers
>>> are
>>> >> unrelated,
>>> >>      >>> > and
>>> >>      >>> > > > the
>>> >>      >>> > > > >> >>>> post
>>> >>      >>> > > > >> >>>> doesn't mention the word. Did you mean
>>> something
>>> >> else, e.g.
>>> >>      >>> > > > >> restriction
>>> >>      >>> > > > >> >>>> perhaps? Either way I don't think SDFs are the
>>> >> solution here;
>>> >>      >>> > > SDFs
>>> >>      >>> > > > >> have
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> to
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> do with the ability to split the processing of
>>> *a
>>> >> single
>>> >>      >>> > element*
>>> >>      >>> > > > over
>>> >>      >>> > > > >> >>>> multiple calls, whereas Romain I think is
>>> asking
>>> >> for
>>> >>      >>> repeatable
>>> >>      >>> > > > >> grouping
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> of
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> *multiple* elements.
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> Romain - does
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >>
>>> https://github.com/apache/beam/blob/master/sdks/java/
>>> >>     <https://github.com/apache/beam/blob/master/sdks/java/>
>>> >>      >>> > > > core/src/main/java/org/apache/beam/sdk/transforms/
>>> >>      >>> GroupIntoBatches.java
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> do what
>>> >>      >>> > > > >> >>>> you want?
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste
>>> >> Onofré <
>>> >>      >>> > > > >> j...@nanthrax.net <mailto:j...@nanthrax.net>>
>>> >>      >>> > > > >> >>>> wrote:
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable
>>> >> DoFn, no ?
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>>
>>> >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn
>>> >>     <https://beam.apache.org/blog/2017/08/16/splittable-do-fn>.
>>> >>
>>> >>      >>> html
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> Regards
>>> >>      >>> > > > >> >>>>> JB
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau
>>> wrote:
>>> >>      >>> > > > >> >>>>>>
>>> >>      >>> > > > >> >>>>>> it gives the fn/transform the ability to
>>> save a
>>> >> state - it
>>> >>      >>> > can
>>> >>      >>> > > > get
>>> >>      >>> > > > >> >>>>>> back on "restart" / whatever unit we can use,
>>> >> probably
>>> >>      >>> runner
>>> >>      >>> > > > >> >>>>>> dependent? Without that you need to rewrite
>>> all
>>> >> IO usage
>>> >>      >>> with
>>> >>      >>> > > > >> >>>>>> something like the previous pattern which
>>> makes
>>> >> the IO not
>>> >>      >>> > self
>>> >>      >>> > > > >> >>>>>> sufficient and kind of makes the entry cost
>>> and
>>> >> usage of
>>> >>      >>> beam
>>> >>      >>> > > way
>>> >>      >>> > > > >> >>>>>> further.
>>> >>      >>> > > > >> >>>>>>
>>> >>      >>> > > > >> >>>>>> In my mind it is exactly what
>>> jbatch/spring-batch
>>> >> uses but
>>> >>      >>> > > > adapted
>>> >>      >>> > > > >> to
>>> >>      >>> > > > >> >>>>>> beam (stream in particular) case.
>>> >>      >>> > > > >> >>>>>>
>>> >>      >>> > > > >> >>>>>> Romain Manni-Bucau
>>> >>      >>> > > > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github |
>>> >> LinkedIn
>>> >>      >>> > > > >> >>>>>>
>>> >>      >>> > > > >> >>>>>>
>>> >>      >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
>>> >>      >>> > <re...@google.com.invalid
>>> >>      >>> > > >:
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> Romain,
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> Can you define what you mean by checkpoint?
>>> What
>>> >> are the
>>> >>      >>> > > > semantics,
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> what
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> does it accomplish?
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> Reuven
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain
>>> >> Manni-Bucau <
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> rmannibu...@gmail.com
>>> >> <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>> wrote:
>>> >>      >>> > > > >> >>>>>>>
>>> >>      >>> > > > >> >>>>>>>> Yes, what I propose earlier was:
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> I. checkpoint marker:
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> @AnyBeamAnnotation
>>> >>      >>> > > > >> >>>>>>>> @CheckpointAfter
>>> >>      >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new
>>> >>      >>> > > > >> MyFn()).withCheckpointAlgorithm(new
>>> >>      >>> > > > >> >>>>>>>> CountingAlgo()))
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> III. (I like this one less)
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> // in the dofn
>>> >>      >>> > > > >> >>>>>>>> @CheckpointTester
>>> >>      >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable
>>> getCheckpoint();
>>> >> in the
>>> >>      >>> dofn
>>> >>      >>> > > per
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> element
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> Romain Manni-Bucau
>>> >>      >>> > > > >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github |
>>> >> LinkedIn
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>>> >>      >>> > > > <rang...@google.com.invalid
>>> >>      >>> > > > >> >>>>
>>> >>      >>> > > > >> >>>> :
>>> >>      >>> > > > >> >>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>> How would you define it (rough API is
>>> fine)?.
>>> >> Without
>>> >>      >>> more
>>> >>      >>> > > > >> details,
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> it is
>>> >>      >>> > > > >> >>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>> not easy to see wider applicability and
>>> >> feasibility in
>>> >>      >>> > > > runners.
>>> >>      >>> > > > >> >>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain
>>> >> Manni-Bucau <
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> rmannibu...@gmail.com
>>> >> <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > > > >> >>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>> wrote:
>>> >>      >>> > > > >> >>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> This is a fair summary of the current
>>> state
>>> >> but also
>>> >>      >>> > where
>>> >>      >>> > > > beam
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> can
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> have a
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> very strong added value and make big data
>>> >> great and
>>> >>      >>> > smooth.
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> Instead of this replay feature isnt
>>> >> checkpointing
>>> >>      >>> > willable?
>>> >>      >>> > > > In
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> particular
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> with SDF no?
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi"
>>> >>      >>> > > > >> <rang...@google.com.invalid>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> a
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> écrit :
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> Core issue here is that there is no
>>> explicit
>>> >> concept
>>> >>      >>> of
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> 'checkpoint'
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> in
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> Beam (UnboundedSource has a method
>>> >> 'getCheckpointMark'
>>> >>      >>> > but
>>> >>      >>> > > > that
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> refers to
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> the checkoint on external source).
>>> Runners
>>> >> do
>>> >>      >>> checkpoint
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> internally
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> as
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> implementation detail. Flink's
>>> checkpoint
>>> >> model is
>>> >>      >>> > > entirely
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> different
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> from
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's.
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not
>>> >> explicitly talk
>>> >>      >>> > about
>>> >>      >>> > > a
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> checkpoint
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> by
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> design.
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> If you are looking to achieve some
>>> >> guarantees with a
>>> >>      >>> > > > >> sink/DoFn, I
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> think
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> it
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> is better to start with the
>>> requirements. I
>>> >> worked on
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> exactly-once
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> sink
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>> for
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()),
>>> where
>>> >> we
>>> >>      >>> > > essentially
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> reshard
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> the
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> elements and assign sequence numbers to
>>> >> elements with
>>> >>      >>> in
>>> >>      >>> > > > each
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> shard.
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> Duplicates in replays are avoided based
>>> on
>>> >> these
>>> >>      >>> > sequence
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> numbers.
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> DoFn
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> state API is used to buffer out-of order
>>> >> replays. The
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> implementation
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not in
>>> Flink
>>> >> which has
>>> >>      >>> a
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> horizontal
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> checkpoint. KafkaIO checks for
>>> >> compatibility.
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain
>>> >> Manni-Bucau <
>>> >>      >>> > > > >> >>>>>>>>>>> rmannibu...@gmail.com
>>> >> <mailto:rmannibu...@gmail.com>>
>>> >>
>>> >>      >>> > > > >> >>>>>>>>>>> wrote:
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> Hi guys,
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative but
>>> the
>>> >> topic is
>>> >>      >>> real
>>> >>      >>> > > and
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> coming
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> again and again with the beam usage:
>>> how a
>>> >> dofn can
>>> >>      >>> > > handle
>>> >>      >>> > > > >> some
>>> >>      >>> > > > >> >>>>>>>>>>>> "chunking".
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> The need is to be able to commit each N
>>> >> records but
>>> >>      >>> > with
>>> >>      >>> > > N
>>> >>      >>> > > > not
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> too
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> big.
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> The natural API for that in beam is the
>>> >> bundle one
>>> >>      >>> but
>>> >>      >>> > > > bundles
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> are
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> not
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> reliable since they can be very small
>>> >> (flink) - we
>>> >>      >>> can
>>> >>      >>> > > say
>>> >>      >>> > > > it
>>> >>      >>> > > > >> is
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> "ok"
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> even if it has some perf impacts - or
>>> too
>>> >> big (spark
>>> >>      >>> > does
>>> >>      >>> > > > full
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> size
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> /
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> #workers).
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES
>>> >> I/O: a
>>> >>      >>> maxSize
>>> >>      >>> > > > which
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> does
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> an
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> eager flush. The issue is that then the
>>> >> checkpoint is
>>> >>      >>> > not
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>> respected
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> and you can process multiple times the
>>> same
>>> >> records.
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and
>>> >> controllable
>>> >>      >>> > from
>>> >>      >>> > > a
>>> >>      >>> > > > >> beam
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>>>> point
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> of view (at least in a max manner)?
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>> Thanks,
>>> >>      >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>>> >>      >>> > > > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog |
>>> Github |
>>> >> LinkedIn
>>> >>      >>> > > > >> >>>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>>>
>>> >>      >>> > > > >> >>>>>>>>
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>>> --
>>> >>      >>> > > > >> >>>>> Jean-Baptiste Onofré
>>> >>      >>> > > > >> >>>>> jbono...@apache.org <mailto:
>>> jbono...@apache.org>
>>> >>      >>> > > > >> >>>>> http://blog.nanthrax.net
>>> >>      >>> > > > >> >>>>> Talend - http://www.talend.com
>>> >>      >>> > > > >> >>>>>
>>> >>      >>> > > > >> >>>
>>> >>      >>> > > > >> >>
>>> >>      >>> > > > >> >
>>> >>      >>> > > > >> > --
>>> >>      >>> > > > >> > Jean-Baptiste Onofré
>>> >>      >>> > > > >> > jbono...@apache.org <mailto:jbono...@apache.org>
>>> >>      >>> > > > >> > http://blog.nanthrax.net
>>> >>      >>> > > > >> > Talend - http://www.talend.com
>>> >>      >>> > > > >>
>>> >>      >>> > > >
>>> >>      >>> > >
>>> >>      >>> >
>>> >>      >>>
>>> >>
>>> >>
>>> >
>>> > --
>>> > Jean-Baptiste Onofré
>>> > jbono...@apache.org
>>> > http://blog.nanthrax.net
>>> > Talend - http://www.talend.com
>>>
>>>
>>
>>

Reply via email to