Artem Livshits created KAFKA-20310:
--------------------------------------
Summary: nextProducerId and prevProducerId fields never persisted
to transaction log
Key: KAFKA-20310
URL: https://issues.apache.org/jira/browse/KAFKA-20310
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 4.0.0
Reporter: Artem Livshits
The valueToBytes() method in
[TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]
never writes nextProducerId and prevProducerId.
*Scenario 1: Coordinator failover during PREPARE_COMMIT/PREPARE_ABORT with epoch
exhaustion*
1. Transaction in PREPARE_COMMIT state with epoch exhaustion:
- producerId = 100, producerEpoch = 32767 (Short.MAX_VALUE)
- nextProducerId = 101, nextProducerEpoch = 0
2. Metadata written to log, but nextProducerId/nextProducerEpoch are not
persisted
3. Coordinator leadership changes
4. On recovery, transaction log read gives:
- producerId = 100, producerEpoch = 32767
- nextProducerId = -1 (NO_PRODUCER_ID)
- nextProducerEpoch = -1 (NO_PRODUCER_EPOCH)
5. Transaction markers complete, prepareComplete() is called:
Lines 331-333 in TransactionMetadata.java:
    if (clientTransactionVersion.supportsEpochBump() && nextProducerId != RecordBatch.NO_PRODUCER_ID) {
        data.producerId = nextProducerId; // Would rotate to 101
        data.producerEpoch = hasNextProducerEpoch() ? nextProducerEpoch : 0;
    } else {
        data.producerId = producerId; // STUCK at 100
        data.producerEpoch = clientProducerEpoch(); // STUCK at 32767
    }
6. Because nextProducerId is -1, the else branch executes
7. Producer stays at (100, 32767) - epoch exhausted
8. Next InitProducerId fails: "Cannot allocate any more producer epochs"
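The steps of Scenario 1 can be reproduced with a small, self-contained model. This is only a sketch: Scenario1Sketch, Meta, and roundTrip are hypothetical names that do not exist in Kafka, the log round trip is modeled as a plain copy, and the else branch uses producerEpoch as a stand-in for clientProducerEpoch(). It assumes the client transaction version supports epoch bumps.

```java
// Simplified model of Scenario 1. Every name here is hypothetical; this is not Kafka code.
final class Scenario1Sketch {
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_PRODUCER_EPOCH = -1;

    // The subset of transaction metadata that matters for this scenario.
    static final class Meta {
        final long producerId;
        final short producerEpoch;
        final long nextProducerId;
        final short nextProducerEpoch;

        Meta(long producerId, short producerEpoch, long nextProducerId, short nextProducerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.nextProducerId = nextProducerId;
            this.nextProducerEpoch = nextProducerEpoch;
        }

        @Override
        public String toString() {
            return "(" + producerId + ", " + producerEpoch + ")";
        }
    }

    // Models a log write followed by a read after failover. When persistNext is
    // false, the next producer id and epoch come back as the "unset" sentinels.
    static Meta roundTrip(Meta m, boolean persistNext) {
        if (persistNext) {
            return m;
        }
        return new Meta(m.producerId, m.producerEpoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
    }

    // Mirrors the branch quoted from prepareComplete(), assuming epoch bumps are
    // supported. producerEpoch stands in for clientProducerEpoch().
    static Meta prepareComplete(Meta m) {
        if (m.nextProducerId != NO_PRODUCER_ID) {
            short epoch = m.nextProducerEpoch != NO_PRODUCER_EPOCH ? m.nextProducerEpoch : (short) 0;
            return new Meta(m.nextProducerId, epoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
        }
        return new Meta(m.producerId, m.producerEpoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
    }

    public static void main(String[] args) {
        Meta before = new Meta(100L, Short.MAX_VALUE, 101L, (short) 0);
        System.out.println("fields dropped:   " + prepareComplete(roundTrip(before, false)));
        System.out.println("fields persisted: " + prepareComplete(roundTrip(before, true)));
    }
}
```

With the fields dropped, the model stays on the exhausted (producerId, epoch) pair. With them persisted, it rotates to the next producer ID, which is the behavior the scenario expects.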
*Scenario 2: Client retry after coordinator failover during epoch rotation*
1. Client has producer (100, 32766), epoch exhaustion imminent
2. InitProducerId triggers epoch rotation:
- prevProducerId = 100
- producerId = 101, producerEpoch = 0
3. Metadata written to log, but prevProducerId is NOT persisted
4. Coordinator fails before client receives response
5. On recovery:
- prevProducerId = -1 (NO_PRODUCER_ID)
- producerId = 101, producerEpoch = 0
6. Client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766)
7. Validation in TransactionCoordinator.scala, lines 243-245:
    (producerIdAndEpoch.producerId == txnMetadata.prevProducerId && // 100 == -1?
     TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))
8. Check fails, returns PRODUCER_FENCED
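The retry check in Scenario 2 can be isolated in the same way. The sketch below models only the quoted clause, not the full validation in TransactionCoordinator.scala. Scenario2Sketch and isRetryOfRotation are hypothetical names, and isEpochExhausted is assumed to treat any epoch of at least Short.MAX_VALUE - 1 as exhausted, which is consistent with the (100, 32766) example above.

```java
// Simplified model of the retry check in Scenario 2. Names are hypothetical.
final class Scenario2Sketch {
    static final long NO_PRODUCER_ID = -1L;

    // Assumption: an epoch counts as exhausted once it reaches Short.MAX_VALUE - 1.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // Models only the quoted clause: the client's expected producer id must match
    // the recorded previous producer id, and the expected epoch must be exhausted.
    static boolean isRetryOfRotation(long expectedProducerId, short expectedEpoch, long prevProducerId) {
        return expectedProducerId == prevProducerId && isEpochExhausted(expectedEpoch);
    }

    public static void main(String[] args) {
        short expectedEpoch = (short) 32766;
        // prevProducerId survived the failover: the retry is recognized.
        System.out.println(isRetryOfRotation(100L, expectedEpoch, 100L));
        // prevProducerId was lost and reads back as NO_PRODUCER_ID: the retry is rejected.
        System.out.println(isRetryOfRotation(100L, expectedEpoch, NO_PRODUCER_ID));
    }
}
```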
The fix is to add
if (logVersion >= 1) {
value.setPreviousProducerId(txnMetadata.prevProducerId());
value.setNextProducerId(txnMetadata.nextProducerId());
}
to the valueToBytes() method in
[TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]
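A round-trip check along the following lines could guard against a regression. It is a sketch under loud assumptions: the fixed-width layout, the Value class, and readValue are invented for illustration and do not reflect Kafka's actual transaction log schema or serialization code. Only the version-gated write mirrors the proposed fix.

```java
// Toy codec illustrating a version-gated write and a round-trip check.
// The layout and all names are hypothetical; this is not Kafka's log format.
final class VersionGatedCodecSketch {
    static final long NO_PRODUCER_ID = -1L;

    static final class Value {
        final long producerId;
        final long prevProducerId;
        final long nextProducerId;

        Value(long producerId, long prevProducerId, long nextProducerId) {
            this.producerId = producerId;
            this.prevProducerId = prevProducerId;
            this.nextProducerId = nextProducerId;
        }
    }

    // Layout: [version: 2 bytes][producerId: 8 bytes], then for version >= 1
    // [prevProducerId: 8 bytes][nextProducerId: 8 bytes].
    static byte[] valueToBytes(Value v, short logVersion) {
        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(logVersion >= 1 ? 26 : 10);
        buf.putShort(logVersion);
        buf.putLong(v.producerId);
        if (logVersion >= 1) {
            // The version-gated write that the proposed fix adds.
            buf.putLong(v.prevProducerId);
            buf.putLong(v.nextProducerId);
        }
        return buf.array();
    }

    // Reads a value back; fields absent in version 0 default to NO_PRODUCER_ID.
    static Value readValue(byte[] bytes) {
        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(bytes);
        short version = buf.getShort();
        long producerId = buf.getLong();
        long prev = NO_PRODUCER_ID;
        long next = NO_PRODUCER_ID;
        if (version >= 1) {
            prev = buf.getLong();
            next = buf.getLong();
        }
        return new Value(producerId, prev, next);
    }

    public static void main(String[] args) {
        Value original = new Value(101L, 100L, NO_PRODUCER_ID);
        Value restored = readValue(valueToBytes(original, (short) 1));
        System.out.println("prevProducerId after round trip: " + restored.prevProducerId);
    }
}
```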