In case of Elasticsearch: Elasticsearch takes a PCollection<String> with
JSON documents, which may contain a document id. ES will overwrite a
document with the same id if it exists, so in case of retries inserting the
same document multiple times will not lead to duplicates. I guess the
solution is to simply document that, if you'd like to avoid duplicate
documents, make sure to include an ID in your document, rather than letting
ES autogenerate it.

On Fri, Nov 17, 2017 at 9:37 AM Eugene Kirpichov <[email protected]>
wrote:

> The behavior if a bundle has a failure is quite defined: the entire bundle
> is considered failed and processing of the bundle's elements will get
> retried. The level at which retries are performed is unspecified: a runner
> would be allowed to retry the bundle, or it would be allowed to split the
> remaining elements in the PCollection into bundles differently , or it
> would technically be allowed to retry even the entire pipeline or a part of
> the pipeline (think Spark with its RDD lineage). Point is: if a pipeline
> succeeded, then everything has been processed.
>
> Of course, this means you may insert the same element multiple times. I'm
> afraid avoiding this is mathematically impossible and no API can fix this.
> The problem is that, if a failure happens, there is no way to know whether
> documents were actually inserted or not:
> - Suppose your insert RPC to Elasticsearch fails. It could be because
> network disconnected *before* ES received the insert request, but it also
> could be because it disconnected *after* ES received and processed it -
> i.e. the document was inserted, but you didn't receive the RPC response
> saying that.
> - Suppose the Beam bundle fails. It could be because some elements in the
> bundle couldn't be processed at all, or it could be e.g. because all
> elements were processed but something failed later on, e.g. the worker got
> stuck and was unable to report success of the bundle, so the runner decided
> to kill the worker and retry.
>
> Some storage systems, intended to be used in distributed systems, are
> conscious of this fundamental impossibility issue. For example:
> - BigQuery provides an "insert id" used for best-effort *insert-time*
> deduplication
> https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
> - The design of Beam unbounded sources assumes that publishers may publish
> the same message several times due to retries, so it provides *read-time*
> deduplication via requiresDeduping() and getCurrentRecordId(). Read-time
> deduplication is in general more robust.
>
> (Regarding SDF: I feel like we're going in circles of confusion here. SDF
> is unrelated to batching, it is the reverse of batching in a sense:
> batching lets you process many elements as one, SDF lets you process a
> single element as [potentially infinitely] many)
>
> On Fri, Nov 17, 2017 at 1:02 AM Romain Manni-Bucau <[email protected]>
> wrote:
>
>> Yep, just take ES IO, if a part of a bundle fails you are in an
>> unmanaged state. This is the case for all O (of IO ;)). Issue is not
>> much about "1" (the code it takes) but more the fact it doesn't
>> integrate with runner features and retries potentially: what happens
>> if a bundle has a failure? => undefined today. 2. I'm fine with it
>> while we know exactly what happens when we restart after a bundle
>> failure. With ES the timestamp can be used for instance.
>>
>> Thinking about it the SDF is really a way to do it since the SDF will
>> manage the bulking and associated with the runner "retry" it seems it
>> covers the needs.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <[email protected]
>> >:
>> > I must admit I'm still failing to understand the problem, so let's step
>> > back even further.
>> >
>> > Could you give an example of an IO that is currently difficult to
>> implement
>> > specifically because of lack of the feature you're talking about?
>> >
>> > I'm asking because I've reviewed almost all Beam IOs and don't recall
>> > seeing a similar problem. Sure, a lot of IOs do batching within a
>> bundle,
>> > but 1) it doesn't take up much code (granted, it would be even easier if
>> > Beam did it for us) and 2) I don't remember any of them requiring the
>> > batches to be deterministic, and I'm having a hard time imagining what
>> kind
>> > of storage system would be able to deduplicate if batches were
>> > deterministic but wouldn't be able to deduplicate if they weren't.
>> >
>> > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
>> [email protected]>
>> > wrote:
>> >
>> >> Ok, let me try to step back and summarize what we have today and what I
>> >> miss:
>> >>
>> >> 1. we can handle chunking in beam through group in batch (or
>> equivalent)
>> >> but:
>> >>    > it is not built-in into the transforms (IO) and it is controlled
>> >> from outside the transforms so no way for a transform to do it
>> >> properly without handling itself a composition and links between
>> >> multiple dofns to have notifications and potentially react properly or
>> >> handle backpressure from its backend
>> >> 2. there is no restart feature because there is no real state handling
>> >> at the moment. this sounds fully delegated to the runner but I was
>> >> hoping to have more guarantees from the used API to be able to restart
>> >> a pipeline (mainly batch since it can be irrelevant or delegates to
>> >> the backend for streams) and handle only not commited records so it
>> >> requires some persistence outside the main IO storages to do it
>> >> properly
>> >>    > note this is somehow similar to the monitoring topic which miss
>> >> persistence ATM so it can end up to beam to have a pluggable storage
>> >> for a few concerns
>> >>
>> >>
>> >> Short term I would be happy with 1 solved properly, long term I hope 2
>> >> will be tackled without workarounds requiring custom wrapping of IO to
>> >> use a custom state persistence.
>> >>
>> >>
>> >>
>> >> Romain Manni-Bucau
>> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>
>> >>
>> >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
>> >> > Thanks for the explanation. Agree, we might talk about different
>> things
>> >> > using the same wording.
>> >> >
>> >> > I'm also struggling to understand the use case (for a generic DoFn).
>> >> >
>> >> > Regards
>> >> > JB
>> >> >
>> >> >
>> >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>> >> >>
>> >> >> To avoid spending a lot of time pursuing a false path, I'd like to
>> say
>> >> >> straight up that SDF is definitely not going to help here, despite
>> the
>> >> >> fact
>> >> >> that its API includes the term "checkpoint". In SDF, the
>> "checkpoint"
>> >> >> captures the state of processing within a single element. If you're
>> >> >> applying an SDF to 1000 elements, it will, like any other DoFn, be
>> >> applied
>> >> >> to each of them independently and in parallel, and you'll have 1000
>> >> >> checkpoints capturing the state of processing each of these
>> elements,
>> >> >> which
>> >> >> is probably not what you want.
>> >> >>
>> >> >> I'm afraid I still don't understand what kind of checkpoint you
>> need, if
>> >> >> it
>> >> >> is not just deterministic grouping into batches. "Checkpoint" is a
>> very
>> >> >> broad term and it's very possible that everybody in this thread is
>> >> talking
>> >> >> about different things when saying it. So it would help if you could
>> >> give
>> >> >> a
>> >> >> more concrete example: for example, take some IO that you think
>> could be
>> >> >> easier to write with your proposed API, give the contents of a
>> >> >> hypothetical
>> >> >> PCollection being written to this IO, give the code of a
>> hypothetical
>> >> DoFn
>> >> >> implementing the write using your API, and explain what you'd
>> expect to
>> >> >> happen at runtime.
>> >> >>
>> >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>> >> >> <[email protected]>
>> >> >> wrote:
>> >> >>
>> >> >>> @Eugene: yes and the other alternative of Reuven too but it is
>> still
>> >> >>> 1. relying on timers, 2. not really checkpointed
>> >> >>>
>> >> >>> In other words it seems all solutions are to create a chunk of
>> size 1
>> >> >>> and replayable to fake the lack of chunking in the framework. This
>> >> >>> always implies a chunk handling outside the component (typically
>> >> >>> before for an output). My point is I think IO need it in their own
>> >> >>> "internal" or at least control it themselves since the chunk size
>> is
>> >> >>> part of the IO handling most of the time.
>> >> >>>
>> >> >>> I think JB spoke of the same "group before" trick using
>> restrictions
>> >> >>> which can work I have to admit if SDF are implemented by runners.
>> Is
>> >> >>> there a roadmap/status on that? Last time I checked SDF was a great
>> >> >>> API without support :(.
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> Romain Manni-Bucau
>> >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>>
>> >> >>>
>> >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>> >> >>> <[email protected]>:
>> >> >>>>
>> >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and
>> the
>> >> >>>> post
>> >> >>>> doesn't mention the word. Did you mean something else, e.g.
>> >> restriction
>> >> >>>> perhaps? Either way I don't think SDFs are the solution here; SDFs
>> >> have
>> >> >>>
>> >> >>> to
>> >> >>>>
>> >> >>>> do with the ability to split the processing of *a single element*
>> over
>> >> >>>> multiple calls, whereas Romain I think is asking for repeatable
>> >> grouping
>> >> >>>
>> >> >>> of
>> >> >>>>
>> >> >>>> *multiple* elements.
>> >> >>>>
>> >> >>>> Romain - does
>> >> >>>>
>> >> >>>
>> >> >>>
>> >>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>> >> >>>>
>> >> >>>> do what
>> >> >>>> you want?
>> >> >>>>
>> >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
>> >> [email protected]>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no ?
>> >> >>>>>
>> >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >> >>>>>
>> >> >>>>> Regards
>> >> >>>>> JB
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> >> >>>>>>
>> >> >>>>>> it gives the fn/transform the ability to save a state - it can
>> get
>> >> >>>>>> back on "restart" / whatever unit we can use, probably runner
>> >> >>>>>> dependent? Without that you need to rewrite all IO usage with
>> >> >>>>>> something like the previous pattern which makes the IO not self
>> >> >>>>>> sufficient and kind of makes the entry cost and usage of beam
>> way
>> >> >>>>>> further.
>> >> >>>>>>
>> >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses but
>> adapted
>> >> to
>> >> >>>>>> beam (stream in particular) case.
>> >> >>>>>>
>> >> >>>>>> Romain Manni-Bucau
>> >> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]
>> >:
>> >> >>>>>>>
>> >> >>>>>>> Romain,
>> >> >>>>>>>
>> >> >>>>>>> Can you define what you mean by checkpoint? What are the
>> semantics,
>> >> >>>
>> >> >>> what
>> >> >>>>>>>
>> >> >>>>>>> does it accomplish?
>> >> >>>>>>>
>> >> >>>>>>> Reuven
>> >> >>>>>>>
>> >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
>> >> >>>>>
>> >> >>>>> [email protected]>
>> >> >>>>>>>
>> >> >>>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>>> Yes, what I propose earlier was:
>> >> >>>>>>>>
>> >> >>>>>>>> I. checkpoint marker:
>> >> >>>>>>>>
>> >> >>>>>>>> @AnyBeamAnnotation
>> >> >>>>>>>> @CheckpointAfter
>> >> >>>>>>>> public void someHook(SomeContext ctx);
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> II. pipeline.apply(ParDo.of(new
>> >> MyFn()).withCheckpointAlgorithm(new
>> >> >>>>>>>> CountingAlgo()))
>> >> >>>>>>>>
>> >> >>>>>>>> III. (I like this one less)
>> >> >>>>>>>>
>> >> >>>>>>>> // in the dofn
>> >> >>>>>>>> @CheckpointTester
>> >> >>>>>>>> public boolean shouldCheckpoint();
>> >> >>>>>>>>
>> >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn
>> per
>> >> >>>
>> >> >>> element
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>> <[email protected]
>> >> >>>>
>> >> >>>> :
>> >> >>>>>>>>>
>> >> >>>>>>>>> How would you define it (rough API is fine)?. Without more
>> >> details,
>> >> >>>>>
>> >> >>>>> it is
>> >> >>>>>>>>>
>> >> >>>>>>>>> not easy to see wider applicability and feasibility in
>> runners.
>> >> >>>>>>>>>
>> >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
>> >> >>>>>>>>
>> >> >>>>>>>> [email protected]>
>> >> >>>>>>>>>
>> >> >>>>>>>>> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>>> This is a fair summary of the current state but also where
>> beam
>> >> >>>
>> >> >>> can
>> >> >>>>>>>>
>> >> >>>>>>>> have a
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> very strong added value and make big data great and smooth.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Instead of this replay feature isnt checkpointing willable?
>> In
>> >> >>>>>>>>
>> >> >>>>>>>> particular
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> with SDF no?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi"
>> >> <[email protected]>
>> >> >>>
>> >> >>> a
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> écrit :
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>> Core issue here is that there is no explicit concept of
>> >> >>>
>> >> >>> 'checkpoint'
>> >> >>>>>>>>
>> >> >>>>>>>> in
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Beam (UnboundedSource has a method 'getCheckpointMark' but
>> that
>> >> >>>>>>>>
>> >> >>>>>>>> refers to
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> the checkoint on external source). Runners do checkpoint
>> >> >>>
>> >> >>> internally
>> >> >>>>>
>> >> >>>>> as
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely
>> >> >>>>>
>> >> >>>>> different
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> from
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Dataflow's and Spark's.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about
>> a
>> >> >>>>>>>>
>> >> >>>>>>>> checkpoint
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> by
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> design.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> If you are looking to achieve some guarantees with a
>> >> sink/DoFn, I
>> >> >>>>>>>>
>> >> >>>>>>>> think
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> it
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> is better to start with the requirements. I worked on
>> >> >>>
>> >> >>> exactly-once
>> >> >>>>>>>>
>> >> >>>>>>>> sink
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> for
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we essentially
>> >> >>>
>> >> >>> reshard
>> >> >>>>>>>>
>> >> >>>>>>>> the
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> elements and assign sequence numbers to elements with in
>> each
>> >> >>>
>> >> >>> shard.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Duplicates in replays are avoided based on these sequence
>> >> >>>
>> >> >>> numbers.
>> >> >>>>>>>>
>> >> >>>>>>>> DoFn
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> state API is used to buffer out-of order replays. The
>> >> >>>
>> >> >>> implementation
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> strategy works in Dataflow but not in Flink which has a
>> >> >>>
>> >> >>> horizontal
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> checkpoint. KafkaIO checks for compatibility.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> >> >>>>>>>>>>> [email protected]>
>> >> >>>>>>>>>>> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>> Hi guys,
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> The subject is a bit provocative but the topic is real and
>> >> >>>
>> >> >>> coming
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> again and again with the beam usage: how a dofn can handle
>> >> some
>> >> >>>>>>>>>>>> "chunking".
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> The need is to be able to commit each N records but with
>> N not
>> >> >>>
>> >> >>> too
>> >> >>>>>>>>
>> >> >>>>>>>> big.
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> The natural API for that in beam is the bundle one but
>> bundles
>> >> >>>
>> >> >>> are
>> >> >>>>>>>>
>> >> >>>>>>>> not
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> reliable since they can be very small (flink) - we can
>> say it
>> >> is
>> >> >>>>>>>>
>> >> >>>>>>>> "ok"
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> even if it has some perf impacts - or too big (spark does
>> full
>> >> >>>
>> >> >>> size
>> >> >>>>>>>>
>> >> >>>>>>>> /
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> #workers).
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize
>> which
>> >> >>>
>> >> >>> does
>> >> >>>>>
>> >> >>>>> an
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> eager flush. The issue is that then the checkpoint is not
>> >> >>>
>> >> >>> respected
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> and you can process multiple times the same records.
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>> >> beam
>> >> >>>>>>>>
>> >> >>>>>>>> point
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> of view (at least in a max manner)?
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> Thanks,
>> >> >>>>>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> Jean-Baptiste Onofré
>> >> >>>>> [email protected]
>> >> >>>>> http://blog.nanthrax.net
>> >> >>>>> Talend - http://www.talend.com
>> >> >>>>>
>> >> >>>
>> >> >>
>> >> >
>> >> > --
>> >> > Jean-Baptiste Onofré
>> >> > [email protected]
>> >> > http://blog.nanthrax.net
>> >> > Talend - http://www.talend.com
>> >>
>>
>

Reply via email to