bob-barrett commented on a change in pull request #8239:
URL: https://github.com/apache/kafka/pull/8239#discussion_r454914857



##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##########
@@ -210,7 +214,9 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
     if (producerEpoch == Short.MaxValue)
       throw new IllegalStateException(s"Cannot fence producer with epoch equal 
to Short.MaxValue since this would overflow")
 
-    prepareTransitionTo(PrepareEpochFence, producerId, (producerEpoch + 
1).toShort, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
+    val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else 
(producerEpoch + 1).toShort

Review comment:
       Added

##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -501,6 +502,21 @@ class TransactionCoordinator(brokerId: Int,
               info(s"Aborting sending of transaction markers and returning 
$error error to client for $transactionalId's EndTransaction request of 
$txnMarkerResult, " +
                 s"since appending $newMetadata to transaction log with 
coordinator epoch $coordinatorEpoch failed")
 
+              if (isEpochFence) {
+                txnManager.getTransactionState(transactionalId).foreach {
+                  case None =>
+                    warn(s"The coordinator still owns the transaction 
partition for $transactionalId, but there is " +
+                      s"no metadata in the cache; this is not expected")
+
+                  case Some(epochAndMetadata) =>
+                    if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) 
{
+                      // This was attempted epoch fence that failed, so mark 
this state on the metadata
+                      epochAndMetadata.transactionMetadata.hasFailedEpochFence 
= true
+                      warn("")

Review comment:
       Thanks, fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to