Hi, Hang. There have been a lot of changes made to the TransactionsManager in the Kafka client in 2022. (e.g. https://github.com/apache/kafka/commit/3ea7b418fb3d7e9fc74c27751c1b02b04877f197 ).
Version 3.2.3 was the last one when the TransactionsManager class contained attributes (e.g., topicPartitionBookkeeper) referenced by flink-connector-kafka (1.17.1). Thanks once again. Krzysztof pon., 11 wrz 2023 o 11:24 Hang Ruan <ruanhang1...@gmail.com> napisał(a): > Hi, Krzysztof. > > I find that this part has been changed in PR[1] when updating the kafka > client version to 3.4.0. > This fix is not released yet. Maybe you can package and check it by > yourself. > > Best, > Hang > > [1] https://github.com/apache/flink-connector-kafka/pull/11 > > Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> 于2023年9月10日周日 > 21:52写道: > >> Hi, >> >> I am currently working on a simple application that requires exactly-once >> end-to-end guarantee. >> >> I am reading data from Kafka and writing it back to Kafka. >> >> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level, >> everything works fine. >> Here's the relevant code: >> >> KafkaSink<String> sink = KafkaSink.<String>builder() >> . . . >> .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) >> . . . >> .build(); >> >> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I >> encounter the following error during error handling (High Availability mode >> in k8s):: >> >> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version >> at >> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) >> ~[flink-connector-kafka-1.17.1.jar:1.17.1] >> . . . >> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper >> >> The code causing this issue is as follows >> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer): >> >> Object transactionManager = this.getTransactionManager(); >> synchronized(transactionManager) { >> Object topicPartitionBookkeeper = >> getField(transactionManager, "topicPartitionBookkeeper"); >> transitionTransactionManagerStateTo(transactionManager, >> "INITIALIZING"); >> invoke(topicPartitionBookkeeper, "reset"); >> setField(transactionManager, "producerIdAndEpoch", >> createProducerIdAndEpoch(producerId, epoch)); >> >> I am using Apache Kafka 1.17.1 and Apache Kafka Client >> (org.apache.kafka:kafka-clients) 3.5.1. >> I have examined the code of >> org.apache.kafka.clients.producer.internals.TransactionManager, which is >> used by org.apache.kafka.clients.producer.KafkaProducer. >> I can see the producerIdAndEpoch field, but there is no >> topicPartitionBookkeeper field. >> >> Could you please advise which version of KafkaProducer is compatible with >> the flink-connector-kafka? And am I missing something in my configuration? >> >> Kind regards >> Krzysztof >> >