Shekhar Prasad Rajak created FLINK-39996:
--------------------------------------------

             Summary: Migrate KafkaSink EOS recovery to Kafka public PreparedTxnState API
                 Key: FLINK-39996
                 URL: https://issues.apache.org/jira/browse/FLINK-39996
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: kafka-5.0.0
            Reporter: Shekhar Prasad Rajak
             Fix For: kafka-5.1.0


Flink's KafkaSink exactly-once (EOS) recovery currently persists the
transactionalId, producerId, and epoch in KafkaCommittable, then calls
FlinkKafkaInternalProducer.resumeTransaction(...) to restore KafkaProducer
internals via reflection before calling commitTransaction().

This works today, but it depends on Kafka private internals and is fragile
across Kafka client changes. Kafka now defines a public, 2PC-oriented
(two-phase commit) API for preparing a transaction:

  PreparedTxnState state = producer.prepareTransaction();

and the intended recovery flow is:

  producer.initTransactions(true);
  producer.completeTransaction(new PreparedTxnState(serializedState));

Flink should eventually migrate KafkaSink EOS recovery to this public API
once both the Kafka client and the broker support keepPreparedTxn=true
end-to-end.
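
As a rough illustration of the prepare and recovery calls above, here is a
minimal in-memory sketch. It does not use the Kafka client: FakeTxnCoordinator,
FakeTwoPhaseProducer, and TwoPhaseRecoveryDemo are hypothetical stand-ins, a
plain String stands in for PreparedTxnState, and the rules "keepPreparedTxn=false
drops a leftover prepared transaction" and "complete commits only on a matching
state" are assumptions of this model, not confirmed Kafka behavior.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for a transaction coordinator; not a Kafka class. */
final class FakeTxnCoordinator {
    final Map<String, String> prepared = new HashMap<>(); // transactionalId -> prepared state
    final Set<String> committed = new HashSet<>();        // prepared states that were committed
    int nextToken = 0;
}

/** Hypothetical producer mirroring the call names in this issue; a String stands in for PreparedTxnState. */
final class FakeTwoPhaseProducer {
    private final String transactionalId;
    private final FakeTxnCoordinator coordinator;

    FakeTwoPhaseProducer(String transactionalId, FakeTxnCoordinator coordinator) {
        this.transactionalId = transactionalId;
        this.coordinator = coordinator;
    }

    /** Assumption: keepPreparedTxn=false aborts a leftover prepared transaction, true keeps it. */
    void initTransactions(boolean keepPreparedTxn) {
        if (!keepPreparedTxn) {
            coordinator.prepared.remove(transactionalId);
        }
    }

    /** First phase: register the transaction as prepared and return an opaque state token. */
    String prepareTransaction() {
        String state = transactionalId + "@" + coordinator.nextToken++;
        coordinator.prepared.put(transactionalId, state);
        return state;
    }

    /** Second phase. Assumption: commit only if the given state matches what is prepared, else abort. */
    void completeTransaction(String preparedState) {
        String current = coordinator.prepared.remove(transactionalId);
        if (current != null && current.equals(preparedState)) {
            coordinator.committed.add(current);
        }
    }
}

final class TwoPhaseRecoveryDemo {
    /** Simulates prepare, a restart, and recovery; returns true if the transaction ended up committed. */
    static boolean recover(boolean keepPreparedTxn) {
        FakeTxnCoordinator coordinator = new FakeTxnCoordinator();
        FakeTwoPhaseProducer beforeFailure = new FakeTwoPhaseProducer("sink-0", coordinator);
        beforeFailure.initTransactions(false);
        String state = beforeFailure.prepareTransaction(); // this token is what a checkpoint would store

        // A new producer instance plays the role of the restarted job.
        FakeTwoPhaseProducer afterRestart = new FakeTwoPhaseProducer("sink-0", coordinator);
        afterRestart.initTransactions(keepPreparedTxn);
        afterRestart.completeTransaction(state);
        return coordinator.committed.contains(state);
    }
}
```

In this model, TwoPhaseRecoveryDemo.recover(true) returns true and
recover(false) returns false, which is why the migration depends on
keepPreparedTxn=true being honored end-to-end.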

Current Behavior

  - ExactlyOnceKafkaWriter.prepareCommit() creates a KafkaCommittable from
the producer ID, epoch, and transactional ID.
  - FlinkKafkaInternalProducer.precommitTransaction() only updates state in
the local Flink wrapper.
  - KafkaCommitter resumes the transaction through 
FlinkKafkaInternalProducer.resumeTransaction(...).
  - resumeTransaction(...) mutates KafkaProducer internals via reflection.
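
To illustrate why the reflection step in the list above is fragile, here is a
minimal sketch. OpaqueClientSketch and ReflectiveResumeSketch are hypothetical
stand-ins, not KafkaProducer or Flink's actual code, and the field names are
invented for the example. The point is only that a by-name field lookup fails
at runtime, not at compile time, once the target class changes.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a client class with private state; NOT KafkaProducer. */
final class OpaqueClientSketch {
    private long producerId = -1L;
    private short epoch = -1;

    String describe() {
        return producerId + ":" + epoch;
    }
}

final class ReflectiveResumeSketch {
    /** Overwrites private fields by name, the way a reflection-based resume has to. */
    static void resume(Object client, long producerId, short epoch) {
        setField(client, "producerId", producerId);
        setField(client, "epoch", epoch);
    }

    /** Fails with IllegalStateException if the named field was renamed or removed. */
    static void setField(Object target, String name, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            field.set(target, value); // boxed values are unwrapped for primitive fields
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("client internals changed: " + name, e);
        }
    }
}
```

Calling ReflectiveResumeSketch.resume(client, 42L, (short) 7) succeeds against
this stand-in, while a lookup of a field that no longer exists only surfaces as
an exception when the recovery path actually runs.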

 

Expected Behavior

When the Kafka dependency supports public prepared transaction recovery,
Flink should persist:

  transactionalId + PreparedTxnState

and recover and commit using the public Kafka APIs:

  producer.initTransactions(true);
  producer.completeTransaction(new PreparedTxnState(serializedPreparedState));
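
One possible shape for the persisted pair is sketched below. This is not the
actual KafkaCommittable: PreparedCommittableSketch, its VERSION constant, and
the byte layout are hypothetical, and the prepared state is assumed to be
available as an opaque string, as suggested by the
new PreparedTxnState(serializedPreparedState) call above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical committable holding only public data: transactionalId + serialized prepared state. */
final class PreparedCommittableSketch {
    static final int VERSION = 1; // hypothetical format version for this sketch

    final String transactionalId;
    final String serializedPreparedState;

    PreparedCommittableSketch(String transactionalId, String serializedPreparedState) {
        this.transactionalId = transactionalId;
        this.serializedPreparedState = serializedPreparedState;
    }

    /** Writes version, transactionalId, and prepared state in that order. */
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(VERSION);
            out.writeUTF(transactionalId);
            out.writeUTF(serializedPreparedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the layout written by toBytes(); rejects unknown versions. */
    static PreparedCommittableSketch fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalArgumentException("unsupported version: " + version);
            }
            String transactionalId = in.readUTF();
            String preparedState = in.readUTF();
            return new PreparedCommittableSketch(transactionalId, preparedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On recovery, a committer built along these lines would read
serializedPreparedState back from the checkpoint and pass it to the public
completeTransaction call, with no producerId or epoch stored on the Flink side.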


