Artem Livshits created KAFKA-20310:
--------------------------------------

             Summary: nextProducerId and prevProducerId fields never persisted to transaction log
                 Key: KAFKA-20310
                 URL: https://issues.apache.org/jira/browse/KAFKA-20310
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 4.0.0
            Reporter: Artem Livshits


The valueToBytes() method in [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87] never writes nextProducerId and prevProducerId, so both fields are lost whenever transaction metadata is reloaded from the log.
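
A minimal sketch of why the omission matters. It assumes a deliberately simplified binary layout rather than Kafka's actual on-disk format; SimpleTxnValue, writeBuggy() and read() are hypothetical names used only for illustration. Any field the writer skips comes back with its default of -1 (NO_PRODUCER_ID), which is the state a newly elected coordinator loads.

```java
import java.io.*;

class OmittedFieldsDemo {
    static final long NO_PRODUCER_ID = -1L;

    // Hypothetical, simplified stand-in for a transaction log value (not Kafka's schema).
    static final class SimpleTxnValue {
        long producerId = NO_PRODUCER_ID;
        short producerEpoch = -1;
        long prevProducerId = NO_PRODUCER_ID;
        long nextProducerId = NO_PRODUCER_ID;
    }

    // Mirrors the bug: prevProducerId and nextProducerId are never written.
    static byte[] writeBuggy(SimpleTxnValue v) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(v.producerId);
        out.writeShort(v.producerEpoch);
        out.flush();
        return bytes.toByteArray();
    }

    // Reader: fields absent from the bytes keep their default value of -1.
    static SimpleTxnValue read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        SimpleTxnValue v = new SimpleTxnValue();
        v.producerId = in.readLong();
        v.producerEpoch = in.readShort();
        return v;
    }

    public static void main(String[] args) throws IOException {
        // In-memory state from Scenario 1, step 1.
        SimpleTxnValue before = new SimpleTxnValue();
        before.producerId = 100;
        before.producerEpoch = Short.MAX_VALUE;
        before.nextProducerId = 101;

        SimpleTxnValue after = read(writeBuggy(before));
        System.out.println("producerId=" + after.producerId
                + " epoch=" + after.producerEpoch
                + " nextProducerId=" + after.nextProducerId);
    }
}
```

Running main() prints producerId=100 epoch=32767 nextProducerId=-1: the rotation target that Scenario 1 relies on does not survive the round trip.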

*Scenario 1: Coordinator failover during PREPARE_COMMIT/PREPARE_ABORT with epoch exhaustion*

1. Transaction in PREPARE_COMMIT state with epoch exhaustion:
   - producerId = 100, producerEpoch = 32767 (Short.MAX_VALUE)
   - nextProducerId = 101, nextProducerEpoch = 0

2. Metadata written to log, but nextProducerId/nextProducerEpoch are not persisted

3. Coordinator leadership changes

4. On recovery, reading the transaction log yields:
   - producerId = 100, producerEpoch = 32767
   - nextProducerId = -1 (NO_PRODUCER_ID)
   - nextProducerEpoch = -1 (NO_PRODUCER_EPOCH)

5. The transaction markers complete and prepareComplete() is called:

   Lines 331-333 in TransactionMetadata.java:
   if (clientTransactionVersion.supportsEpochBump() && nextProducerId != RecordBatch.NO_PRODUCER_ID) {
       data.producerId = nextProducerId;         // Would rotate to 101
       data.producerEpoch = hasNextProducerEpoch() ? nextProducerEpoch : 0;
   } else {
       data.producerId = producerId;             // STUCK at 100
       data.producerEpoch = clientProducerEpoch();  // STUCK at 32767
   }

6. Because nextProducerId is -1 (NO_PRODUCER_ID), the else branch executes.

7. The producer stays at (100, 32767), with its epoch exhausted.

8. The next InitProducerId fails with "Cannot allocate any more producer epochs".
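
Steps 5 through 8 can be reproduced with a small model of the branch quoted from TransactionMetadata.java. This is a sketch under stated assumptions, not Kafka code: prepareCompleteSketch() and bumpEpoch() are hypothetical, hasNextProducerEpoch() is approximated as nextProducerEpoch != -1, and bumpEpoch() simply refuses to increment past Short.MAX_VALUE to mimic the "Cannot allocate any more producer epochs" failure.

```java
class EpochRotationDemo {
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_PRODUCER_EPOCH = -1;

    record ProducerState(long producerId, short producerEpoch) {}

    // Hypothetical model of the prepareComplete() branch quoted above.
    static ProducerState prepareCompleteSketch(boolean supportsEpochBump,
                                               long producerId, short clientProducerEpoch,
                                               long nextProducerId, short nextProducerEpoch) {
        if (supportsEpochBump && nextProducerId != NO_PRODUCER_ID) {
            // Assumption: hasNextProducerEpoch() means "not NO_PRODUCER_EPOCH".
            short epoch = nextProducerEpoch != NO_PRODUCER_EPOCH ? nextProducerEpoch : (short) 0;
            return new ProducerState(nextProducerId, epoch); // rotate to the new ID
        }
        return new ProducerState(producerId, clientProducerEpoch); // stay on the old ID
    }

    // Hypothetical guard: an epoch already at Short.MAX_VALUE cannot be bumped.
    static short bumpEpoch(ProducerState state) {
        if (state.producerEpoch() == Short.MAX_VALUE) {
            throw new IllegalStateException("Cannot allocate any more producer epochs");
        }
        return (short) (state.producerEpoch() + 1);
    }

    public static void main(String[] args) {
        // Same coordinator, nextProducerId still in memory.
        System.out.println(prepareCompleteSketch(true, 100, Short.MAX_VALUE, 101, (short) 0));

        // After failover, nextProducerId/nextProducerEpoch were reloaded as -1.
        ProducerState stuck = prepareCompleteSketch(
                true, 100, Short.MAX_VALUE, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
        System.out.println(stuck);
        try {
            bumpEpoch(stuck);
        } catch (IllegalStateException e) {
            System.out.println("InitProducerId would fail: " + e.getMessage());
        }
    }
}
```

With nextProducerId = 101 the producer rotates to (101, 0); with the value reloaded as -1 it stays at (100, 32767) and the subsequent bump throws.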

*Scenario 2: Client retry after coordinator failover during epoch rotation*

1. The client has producer (100, 32766); epoch exhaustion is imminent

2. InitProducerId triggers epoch rotation:
   - prevProducerId = 100
   - producerId = 101, producerEpoch = 0

3. Metadata written to log, but prevProducerId is NOT persisted

4. Coordinator fails before client receives response

5. On recovery:
   - prevProducerId = -1 (NO_PRODUCER_ID)
   - producerId = 101, producerEpoch = 0

6. Client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766)

7. Validation in TransactionCoordinator.scala, lines 243-245:
   (producerIdAndEpoch.producerId == txnMetadata.prevProducerId &&  // 100 == -1?
    TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))

8. The check fails (100 != -1), so the coordinator returns PRODUCER_FENCED
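
The fencing in Scenario 2 comes down to the single clause quoted in step 7. The sketch below isolates it; it is not Kafka code. The real validation in TransactionCoordinator.scala has additional branches that are not modeled here, isRetryOfRotation() is a hypothetical name, and isEpochExhausted() is assumed to treat epochs at or above Short.MAX_VALUE - 1 as exhausted.

```java
class RetryValidationDemo {
    static final long NO_PRODUCER_ID = -1L;

    // Assumption: exhaustion threshold of Short.MAX_VALUE - 1.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // Hypothetical: only the clause quoted in step 7, i.e. "is this a retry of
    // the InitProducerId call that rotated away from prevProducerId?"
    static boolean isRetryOfRotation(long expectedProducerId, short expectedEpoch,
                                     long prevProducerId) {
        return expectedProducerId == prevProducerId && isEpochExhausted(expectedEpoch);
    }

    public static void main(String[] args) {
        short clientEpoch = (short) 32766; // the client's expected epoch from step 6
        System.out.println("prevProducerId persisted (100): "
                + isRetryOfRotation(100, clientEpoch, 100));
        System.out.println("prevProducerId reloaded as -1:  "
                + isRetryOfRotation(100, clientEpoch, NO_PRODUCER_ID));
    }
}
```

If prevProducerId had survived the failover the clause would accept the retry; with the reloaded -1 it does not, which leads to the PRODUCER_FENCED response in step 8.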

The fix is to add 

  if (logVersion >= 1) {
      value.setPreviousProducerId(txnMetadata.prevProducerId());
      value.setNextProducerId(txnMetadata.nextProducerId());
  }

to the valueToBytes() method in [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87].
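
The fix writes both fields only when logVersion >= 1. A minimal sketch of that kind of version gating, again using a hypothetical hand-rolled layout (not Kafka's actual serialization) in which version 0 records simply have no slots for prevProducerId and nextProducerId; SimpleTxnValue, write() and read() are illustrative names only.

```java
import java.io.*;

class VersionGatedFieldsDemo {
    static final long NO_PRODUCER_ID = -1L;

    // Hypothetical, simplified value; fields default to NO_PRODUCER_ID.
    static final class SimpleTxnValue {
        long producerId = NO_PRODUCER_ID;
        long prevProducerId = NO_PRODUCER_ID;
        long nextProducerId = NO_PRODUCER_ID;
    }

    // Writes prev/next only for version >= 1, like the guard in the proposed fix.
    static byte[] write(SimpleTxnValue v, short version) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(version);
        out.writeLong(v.producerId);
        if (version >= 1) {
            out.writeLong(v.prevProducerId);
            out.writeLong(v.nextProducerId);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the version first, then only the fields that version carries.
    static SimpleTxnValue read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        short version = in.readShort();
        SimpleTxnValue v = new SimpleTxnValue();
        v.producerId = in.readLong();
        if (version >= 1) {
            v.prevProducerId = in.readLong();
            v.nextProducerId = in.readLong();
        }
        return v;
    }

    public static void main(String[] args) throws IOException {
        // State after the rotation in Scenario 2, step 2.
        SimpleTxnValue rotated = new SimpleTxnValue();
        rotated.producerId = 101;
        rotated.prevProducerId = 100;

        for (short version = 0; version <= 1; version++) {
            SimpleTxnValue loaded = read(write(rotated, version));
            System.out.println("version " + version + ": prevProducerId=" + loaded.prevProducerId);
        }
    }
}
```

Under this model, a version 1 record reloads prevProducerId = 100, so the retry check in Scenario 2 would match, while records written at version 0 still reload -1.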



--
This message was sent by Atlassian Jira
(v8.20.10#820010)