I think you don’t need to enable EOS in this case, since KafkaIO has a dedicated EOS sink implementation on the Beam side (btw, it’s not supported by all runners), and it relies on setting “enable.idempotence=true” for the KafkaProducer. I’m not sure that you can achieve “at least once” semantics with the current KafkaIO implementation.
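For context on the `enable.idempotence=true` setting: below is a toy, stdlib-only Java model of the broker-side de-duplication an idempotent producer relies on. The class `IdempotentLogModel` is hypothetical and is not Kafka code. The idea is that the broker remembers the last sequence number it accepted from each producer and drops a resend it has already seen, so a retry after a lost acknowledgement does not create a duplicate. This is a simplification: real brokers track sequences per partition and producer epoch, and they reject sequence gaps. Idempotence also only covers retries from a single producer instance, so on its own it does not remove duplicates caused by replaying input after a failure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical class, not Kafka's broker code) of idempotent-producer de-duplication.
public class IdempotentLogModel {
    // Highest sequence number accepted so far, keyed by producer id.
    private final Map<Long, Integer> lastSeq = new HashMap<>();
    // The partition log, in append order.
    private final List<String> log = new ArrayList<>();

    /**
     * Appends value unless (producerId, seq) was already accepted, e.g. because the
     * producer resent a batch after losing the acknowledgement. Returns true if appended.
     * Simplification: real brokers also reject sequence gaps instead of accepting them.
     */
    public boolean append(long producerId, int seq, String value) {
        int last = lastSeq.getOrDefault(producerId, -1);
        if (seq <= last) {
            return false; // duplicate resend: dropped, the log is unchanged
        }
        lastSeq.put(producerId, seq);
        log.add(value);
        return true;
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        IdempotentLogModel partition = new IdempotentLogModel();
        partition.append(1L, 0, "a");
        partition.append(1L, 1, "b");
        partition.append(1L, 1, "b"); // retry of the same send is dropped
        System.out.println(partition.log());
    }
}
```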
> On 16 Jun 2020, at 17:16, Eleanore Jin <eleanore....@gmail.com> wrote:
>
> Hi All,
>
> I previously asked a few questions regarding enabling EOS (exactly-once
> semantics), please see below.
>
> Our Beam pipeline uses KafkaIO to read from the source topic, and then uses
> KafkaIO to publish to the sink topic.
>
> According to Max's answer to my previous questions, enabling EOS with KafkaIO
> will introduce latency, as the KafkaIO.ExactlyOnceWriter processElement
> method is only called after all messages within the checkpoint interval
> have been checkpointed. So the latency depends on the checkpoint interval.
>
> I just wonder: if I relax to at-least-once, do I still need to enable EOS on
> KafkaIO, or is it not required? If not, can you please provide some
> instructions on how it should be done?
>
> Thanks a lot!
> Eleanore
>
> > Thanks for the response! The reason to set up the state backend is to
> > experiment with Kafka EOS with Beam running on Flink. Reading through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS. ExactlyOnceWriter's processElement method is annotated
> > with @RequiresStableInput, so all the messages will be cached
> > by KeyedBufferingElementsHandler, and only after the checkpoint succeeds
> > will those messages be processed by ExactlyOnceWriter?
>
> That's correct.
>
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also be checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, and the delay is based on the checkpoint interval? How
> > can we reduce the latency while still keeping the EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, upon a
> > successful checkpoint, the checkpointed offset will be committed back
> > to Kafka? And if this operation does not finish successfully, and then
> > the job gets cancelled/stopped and is re-submitted (with the same
> > consumer group for the source topics, but a different job ID), is
> > duplicated processing still possible, because the consumed offset
> > was not committed back to Kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset, which might be
> convenient or not, depending on your use case.
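To put rough numbers on the answer to question 3, here is a back-of-the-envelope Java sketch. It is our own simplification, not Flink's exact scheduling, and the names are hypothetical. It assumes checkpoints are triggered every `intervalMs` starting at 0, each takes `durationMs` to complete (shorter than the interval, so they never overlap), and a buffered element is handed to the `@RequiresStableInput` writer once the first checkpoint triggered at or after its arrival completes.

```java
// Back-of-the-envelope model (hypothetical names) of checkpoint-bound latency
// for a writer that only sees elements after the covering checkpoint completes.
public final class CheckpointLatencyModel {
    /** Release time of an element arriving at arrivalMs (arrivalMs >= 0, intervalMs > 0). */
    public static long releaseTimeMs(long arrivalMs, long intervalMs, long durationMs) {
        // First trigger at or after arrival: round arrival up to a multiple of intervalMs.
        long nextTriggerMs = ((arrivalMs + intervalMs - 1) / intervalMs) * intervalMs;
        // Buffered elements are released to the writer once that checkpoint completes.
        return nextTriggerMs + durationMs;
    }

    /** Time the element spends buffered before the writer processes it. */
    public static long latencyMs(long arrivalMs, long intervalMs, long durationMs) {
        return releaseTimeMs(arrivalMs, intervalMs, durationMs) - arrivalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000; // assumed checkpoint interval
        long durationMs = 2_000;  // assumed time for a checkpoint to complete
        for (long arrivalMs : new long[] {0, 1, 5_000, 9_999}) {
            System.out.println("arrival " + arrivalMs + " ms -> latency "
                + latencyMs(arrivalMs, intervalMs, durationMs) + " ms");
        }
    }
}
```

With a 10 s interval and 2 s checkpoints, an element arriving just after a trigger waits almost 12 s in this model, so only a shorter interval or faster checkpoints reduce the worst case.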
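Question 4 and Max's answer can be illustrated with a toy Java model. The names are hypothetical and this is not KafkaIO's code. When a job is restored from a checkpoint, the checkpointed offset wins. The group offset committed by `commitOffsetsInFinalize()` only matters for a fresh start. Without either, the reader falls back to some default, for example whatever `auto.offset.reset` selects. Offsets follow Kafka's convention of naming the next offset to read. The scenario below assumes the last checkpoint covered offsets up to 1000 but the commit back to Kafka did not complete, leaving the group at 800.

```java
import java.util.OptionalLong;

// Toy model (hypothetical names, not KafkaIO's code) of where a restarted reader resumes.
public final class ResumeOffsetModel {
    /** Next offset to read after a restart, following the precedence described above. */
    public static long resumeOffset(OptionalLong restoredCheckpointOffset,
                                    OptionalLong committedGroupOffset,
                                    long resetOffset) {
        if (restoredCheckpointOffset.isPresent()) {
            return restoredCheckpointOffset.getAsLong(); // restore: the Kafka-side commit is ignored
        }
        if (committedGroupOffset.isPresent()) {
            return committedGroupOffset.getAsLong(); // fresh start with the same consumer group
        }
        return resetOffset; // no commit at all, e.g. what auto.offset.reset would pick
    }

    /** Records read again, given the next offset after the last record already delivered. */
    public static long reReadCount(long resumeOffset, long deliveredUpTo) {
        return Math.max(0L, deliveredUpTo - resumeOffset);
    }

    public static void main(String[] args) {
        long deliveredUpTo = 1_000;                      // covered by the last completed checkpoint
        OptionalLong committed = OptionalLong.of(800);   // the commit of 1000 did not complete
        long fresh = resumeOffset(OptionalLong.empty(), committed, 0);
        long restored = resumeOffset(OptionalLong.of(1_000), committed, 0);
        System.out.println("fresh start: resume at " + fresh
            + ", re-read " + reReadCount(fresh, deliveredUpTo));
        System.out.println("restored: resume at " + restored
            + ", re-read " + reReadCount(restored, deliveredUpTo));
    }
}
```

In this model, re-submitting without restoring re-reads the 200 records between the stale commit and the checkpoint, while restoring from the checkpoint re-reads none.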