[
https://issues.apache.org/jira/browse/FLINK-39996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18091680#comment-18091680
]
Shekhar Prasad Rajak commented on FLINK-39996:
----------------------------------------------
I am looking into this, but the following Kafka issues have to be resolved first:
https://issues.apache.org/jira/browse/KAFKA-20739
https://issues.apache.org/jira/browse/KAFKA-20738
> Migrate KafkaSink EOS recovery to Kafka public PreparedTxnState API
> -------------------------------------------------------------------
>
> Key: FLINK-39996
> URL: https://issues.apache.org/jira/browse/FLINK-39996
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: kafka-5.0.0
> Reporter: Shekhar Prasad Rajak
> Priority: Major
> Fix For: kafka-5.1.0
>
>
> Flink KafkaSink exactly-once recovery currently persists transactionalId,
> producerId, and epoch in KafkaCommittable, then uses
> FlinkKafkaInternalProducer.resumeTransaction(...) to restore KafkaProducer
> internals via reflection before calling commitTransaction().
> This works today, but it depends on Kafka private internals and is fragile
> across Kafka client changes. Kafka now has a public 2PC-oriented API shape:
> PreparedTxnState state = producer.prepareTransaction();
> and the intended recovery flow is:
> producer.initTransactions(true);
> producer.completeTransaction(new PreparedTxnState(serializedState));
> Flink should eventually migrate KafkaSink EOS recovery to this public API
> once Kafka client/broker supports keepPreparedTxn=true end-to-end.
>
>
> Current Behavior
> - ExactlyOnceKafkaWriter.prepareCommit() creates a KafkaCommittable from
> the producer ID, epoch, and transactional ID.
> - FlinkKafkaInternalProducer.precommitTransaction() only updates the local
> state of the Flink wrapper; nothing is sent to the broker.
> - KafkaCommitter resumes the transaction through
> FlinkKafkaInternalProducer.resumeTransaction(...).
> - resumeTransaction(...) mutates KafkaProducer internals via reflection.
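
To illustrate why restoring state through reflection is fragile, here is a minimal sketch. StandInTxnManager and ReflectiveResume are hypothetical stand-ins invented for this example; they are not Kafka or Flink classes, and the field names are assumptions. The point is only that the field lookup is by string, so a rename in the client surfaces at runtime instead of at compile time.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a client-internal transaction manager (NOT a real Kafka class).
final class StandInTxnManager {
    private long producerId = -1L;
    private short epoch = -1;

    long producerId() { return producerId; }
    short epoch() { return epoch; }
}

// Illustrative only: restores private state by field name, as a reflection-based resume would.
final class ReflectiveResume {
    static void setField(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException e) {
            // A renamed or removed private field is only detected here, at runtime.
            throw new IllegalStateException("Incompatible internals: " + fieldName, e);
        }
    }

    static void resume(StandInTxnManager manager, long producerId, short epoch) {
        setField(manager, "producerId", producerId);
        setField(manager, "epoch", epoch);
    }
}
```

If a client release renamed one of these fields, resume(...) would still compile and would fail only during recovery, which is the risk the migration is meant to remove.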
>
> Expected Behavior
> When the Kafka dependency supports public prepared transaction recovery,
> Flink should persist:
> transactionalId + PreparedTxnState
> and recover/commit using public Kafka APIs:
> producer.initTransactions(true);
> producer.completeTransaction(new PreparedTxnState(serializedPreparedState));
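
As a rough sketch of what the persisted state could look like after the migration, the class below stores only the transactional ID and the serialized prepared state. PreparedCommittable, its version constant, and its wire layout are hypothetical and invented for this example; they are not the actual Flink KafkaCommittable format. The prepared state is treated as an opaque string, and no Kafka dependency is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical committable: transactionalId + opaque serialized prepared state.
final class PreparedCommittable {
    private static final int VERSION = 1; // assumed version tag for this sketch

    private final String transactionalId;
    private final String serializedPreparedState;

    PreparedCommittable(String transactionalId, String serializedPreparedState) {
        this.transactionalId = transactionalId;
        this.serializedPreparedState = serializedPreparedState;
    }

    String transactionalId() { return transactionalId; }
    String serializedPreparedState() { return serializedPreparedState; }

    // Writes version, transactional ID, and prepared state in that order.
    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeUTF(transactionalId);
            out.writeUTF(serializedPreparedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back and rejects unknown versions.
    static PreparedCommittable deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalArgumentException("Unsupported version: " + version);
            }
            return new PreparedCommittable(in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On recovery, the committer would hand transactionalId() to the producer configuration and pass serializedPreparedState() to the PreparedTxnState constructor shown in the description above, once the Kafka dependency supports it.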