[ https://issues.apache.org/jira/browse/KAFKA-20310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18066049#comment-18066049 ]

sanghyeok An commented on KAFKA-20310:
--------------------------------------

Hello [~alivshits] 

Thank you for identifying this issue!

I also agree that, to address the issue you describe, the minimal change is to
persist the producer IDs (nextProducerId and prevProducerId).
I do have one question regarding this point.

Would it also make sense to persist the producer epoch?
Currently, starting with TV2, the epoch is only ever incremented by 1, so I
believe inferring its value without persisting it causes no semantic issue.
However, if the epoch might in the future be incremented by 2 or 3 rather than
only by 1, it may be worth persisting the related epoch fields in the
transaction state now, together with the producer IDs.
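
The inference argument can be illustrated with a toy Java model. The class, record, and method names are hypothetical and are not Kafka code. The sketch assumes an earlier epoch is recovered as `stored - 1`, which only holds while every bump is exactly 1. A persisted copy does not depend on that assumption.

```java
// Toy model, not Kafka code: inferring an earlier epoch from the stored one
// versus reading it back from a persisted field.
final class EpochInferenceSketch {

    // Hypothetical state after a bump: the stored (bumped) epoch plus a
    // persisted copy of the epoch before the bump.
    record State(short storedEpoch, short persistedPreviousEpoch) {}

    // Bumps the epoch by an arbitrary amount and persists the old value.
    static State bump(short epoch, int bumpSize) {
        return new State((short) (epoch + bumpSize), epoch);
    }

    // Inference rule that is only correct while every bump is exactly +1.
    static short inferPreviousEpoch(State s) {
        return (short) (s.storedEpoch() - 1);
    }

    // Reading the persisted field is correct for any bump size.
    static short readPreviousEpoch(State s) {
        return s.persistedPreviousEpoch();
    }

    public static void main(String[] args) {
        State byOne = bump((short) 5, 1);
        State byTwo = bump((short) 5, 2);
        System.out.println(inferPreviousEpoch(byOne)); // 5
        System.out.println(inferPreviousEpoch(byTwo)); // 6, which is wrong
        System.out.println(readPreviousEpoch(byTwo));  // 5
    }
}
```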

> nextProducerId and prevProducerId fields never persisted to transaction log
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-20310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20310
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 4.0.0
>            Reporter: Artem Livshits
>            Priority: Critical
>
> The valueToBytes() method in [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87] never writes nextProducerId and prevProducerId.
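
The data loss can be reproduced with a toy Java model. All names are hypothetical. `LogValue` stands in for the generated value message and assumes, as the scenarios below imply, that fields left unset read back as -1. The encoding is a made-up fixed-width layout, not the real transaction log format. The value is built the way the report describes, without copying the two fields, and is then round-tripped.

```java
import java.nio.ByteBuffer;

// Toy reproduction of the reported data loss, not Kafka code.
final class TxnLogRoundTripSketch {
    static final long NO_PRODUCER_ID = -1L;

    // Stand-in for the generated value message: unset fields keep -1.
    static final class LogValue {
        long producerId = NO_PRODUCER_ID;
        short producerEpoch = -1;
        long previousProducerId = NO_PRODUCER_ID;
        long nextProducerId = NO_PRODUCER_ID;
    }

    // In-memory coordinator state for one transactional ID.
    record Metadata(long producerId, short producerEpoch,
                    long prevProducerId, long nextProducerId) {}

    // Mirrors the bug: prevProducerId and nextProducerId are never copied.
    static LogValue buildValueWithoutFix(Metadata m) {
        LogValue v = new LogValue();
        v.producerId = m.producerId();
        v.producerEpoch = m.producerEpoch();
        return v;
    }

    // Made-up fixed-width encoding: 8 + 2 + 8 + 8 = 26 bytes.
    static byte[] encode(LogValue v) {
        return ByteBuffer.allocate(26)
                .putLong(v.producerId)
                .putShort(v.producerEpoch)
                .putLong(v.previousProducerId)
                .putLong(v.nextProducerId)
                .array();
    }

    // Reads the fields back in the same order, as a recovering coordinator would.
    static Metadata decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long producerId = buf.getLong();
        short producerEpoch = buf.getShort();
        long prevProducerId = buf.getLong();
        long nextProducerId = buf.getLong();
        return new Metadata(producerId, producerEpoch, prevProducerId, nextProducerId);
    }

    public static void main(String[] args) {
        Metadata before = new Metadata(100L, Short.MAX_VALUE, NO_PRODUCER_ID, 101L);
        Metadata after = decode(encode(buildValueWithoutFix(before)));
        System.out.println(after.nextProducerId()); // -1: the 101 was lost
    }
}
```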
> *Scenario 1: Coordinator failover during PREPARE_COMMIT/PREPARE_ABORT with epoch exhaustion*
> 1. Transaction in PREPARE_COMMIT state with epoch exhaustion:
>    - producerId = 100, producerEpoch = 32767 (Short.MAX_VALUE)
>    - nextProducerId = 101, nextProducerEpoch = 0
> 2. Metadata written to log, but nextProducerId/nextProducerEpoch are not persisted
> 3. Coordinator leadership changes
> 4. On recovery, transaction log read gives:
>    - producerId = 100, producerEpoch = 32767
>    - nextProducerId = -1 (NO_PRODUCER_ID)
>    - nextProducerEpoch = -1 (NO_PRODUCER_EPOCH)
> 5. Transaction markers complete, and prepareComplete() is called:
>    Lines 331-333 in TransactionMetadata.java:
> {code:java}
>  if (clientTransactionVersion.supportsEpochBump() && nextProducerId != RecordBatch.NO_PRODUCER_ID) {
>      data.producerId = nextProducerId;         // Would rotate to 101
>      data.producerEpoch = hasNextProducerEpoch() ? nextProducerEpoch : 0;
>  } else {
>      data.producerId = producerId;             // STUCK at 100
>      data.producerEpoch = clientProducerEpoch();  // STUCK at 32767
>  }
> {code}
> 6. Because nextProducerId is -1, the else branch executes
> 7. Producer stays at (100, 32767), with its epoch exhausted
> 8. Next InitProducerId fails: "Cannot allocate any more producer epochs"
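
Steps 5 to 7 can be reproduced with a simplified copy of the quoted branch. This is a hedged sketch, not Kafka code. It assumes `hasNextProducerEpoch()` means `nextProducerEpoch != NO_PRODUCER_EPOCH`, and it passes the client epoch in directly instead of computing it with `clientProducerEpoch()`.

```java
// Simplified copy of the quoted prepareComplete() branch, not Kafka code.
final class PrepareCompleteSketch {
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_PRODUCER_EPOCH = -1;

    record ProducerIdAndEpoch(long producerId, short epoch) {}

    static ProducerIdAndEpoch prepareComplete(boolean supportsEpochBump,
                                              long producerId, short clientEpoch,
                                              long nextProducerId, short nextProducerEpoch) {
        if (supportsEpochBump && nextProducerId != NO_PRODUCER_ID) {
            // Rotate to the next producer ID (assumed hasNextProducerEpoch() semantics).
            short epoch = nextProducerEpoch != NO_PRODUCER_EPOCH ? nextProducerEpoch : 0;
            return new ProducerIdAndEpoch(nextProducerId, epoch);
        }
        // No next producer ID known: keep the current, possibly exhausted, pair.
        return new ProducerIdAndEpoch(producerId, clientEpoch);
    }

    public static void main(String[] args) {
        // Before failover: next fields are still in memory, so it rotates to (101, 0).
        System.out.println(prepareComplete(true, 100L, Short.MAX_VALUE, 101L, (short) 0));
        // After failover: both next fields were reloaded as -1, so it stays at (100, 32767).
        System.out.println(prepareComplete(true, 100L, Short.MAX_VALUE,
                NO_PRODUCER_ID, NO_PRODUCER_EPOCH));
    }
}
```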
> *Scenario 2: Client retry after coordinator failover during epoch rotation*
> 1. Client has producer (100, 32766), epoch exhaustion imminent
> 2. InitProducerId triggers epoch rotation:
>    - prevProducerId = 100
>    - producerId = 101, producerEpoch = 0
> 3. Metadata written to log, but prevProducerId is NOT persisted
> 4. Coordinator fails before client receives response
> 5. On recovery:
>    - prevProducerId = -1 (NO_PRODUCER_ID)
>    - producerId = 101, producerEpoch = 0
> 6. Client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766)
> 7. Validation in TransactionCoordinator.scala, lines 243-245:
> {code:java}
>    (producerIdAndEpoch.producerId == txnMetadata.prevProducerId &&  // 100 == -1?
>     TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))
> {code}
> 8. Check fails, returns PRODUCER_FENCED
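
The failing comparison in step 7 can be isolated the same way. This sketch models only the quoted clause; the real validation has other alternatives. It also assumes `isEpochExhausted` returns true for epochs at or above `Short.MAX_VALUE - 1`, which is what would let 32766 pass while `prevProducerId` is intact. The class and method names are hypothetical.

```java
// Isolates the quoted validation clause from the InitProducerId retry path.
final class RetryValidationSketch {
    static final long NO_PRODUCER_ID = -1L;

    // Assumption: epochs at or above Short.MAX_VALUE - 1 count as exhausted.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // Only the quoted clause; the real check has further alternatives.
    static boolean matchesRotatedProducer(long expectedProducerId, short expectedEpoch,
                                          long prevProducerId) {
        return expectedProducerId == prevProducerId && isEpochExhausted(expectedEpoch);
    }

    public static void main(String[] args) {
        short expectedEpoch = 32766;
        // prevProducerId still in memory: the retry is recognised.
        System.out.println(matchesRotatedProducer(100L, expectedEpoch, 100L)); // true
        // prevProducerId reloaded as -1: the clause fails (PRODUCER_FENCED in the report).
        System.out.println(matchesRotatedProducer(100L, expectedEpoch, NO_PRODUCER_ID)); // false
    }
}
```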
> *The fix* is to add 
> {code:java}
>   if (logVersion >= 1) {
>       value.setPreviousProducerId(txnMetadata.prevProducerId());
>       value.setNextProducerId(txnMetadata.nextProducerId());
>   }
> {code}
> to the valueToBytes() method in [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87].
>  
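
Here is a sketch of the proposed fix applied to a toy value class. The names are hypothetical, and the fluent setters are only loosely modelled on the generated message API. Only log version 1 and above receive the two fields, matching the gate in the fix. A version-0 value keeps the -1 defaults.

```java
// Toy version of the proposed fix, not Kafka code.
final class FixedValueToBytesSketch {
    static final long NO_PRODUCER_ID = -1L;

    // Stand-in for the generated message: fields default to -1, setters are fluent.
    static final class LogValue {
        long producerId = NO_PRODUCER_ID;
        long previousProducerId = NO_PRODUCER_ID;
        long nextProducerId = NO_PRODUCER_ID;

        LogValue setProducerId(long v) { producerId = v; return this; }
        LogValue setPreviousProducerId(long v) { previousProducerId = v; return this; }
        LogValue setNextProducerId(long v) { nextProducerId = v; return this; }
    }

    record Metadata(long producerId, long prevProducerId, long nextProducerId) {}

    static LogValue buildValue(Metadata txnMetadata, short logVersion) {
        LogValue value = new LogValue().setProducerId(txnMetadata.producerId());
        // The proposed fix: copy both fields when logVersion >= 1.
        if (logVersion >= 1) {
            value.setPreviousProducerId(txnMetadata.prevProducerId());
            value.setNextProducerId(txnMetadata.nextProducerId());
        }
        return value;
    }

    public static void main(String[] args) {
        Metadata m = new Metadata(100L, NO_PRODUCER_ID, 101L);
        System.out.println(buildValue(m, (short) 1).nextProducerId); // 101
        System.out.println(buildValue(m, (short) 0).nextProducerId); // -1
    }
}
```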



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
