I don’t think you need to enable EOS in this case. KafkaIO’s EOS mode is a dedicated
sink implementation on the Beam side (btw, it’s not supported by all runners),
and it relies on setting “enable.idempotence=true” for the KafkaProducer.
I’m not sure, though, that you can achieve “at least once” semantics with the
current KafkaIO implementation.
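Idempotence is an ordinary KafkaProducer setting, so here is a minimal sketch of a sanity check for such a config. It assumes Kafka's documented requirements for idempotent producers (`acks=all`, `retries` greater than 0, `max.in.flight.requests.per.connection` at most 5). `check_idempotent_producer_config` is a hypothetical helper, not part of KafkaIO or any Kafka client library:

```python
# Hypothetical helper (not a KafkaIO or Kafka client API): checks a producer
# config dict against Kafka's documented constraints for idempotence.
def check_idempotent_producer_config(config):
    """Return a list of problems; an empty list means the config looks consistent."""
    problems = []
    if str(config.get("enable.idempotence", "false")).lower() != "true":
        problems.append("enable.idempotence must be true")
    # acks is required explicitly here because its default differs across
    # Kafka versions.
    if str(config.get("acks", "")).lower() not in ("all", "-1"):
        problems.append("acks must be 'all' (or -1)")
    # The remaining keys are only checked when they are set.
    if "retries" in config and int(config["retries"]) <= 0:
        problems.append("retries must be greater than 0")
    if int(config.get("max.in.flight.requests.per.connection", 5)) > 5:
        problems.append("max.in.flight.requests.per.connection must be at most 5")
    return problems


producer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder address
    "enable.idempotence": "true",
    "acks": "all",
}
print(check_idempotent_producer_config(producer_config))  # []
```

For context, in the Java SDK the EOS sink itself is switched on with `KafkaIO.Write.withEOS(numShards, sinkGroupId)`; a write without it goes through the regular producer path.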

> On 16 Jun 2020, at 17:16, Eleanore Jin <eleanore....@gmail.com> wrote:
> 
> Hi All, 
> 
> I previously asked a few questions about enabling EOS (exactly-once
> semantics); please see below.
> 
> Our Beam pipeline uses KafkaIO to read from a source topic and then uses
> KafkaIO to publish to a sink topic.
> 
> According to Max's answer to my previous questions, enabling EOS with KafkaIO
> introduces latency: the processElement method of KafkaIO's ExactlyOnceWriter
> is only called for the messages within a checkpoint interval once that
> checkpoint has completed. So the latency depends on the checkpoint interval.
> 
> I wonder: if I relax the guarantee to at-least-once, do I still need to
> enable EOS on KafkaIO, or is it not required?
> If it is not required, could you please give some instructions on how it
> should be done?
> 
> Thanks a lot!
> Eleanore
> 
> > Thanks for the response! The reason for setting up the state backend is to
> > experiment with Kafka EOS with Beam running on Flink. Having read through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS. The ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all the messages will be buffered
> > by KeyedBufferingElementsHandler, and only after a checkpoint succeeds will
> > those messages be processed by ExactlyOnceWriter?
> 
> That's correct.
> 
> >
> > 2. Upon checkpoint, will the messages buffered by
> > KeyedBufferingElementsHandler also be checkpointed?
> 
> Yes, the buffered elements will be checkpointed.
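The buffering confirmed in answers 1 and 2 can be pictured with a toy model, assuming one checkpoint in flight at a time. `StableInputBuffer` and its methods are hypothetical names, not the API of Beam's KeyedBufferingElementsHandler: elements are held until the checkpoint that covers them completes, and the buffer is part of the snapshot, so it survives a restore.

```python
from collections import defaultdict


class StableInputBuffer:
    """Toy model (hypothetical): hold elements until their checkpoint completes."""

    def __init__(self):
        self.current_checkpoint = 1       # id of the next checkpoint to be taken
        self.pending = defaultdict(list)  # checkpoint id -> buffered elements

    def add(self, element):
        # Elements arriving before checkpoint N's barrier are covered by checkpoint N.
        self.pending[self.current_checkpoint].append(element)

    def snapshot(self):
        # Barrier: the buffered elements become part of the snapshot, and
        # anything arriving later belongs to the next checkpoint.
        checkpoint_id = self.current_checkpoint
        state = {cid: list(elems) for cid, elems in self.pending.items()}
        self.current_checkpoint += 1
        return checkpoint_id, state

    def notify_checkpoint_complete(self, checkpoint_id):
        # Only now is the input "stable": release everything covered by this
        # checkpoint (and any earlier ones) to the writer.
        ready = []
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            ready.extend(self.pending.pop(cid))
        return ready

    @classmethod
    def restore(cls, checkpoint_id, state):
        # After a failure the buffered elements come back from the snapshot
        # instead of being lost.
        buf = cls()
        buf.current_checkpoint = checkpoint_id + 1
        for cid, elems in state.items():
            buf.pending[cid] = list(elems)
        return buf


buf = StableInputBuffer()
buf.add("a")
buf.add("b")
cp, state = buf.snapshot()                 # checkpoint 1 covers "a" and "b"
buf.add("c")                               # arrives after the barrier: checkpoint 2
print(buf.notify_checkpoint_complete(cp))  # ['a', 'b']
```

The point of the model is the ordering: nothing reaches the writer before the checkpoint covering it has completed, which is where the extra latency comes from.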
> 
> > 3. It seems the way Beam provides Kafka EOS introduces delays in
> > stream processing, and the delay depends on the checkpoint interval? How
> > can the latency be reduced while still keeping the EOS guarantee?
> 
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
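To make this trade-off concrete, here is a back-of-the-envelope estimate under simplifying assumptions: checkpoints are triggered at a fixed interval, each finishes before the next one starts, elements arrive uniformly, and processing time is ignored. The function name is hypothetical; with the Flink runner the interval itself is set by the `checkpointingInterval` pipeline option.

```python
def eos_sink_latency_ms(checkpoint_interval_ms, checkpoint_duration_ms):
    """Rough latency added by buffering until checkpoint completion (hypothetical helper).

    Assumes checkpoint_duration_ms < checkpoint_interval_ms, non-overlapping
    checkpoints, uniform arrivals, and no processing or network time.
    """
    # Worst case: the element arrives just after a barrier, waits a full
    # interval for the next barrier, then for that checkpoint to complete.
    worst = checkpoint_interval_ms + checkpoint_duration_ms
    # Average case: half an interval until the next barrier, plus completion.
    average = checkpoint_interval_ms / 2 + checkpoint_duration_ms
    return worst, average


print(eos_sink_latency_ms(10_000, 2_000))  # (12000, 7000.0)
```

Shortening the interval lowers both numbers, at the cost of checkpointing more often.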
> 
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, upon a
> > successful checkpoint, the checkpointed offset will be committed back
> > to Kafka? And if this commit does not finish successfully, and the job
> > then gets cancelled/stopped and is re-submitted (with the same consumer
> > group for the source topics, but a different job ID), is duplicate
> > processing still possible because the consumed offset was not committed
> > back to Kafka?
> 
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset, which may or may not be
> convenient, depending on your use case.
> 
> 
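Max's answer to question 4 can be illustrated for a single partition with a hypothetical helper (not a KafkaIO API). It assumes the re-submitted job uses the same `group.id`, so that without a checkpoint the consumer starts from the group's committed offset, and it follows Kafka's convention that a committed offset is the next offset to consume.

```python
def resume_position(restore_from_checkpoint, checkpointed_offset, committed_offset):
    """Return (next offset to read, records read again) for one partition.

    checkpointed_offset: position stored in the last completed checkpoint.
    committed_offset: what commitOffsetsInFinalize last managed to commit to
    the consumer group; it can lag if a commit failed or the job stopped first.
    """
    if restore_from_checkpoint:
        # The checkpoint is authoritative; the committed group offset is ignored.
        return checkpointed_offset, 0
    # Fresh submission (new job ID, same group.id): start from the committed
    # group offset and read again everything between the two positions.
    return committed_offset, max(0, checkpointed_offset - committed_offset)


print(resume_position(True, 150, 100))   # (150, 0)
print(resume_position(False, 150, 100))  # (100, 50)
```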
