Hi team

I'm subscribing to two topics with a Kafka consumer, joining them, and
publishing the result to a new topic via KafkaProducer (with exactly-once
semantics).

As it's highly recommended to set a uid for each operator, I'm curious how
this works. For example:

val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")

val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")

val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")

val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")


In this code, is it necessary to set the uid of the transformer? If the
consumer offsets are not committed until the records are finally published
to the sink, will replaying from the offsets of the previous checkpoint
guarantee exactly-once, even if the transformer's state is lost on restart?
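
To make the concern concrete, here is a toy model of the scenario I have in
mind. It is plain Scala with no Flink APIs, and every name in it (Rec, run,
the offsets, the buffered map) is made up for illustration. It assumes the
join keeps unmatched topic A records as state and that a checkpoint covers
both the source offsets and that state. It is only a sketch of my
understanding, not a description of how Flink actually implements this.

```scala
// Toy model only: plain Scala, no Flink APIs. All names are hypothetical.
object ReplaySketch {
  // A record on either topic: a join key plus a payload.
  final case class Rec(key: Int, value: String)

  // Consume topic A from offset `fromA` and topic B from offset `fromB`,
  // starting with the given join buffer (the "transformer state"),
  // and return the joined output produced after the restart.
  def run(topicA: Vector[Rec], topicB: Vector[Rec],
          fromA: Int, fromB: Int,
          buffer: Map[Int, String]): Vector[String] = {
    // Records from A are buffered by key, waiting for a match from B.
    val bufferedA = buffer ++ topicA.drop(fromA).map(r => r.key -> r.value)
    // Each B record emits a joined result if a matching A record is buffered.
    topicB.drop(fromB).flatMap { b =>
      bufferedA.get(b.key).map(a => s"$a+${b.value}")
    }
  }

  def main(args: Array[String]): Unit = {
    val topicA = Vector(Rec(1, "a1"), Rec(2, "a2"))
    val topicB = Vector(Rec(1, "b1"), Rec(2, "b2"))

    // Assumed checkpoint: all of A was read, one record of B was read,
    // and both A records were sitting in the join buffer.
    val offsetA = 2
    val offsetB = 1
    val checkpointedBuffer = Map(1 -> "a1", 2 -> "a2")

    // Restart with the join state restored from the checkpoint.
    val withState = run(topicA, topicB, offsetA, offsetB, checkpointedBuffer)
    // Restart from the same offsets, but with the join state lost.
    val withoutState = run(topicA, topicB, offsetA, offsetB, Map.empty)

    println(s"state restored: $withState")
    println(s"state lost:     $withoutState")
  }
}
```

In this model, replaying from the checkpointed offsets with the state lost
never emits the a2/b2 pair, because a2 was read before the checkpoint and
is not replayed. That is why I'm unsure whether the offsets alone are
enough.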
