Hi,

I am currently working on a simple application that requires an exactly-once end-to-end guarantee.
I am reading data from Kafka and writing it back to Kafka. When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level, everything works fine. Here's the relevant code:

    KafkaSink<String> sink = KafkaSink.<String>builder()
        . . .
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        . . .
        .build();

Unfortunately, when I switch to `DeliveryGuarantee.EXACTLY_ONCE`, I encounter the following error during error handling (High Availability mode in k8s):

    Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
        at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
        . . .
    Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper

The code causing this issue is as follows (excerpt from `org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer`):

    Object transactionManager = this.getTransactionManager();
    synchronized(transactionManager) {
        Object topicPartitionBookkeeper = getField(transactionManager, "topicPartitionBookkeeper");
        transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
        invoke(topicPartitionBookkeeper, "reset");
        setField(transactionManager, "producerIdAndEpoch", createProducerIdAndEpoch(producerId, epoch));

I am using Apache Flink 1.17.1 (flink-connector-kafka 1.17.1) and Apache Kafka Client (`org.apache.kafka:kafka-clients`) 3.5.1.

I have examined the code of `org.apache.kafka.clients.producer.internals.TransactionManager`, which is used by `org.apache.kafka.clients.producer.KafkaProducer`. I can see the `producerIdAndEpoch` field, but there is no `topicPartitionBookkeeper` field.

Could you please advise which version of KafkaProducer (kafka-clients) is compatible with flink-connector-kafka 1.17.1? And am I missing something in my configuration?

Kind regards,
Krzysztof
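
P.S. In case it helps anyone reproduce the field check described above against their own classpath, here is a minimal sketch that uses plain Java reflection to test whether a class declares a given field. It is not part of Flink or Kafka: the class name `TransactionManagerFieldProbe` and the method `hasField` are hypothetical names made up for this illustration. The Kafka class and field names are the ones from the stack trace and the connector excerpt.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Flink or Kafka.
class TransactionManagerFieldProbe {

    /**
     * Returns Optional.empty() if the class cannot be found on the classpath,
     * otherwise whether the class itself declares a field with the given name
     * (private fields included, inherited fields excluded).
     */
    static Optional<Boolean> hasField(String className, String fieldName) {
        Class<?> clazz;
        try {
            // Load without running static initializers; we only inspect the class.
            clazz = Class.forName(
                    className, false, TransactionManagerFieldProbe.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
        try {
            clazz.getDeclaredField(fieldName);
            return Optional.of(true);
        } catch (NoSuchFieldException e) {
            return Optional.of(false);
        }
    }

    public static void main(String[] args) {
        String transactionManager =
                "org.apache.kafka.clients.producer.internals.TransactionManager";
        String[] fields = {"topicPartitionBookkeeper", "producerIdAndEpoch"};
        for (String field : fields) {
            String status = hasField(transactionManager, field)
                    .map(present -> present ? "present" : "absent")
                    .orElse("class not on classpath");
            System.out.println(field + ": " + status);
        }
    }
}
```

Running `main` with the same kafka-clients jar that the job uses should show which of the two fields the bundled `TransactionManager` actually declares. Without kafka-clients on the classpath it reports that the class was not found.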