Hi,

I am currently working on a simple application that requires an end-to-end
exactly-once guarantee.

I am reading data from Kafka and writing it back to Kafka.

When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
everything works fine.
Here's the relevant code:

KafkaSink<String> sink = KafkaSink.<String>builder()
    . . .
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    . . .
    .build();
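
For completeness, when I switch to EXACTLY_ONCE the sink builder looks roughly
like the sketch below (bootstrap servers, topic, transactional id prefix and
the timeout value are placeholders, not my real settings):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")                    // placeholder
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")                         // placeholder
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // EXACTLY_ONCE needs a transactional id prefix, and the transaction
    // timeout must not exceed the broker's transaction.max.timeout.ms
    .setTransactionalIdPrefix("my-app-tx")                // placeholder
    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000")
    .build();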

Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I encounter
the following error during failure recovery (High Availability mode on
Kubernetes):

Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
    . . .
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper

The code causing this issue is as follows
(org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):

Object transactionManager = this.getTransactionManager();
synchronized (transactionManager) {
    Object topicPartitionBookkeeper =
        getField(transactionManager, "topicPartitionBookkeeper");
    transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
    invoke(topicPartitionBookkeeper, "reset");
    setField(transactionManager, "producerIdAndEpoch",
        createProducerIdAndEpoch(producerId, epoch));
    // ...

I am using Apache Flink 1.17.1 (flink-connector-kafka 1.17.1) and the Apache
Kafka client (org.apache.kafka:kafka-clients) 3.5.1.
I have examined the code of
org.apache.kafka.clients.producer.internals.TransactionManager, which is
used by org.apache.kafka.clients.producer.KafkaProducer.
I can see the producerIdAndEpoch field, but there is no
topicPartitionBookkeeper field.
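
As a quick sanity check, a minimal reflection probe like the sketch below (the
class name is the one quoted above, the rest is just diagnostic code) lists the
declared fields of the TransactionManager on my classpath, and
topicPartitionBookkeeper is not among them:

import java.lang.reflect.Field;

public class TransactionManagerFieldCheck {
    public static void main(String[] args) throws Exception {
        // Load TransactionManager from whatever kafka-clients version is on
        // the classpath and print its declared fields.
        Class<?> tm = Class.forName(
            "org.apache.kafka.clients.producer.internals.TransactionManager");
        for (Field f : tm.getDeclaredFields()) {
            System.out.println(f.getName());
        }
    }
}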

Could you please advise which version of kafka-clients (KafkaProducer) is
compatible with flink-connector-kafka 1.17.1? And am I missing something in my
configuration?

Kind regards
Krzysztof
