Shekhar Prasad Rajak created FLINK-39996:
--------------------------------------------
Summary: Migrate KafkaSink EOS recovery to Kafka public PreparedTxnState API
Key: FLINK-39996
URL: https://issues.apache.org/jira/browse/FLINK-39996
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: kafka-5.0.0
Reporter: Shekhar Prasad Rajak
Fix For: kafka-5.1.0
Flink KafkaSink exactly-once recovery currently persists transactionalId,
producerId, and epoch in KafkaCommittable, then uses
FlinkKafkaInternalProducer.resumeTransaction(...) to restore KafkaProducer
internals via reflection before calling commitTransaction().
This works today, but it depends on Kafka private internals and is fragile
across Kafka client changes. Kafka now exposes a public, 2PC-oriented API for
preparing a transaction:
PreparedTxnState state = producer.prepareTransaction();
The intended recovery flow is:
producer.initTransactions(true);
producer.completeTransaction(new PreparedTxnState(serializedState));
Flink should eventually migrate KafkaSink EOS recovery to this public API
once the Kafka client and broker both support keepPreparedTxn=true end-to-end.
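The ordering of the calls above can be sketched without a broker. This is a minimal illustration, not the Kafka client: TwoPhaseProducer, RecordingProducer, and TwoPhaseFlow are hypothetical names, the prepared state is modeled as a plain String, and the value "42:7" is a made-up placeholder.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three producer calls named in this issue. */
interface TwoPhaseProducer {
    String prepareTransaction();                       // returns the serialized prepared state
    void initTransactions(boolean keepPreparedTxn);
    void completeTransaction(String serializedPreparedState);
}

/** In-memory fake that only records calls, so the sketch runs without Kafka. */
final class RecordingProducer implements TwoPhaseProducer {
    final List<String> calls = new ArrayList<>();

    @Override
    public String prepareTransaction() {
        calls.add("prepareTransaction");
        return "42:7"; // placeholder state, assumed opaque to the caller
    }

    @Override
    public void initTransactions(boolean keepPreparedTxn) {
        calls.add("initTransactions(" + keepPreparedTxn + ")");
    }

    @Override
    public void completeTransaction(String serializedPreparedState) {
        calls.add("completeTransaction(" + serializedPreparedState + ")");
    }
}

final class TwoPhaseFlow {
    /** Pre-commit side: prepare and return the state that would be checkpointed. */
    static String preCommit(TwoPhaseProducer producer) {
        return producer.prepareTransaction();
    }

    /** Recovery side: a fresh producer keeps the prepared transaction, then completes it. */
    static void recoverAndCommit(TwoPhaseProducer freshProducer, String checkpointedState) {
        freshProducer.initTransactions(true);
        freshProducer.completeTransaction(checkpointedState);
    }
}
```

The point of the sketch is that recovery needs only the checkpointed state and two public calls on a new producer instance, with no access to producer internals.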
Current Behavior
- ExactlyOnceKafkaWriter.prepareCommit() creates a KafkaCommittable from the
producer id, epoch, and transactional id.
- FlinkKafkaInternalProducer.precommitTransaction() only marks state in the
local Flink wrapper.
- KafkaCommitter resumes the transaction through
FlinkKafkaInternalProducer.resumeTransaction(...).
- resumeTransaction(...) mutates KafkaProducer internals via reflection.
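To illustrate why reflective mutation of private fields is fragile, here is a small self-contained sketch. ClientV1, ClientV2, and ReflectiveResume are hypothetical stand-ins, not Kafka or Flink classes; the field names are assumptions chosen only to show what happens when a private field is renamed between versions.

```java
import java.lang.reflect.Field;

/** Hypothetical "old" client whose private field the caller knows by name. */
final class ClientV1 {
    private long producerId = -1L;
    long producerId() { return producerId; }
}

/** Hypothetical "new" client: same behavior, but the private field was renamed. */
final class ClientV2 {
    private long pid = -1L;
    long producerId() { return pid; }
}

final class ReflectiveResume {
    /** Tries to overwrite a private field named "producerId"; returns false if that fails. */
    static boolean setProducerId(Object client, long value) {
        try {
            Field field = client.getClass().getDeclaredField("producerId");
            field.setAccessible(true);
            field.setLong(client, value);
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchFieldException after a rename, or IllegalAccessException
            return false;
        }
    }
}
```

The same call succeeds against ClientV1 and fails against ClientV2, and the failure only shows up at runtime. A public API avoids this class of breakage because the compiler checks it.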
Expected Behavior
When the Kafka dependency supports public prepared transaction recovery,
Flink should persist:
transactionalId + PreparedTxnState
and recover/commit using public Kafka APIs:
producer.initTransactions(true);
producer.completeTransaction(new PreparedTxnState(serializedPreparedState));
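One possible shape for the persisted state is sketched below. PreparedTxnCommittable is a hypothetical name, not an existing Flink class; the sketch assumes the prepared state can be stored as an opaque string and adds a version field as an illustrative choice for future format changes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical committable: transactionalId plus the serialized prepared state. */
final class PreparedTxnCommittable {
    private static final int VERSION = 1;

    final String transactionalId;
    final String serializedPreparedState; // treated as opaque here

    PreparedTxnCommittable(String transactionalId, String serializedPreparedState) {
        this.transactionalId = Objects.requireNonNull(transactionalId);
        this.serializedPreparedState = Objects.requireNonNull(serializedPreparedState);
    }

    /** Writes a version marker followed by the two strings. */
    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeUTF(transactionalId);
            out.writeUTF(serializedPreparedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back what serialize() wrote; rejects unknown versions. */
    static PreparedTxnCommittable deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported committable version: " + version);
            }
            String transactionalId = in.readUTF();
            String preparedState = in.readUTF();
            return new PreparedTxnCommittable(transactionalId, preparedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On recovery, the committer would read the committable back, create a producer for transactionalId, and pass the stored state to completeTransaction as described above.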
--
This message was sent by Atlassian Jira
(v8.20.10#820010)