Hi, Hang.

A lot of changes were made to the TransactionManager in the Kafka client
in 2022 (e.g.,
https://github.com/apache/kafka/commit/3ea7b418fb3d7e9fc74c27751c1b02b04877f197).

Version 3.2.3 was the last one in which the TransactionManager class still
contained the fields (e.g., topicPartitionBookkeeper) referenced by
flink-connector-kafka (1.17.1).
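
A quick way to check whether a given class still has such a field is a small
reflection probe. The following is only a sketch under stated assumptions:
OldStyleManager and NewStyleManager are made-up stand-ins so that it runs
without kafka-clients on the classpath, and FieldProbe and hasField are names
introduced here for illustration, not connector code.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a class that still has the field.
class OldStyleManager {
    private Object topicPartitionBookkeeper = new Object();
    private Object producerIdAndEpoch = new Object();
}

// Hypothetical stand-in for a class where the field has been removed.
class NewStyleManager {
    private Object producerIdAndEpoch = new Object();
}

class FieldProbe {
    /** Returns true if the class, or any of its superclasses, declares a field with this name. */
    static boolean hasField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(name); // also finds private fields
                return f != null;
            } catch (NoSuchFieldException e) {
                // Not declared on this class; continue with the superclass.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String field = "topicPartitionBookkeeper";
        System.out.println("old-style has field: " + hasField(OldStyleManager.class, field));
        System.out.println("new-style has field: " + hasField(NewStyleManager.class, field));
    }
}
```

With kafka-clients on the classpath, the same helper could be pointed at
Class.forName("org.apache.kafka.clients.producer.internals.TransactionManager")
instead of the stand-ins.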

Thanks once again.
Krzysztof

On Mon, 11 Sep 2023 at 11:24, Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Krzysztof.
>
> I found that this part was changed in PR [1], which updates the Kafka
> client version to 3.4.0.
> This fix has not been released yet. Maybe you can build the connector and
> check it yourself.
>
> Best,
> Hang
>
> [1] https://github.com/apache/flink-connector-kafka/pull/11
>
> On Sun, 10 Sep 2023 at 21:52, Krzysztof Jankiewicz
> <jankiewicz.krzysz...@gmail.com> wrote:
>
>> Hi,
>>
>> I am currently working on a simple application that requires an
>> end-to-end exactly-once guarantee.
>>
>> I am reading data from Kafka and writing it back to Kafka.
>>
>> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
>> everything works fine.
>> Here's the relevant code:
>>
>> KafkaSink<String> sink = KafkaSink.<String>builder()
>>     . . .
>>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>     . . .
>>     .build();
>>
>> Unfortunately, when I switch to `DeliveryGuarantee.EXACTLY_ONCE`, I
>> encounter the following error during error handling (High Availability mode
>> in k8s):
>>
>> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>>   at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>> . . .
>> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>>
>> The code causing this issue is as follows
>> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>>
>> Object transactionManager = this.getTransactionManager();
>> synchronized (transactionManager) {
>>     Object topicPartitionBookkeeper =
>>             getField(transactionManager, "topicPartitionBookkeeper");
>>     transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
>>     invoke(topicPartitionBookkeeper, "reset");
>>     setField(transactionManager, "producerIdAndEpoch",
>>             createProducerIdAndEpoch(producerId, epoch));
>>
>> I am using Apache Flink 1.17.1 and Apache Kafka Client
>> (org.apache.kafka:kafka-clients) 3.5.1.
>> I have examined the code of
>> org.apache.kafka.clients.producer.internals.TransactionManager, which is
>> used by org.apache.kafka.clients.producer.KafkaProducer.
>> I can see the producerIdAndEpoch field, but there is no
>> topicPartitionBookkeeper field.
>>
>> Could you please advise which version of KafkaProducer is compatible with
>> the flink-connector-kafka? And am I missing something in my configuration?
>>
>> Kind regards
>> Krzysztof
>>
>
