This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 4a57c6d2289 KAFKA-19999 Transaction coordinator livelock caused by
invalid producer epoch (#21176)
4a57c6d2289 is described below
commit 4a57c6d22894a0cd2504a2eefda42fc3aec86a9b
Author: Ken Huang <[email protected]>
AuthorDate: Sun Dec 28 12:50:08 2025 +0800
KAFKA-19999 Transaction coordinator livelock caused by invalid producer
epoch (#21176)
In Transaction Version 2, strict epoch validation (a marker is accepted
only if `markerEpoch > currentEpoch`) causes hanging transactions in two
scenarios:
1. **Coordinator recovery**: When reloading PREPARE_COMMIT/ABORT from
the transaction log, retried markers are rejected with
`InvalidProducerEpochException` because they use the same epoch.
2. **Network retry**: When a marker write succeeds but the response is
lost, the coordinator's retries are rejected for the same reason.
Both cases leave transactions permanently hanging in PREPARE state,
causing clients to fail with `CONCURRENT_TRANSACTIONS`.
Detect idempotent marker retries in
`ProducerAppendInfo.checkProducerEpoch()` by checking three
conditions:
1. Transaction Version ≥ 2
2. markerEpoch == currentEpoch (same epoch)
3. currentTxnFirstOffset is empty (transaction already completed)
When all conditions are met, treat the marker as a successful idempotent
retry instead of throwing an error.
Reviewers: Justine Olshan <[email protected]>, TaiJuWu
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../storage/internals/log/ProducerAppendInfo.java | 16 ++++++++++++++++
.../internals/log/ProducerStateManagerTest.java | 20 ++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 1f182444a87..976bd79e948 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -119,6 +119,22 @@ public class ProducerAppendInfo {
boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <=
current) : (producerEpoch < current);
if (invalidEpoch) {
+ // TV2 Idempotent Marker Retry Detection (KAFKA-19999):
+ // When markerEpoch == currentEpoch and no transaction is ongoing,
this indicates
+ // a retry of a marker that was already successfully written.
Common scenarios:
+ // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from
transaction log
+ // 2. Network retry: marker was written but response was lost due
to disconnection
+ // In both cases, the transaction has already ended
(currentTxnFirstOffset is empty).
+ // We suppress the InvalidProducerEpochException and allow the
duplicate marker to
+ // be written to the log.
+ if (transactionVersion >= 2 &&
+ producerEpoch == current &&
+ updatedEntry.currentTxnFirstOffset().isEmpty()) {
+ log.info("Idempotent transaction marker retry detected for
producer {} epoch {}. " +
+ "Transaction already completed, allowing
duplicate marker write.",
+ producerId, producerEpoch);
+ return;
+ }
String comparison = (transactionVersion >= 2) ? "<=" : "<";
String message = "Epoch of producer " + producerId + " at offset "
+ offset + " in " + topicPartition +
" is " + producerEpoch + ", which is " + comparison + "
the last seen epoch " + current +
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index 4d737de4ab4..b504fb76636 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -1090,6 +1090,26 @@ public class ProducerStateManagerTest {
assertNull(stateManager.verificationStateEntry(producerId));
}
+ @Test
+ public void testIdempotentTransactionMarkerRetryTV2() {
+ short transactionVersion = 2;
+ appendClientEntry(stateManager, producerId, epoch, defaultSequence,
99, true);
+ short markerEpoch = (short) (epoch + 1);
+ appendEndTxnMarker(stateManager, producerId, markerEpoch,
ControlRecordType.COMMIT, 100, transactionVersion);
+
+ ProducerStateEntry entry =
getLastEntryOrElseThrownByProducerId(stateManager, producerId);
+ assertEquals(markerEpoch, entry.producerEpoch());
+ assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset());
+
+ assertDoesNotThrow(() ->
+ appendEndTxnMarker(stateManager, producerId, markerEpoch,
ControlRecordType.COMMIT, 101, transactionVersion)
+ );
+
+ ProducerStateEntry entryAfterRetry =
getLastEntryOrElseThrownByProducerId(stateManager, producerId);
+ assertEquals(markerEpoch, entryAfterRetry.producerEpoch());
+ assertEquals(OptionalLong.empty(),
entryAfterRetry.currentTxnFirstOffset());
+ }
+
@Test
public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() {
// Create a verification state entry that supports epoch bump
(transactions v2)