Multiple runners like Flink, Dataflow and others do provide something stronger than at-least-once processing: the results produced by the pipeline are as if each element were processed exactly once. Each individual element might be processed more than once in case of retries, but the internal checkpointing ensures that the duplicates are handled correctly. There is a two-part description of how Dataflow achieves this; I highly recommend it: part 1 <https://cloud.google.com/blog/products/gcp/after-lambda-exactly-once-processing-in-google-cloud-dataflow-part-1>, part 2 <https://cloud.google.com/blog/products/gcp/after-lambda-exactly-once-processing-in-cloud-dataflow-part-2-ensuring-low-latency>.
One way Dataflow currently allows restarting a pipeline with the same guarantees is through 'update <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline>', which ensures that existing checkpoints and state are carried over. I don't know if there is a single document describing this for multiple runners; that would be very useful.

These guarantees are provided even in the case of simultaneous failures in different parts of the pipeline. It is possible to imagine some severe failures that could effectively stall the pipeline, but they should not produce incorrect results.

Raghu.

On Wed, Aug 22, 2018 at 2:04 PM Micah Whitacre <[email protected]> wrote:

> Sorry, I mistyped: we need "at least once" processing. <facepalm/>
>
> On Wed, Aug 22, 2018 at 9:39 AM, Micah Whitacre <[email protected]> wrote:
>
>> > Could you describe your durability requirements a bit more?
>>
>> The requirement is that we need "at most once" processing of the data. So I'm perfectly happy retrying the processing. I'm more concerned about data loss/skipping data in the event of processing failures, pipeline operations (starting/restarting), failures in the runner's underlying infrastructure, and the fun cases when they might happen at the same time (e.g. underlying infrastructure problems that cause processing failures and require us to restart the pipeline :)).
>>
>> Are there any good resources talking about the differences at the boundaries or the assumed guarantees?
>>
>> On Tue, Aug 21, 2018 at 5:05 PM, Raghu Angadi <[email protected]> wrote:
>>
>>> On Tue, Aug 21, 2018 at 2:49 PM Micah Whitacre <[email protected]> wrote:
>>>
>>>> > Is there a reason you can't trust the runner to be durable storage for in-process work?
>>>>
>>>> That's a fair question. Are there any good resources documenting the durability/stability of the different runners? I assume there are some stability requirements regarding their handling of "bundles", but it would be nice to have that info available. One of the reasons we are targeting the Direct runner is to let us work with the project while temporarily delaying picking a runner. Durability seems like another important aspect to evaluate.
>>>
>>> Could you describe your durability requirements a bit more?
>>> All the major runners provide comparable durability guarantees for processing within a running pipeline (these are required by the Beam model). The differences arise at the boundaries: what happens when you stop the pipeline, can the pipeline be updated with new code while keeping the old state, etc.
>>>
>>> An often confusing area is side effects (like committing Kafka offsets in your case): the user always has to assume that processing might be retried (even if that rarely occurs).
>>>
>>>> On Tue, Aug 21, 2018 at 4:24 PM, Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> On Tue, Aug 21, 2018 at 2:04 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Is there a reason you can't trust the runner to be durable storage for in-process work?
>>>>>>
>>>>>> I can understand that the DirectRunner only stores things in memory, but other runners have stronger durability guarantees.
>>>>>
>>>>> I think the requirement is about producing a side effect (committing offsets to Kafka) after some processing completes in the pipeline. The Wait() transform helps with that, but the user still has to commit the offsets explicitly; KafkaIO does not provide similar functionality built in.
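
For reference, a rough and untested sketch of the Wait() pattern described above. The broker/topic settings and the PersistRecords and CommitOffsetsFn classes are hypothetical placeholders for the user's own persistence and offset-commit code; they are not part of KafkaIO:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class WaitOnKafkaOffsetsSketch {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Read Kafka records with metadata so topic/partition/offset are available later.
    PCollection<KafkaRecord<String, String>> records =
        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")           // assumed broker
                .withTopic("my-topic")                         // assumed topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
            .apply(Window.<KafkaRecord<String, String>>into(
                FixedWindows.of(Duration.standardMinutes(1))));

    // The persistence step; its output is used only as a "done" signal.
    PCollection<Void> persisted = records.apply("PersistData", new PersistRecords());

    // Hold each window of 'records' until the same window of 'persisted' is
    // complete, then commit offsets explicitly as a side effect.
    records
        .apply(Wait.on(persisted))
        .apply("CommitOffsets", ParDo.of(new CommitOffsetsFn()));

    p.run();
  }

  // Hypothetical stand-in for the user's persistence transform.
  static class PersistRecords
      extends PTransform<PCollection<KafkaRecord<String, String>>, PCollection<Void>> {
    @Override
    public PCollection<Void> expand(PCollection<KafkaRecord<String, String>> input) {
      return input.apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Void>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // ... write c.element() to the external store ...
        }
      }));
    }
  }

  // Hypothetical DoFn that commits offsets with a plain KafkaConsumer, e.g.
  //   consumer.commitSync(Collections.singletonMap(
  //       new TopicPartition(rec.getTopic(), rec.getPartition()),
  //       new OffsetAndMetadata(rec.getOffset() + 1)));
  // Real code would batch per partition and handle retries.
  static class CommitOffsetsFn extends DoFn<KafkaRecord<String, String>, Void> {
    @ProcessElement
    public void process(ProcessContext c) {
      KafkaRecord<String, String> rec = c.element();
      // commit rec's offset here (see comment above)
    }
  }
}

Note the fixed windows: Wait() releases each window of the main input only once the corresponding window of the signal collection is done, so a global window would never be released in a streaming pipeline.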
>>>>>
>>>>>> On Tue, Aug 21, 2018 at 9:58 AM Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>>> I think by 'KafkaUnboundedSource checkpointing' you mean enabling 'commitOffsetsInFinalize()' on the KafkaIO source. It is a better option than enable.auto.commit, but it does not do exactly what you want here: it is invoked after the first stage ('Simple Transformation' in your case). This is certainly true for Dataflow, and I think it is also the case for the DirectRunner.
>>>>>>>
>>>>>>> I don't see a way to leverage the built-in checkpointing for external consistency. You would have to commit offsets manually.
>>>>>>>
>>>>>>> On Tue, Aug 21, 2018 at 8:55 AM Micah Whitacre <[email protected]> wrote:
>>>>>>>
>>>>>>>> I'm starting with a very simple pipeline that will read from Kafka -> Simple Transformation -> GroupByKey -> Persist the data. We are also applying some simple windowing/triggering that will persist the data after every 100 elements or every 60 seconds, to balance slow trickles of data as well as not storing too much in memory. For now I'm just running with the DirectRunner, since this is just a small processing problem.
>>>>>>>>
>>>>>>>> With the potential for failure during the persisting of the data, we want to ensure that the Kafka offsets are not updated until we have successfully persisted the data. Looking at KafkaIO, it seems like our two options for persisting offsets are:
>>>>>>>> * Kafka's enable.auto.commit
>>>>>>>> * KafkaUnboundedSource checkpointing
>>>>>>>>
>>>>>>>> The first option would commit prematurely, before we could guarantee the data was persisted. Unfortunately I can't find many details about the checkpointing, so I was wondering if there is a way to configure or tune it more appropriately.
>>>>>>>>
>>>>>>>> Specifically, I'm hoping to understand the flow so I can rely on the built-in KafkaIO functionality without having to write our own offset management. Or is it more common to write your own?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Micah
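
For reference, a rough and untested sketch of the pipeline shape from the original question, using the commitOffsetsInFinalize() option discussed above together with the 100-element / 60-second triggering. The broker and topic names and PersistFn are hypothetical placeholders; as noted above, the offsets are finalized after the first stage, not after the persist step, so this alone does not guarantee commit-after-persist:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class KafkaToStoreSketch {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")             // assumed broker
            .withTopic("my-topic")                           // assumed topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Finalize offsets through the runner's checkpoint rather than
            // enable.auto.commit; per the thread, this happens after the
            // first stage, not after the persist step.
            .commitOffsetsInFinalize()
            .withoutMetadata())
        .apply("SimpleTransform",
            MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((KV<String, String> kv) -> kv))         // stand-in for the real transform
        // Emit a pane every 100 elements or every 60 seconds, whichever comes first.
        .apply(Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(100),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(60)))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(GroupByKey.<String, String>create())
        .apply("Persist", ParDo.of(new PersistFn()));

    p.run();
  }

  // Hypothetical persist step; a real pipeline would write each grouped batch
  // to the external store here.
  static class PersistFn extends DoFn<KV<String, Iterable<String>>, Void> {
    @ProcessElement
    public void process(ProcessContext c) {
      // ... write c.element() to the external store ...
    }
  }
}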
