Repository: kafka
Updated Branches:
  refs/heads/trunk 71fe23b44 -> 3c9e30a2f


MINOR: Tighten up locking when aborting expired transactions

This is a followup to #4137

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #4146 from apurvam/MINOR-followups-to-bump-epoch-on-expire-patch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c9e30a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c9e30a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c9e30a2

Branch: refs/heads/trunk
Commit: 3c9e30a2f71c83b7efd45a65ffb5df5a80f48d19
Parents: 71fe23b
Author: Apurva Mehta <apu...@confluent.io>
Authored: Tue Oct 31 09:57:05 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 31 09:57:05 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 69 +++++++++-----------
 .../kafka/api/TransactionsTest.scala            |  4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 14 +++-
 3 files changed, 45 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index b307a39..6ad1f40 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -320,8 +320,7 @@ class TransactionCoordinator(brokerId: Int,
                 else
                   PrepareAbort
 
-                if (nextState == PrepareAbort && 
txnMetadata.pendingState.isDefined
-                  && txnMetadata.pendingState.get == PrepareEpochFence) {
+                if (nextState == PrepareAbort && 
txnMetadata.pendingState.contains(PrepareEpochFence)) {
                   // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to 
append.
                   txnMetadata.pendingState = None
@@ -454,42 +453,38 @@ class TransactionCoordinator(brokerId: Int,
 
         case Some(epochAndTxnMetadata) =>
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
-          val producerIdHasChanged = txnMetadata.inLock {
-            txnMetadata.producerId != txnIdAndPidEpoch.producerId
-          }
-          if (producerIdHasChanged) {
-            error(s"Found incorrect producerId when expiring transactionalId: 
${txnIdAndPidEpoch.transactionalId}. " +
-              s"Expected producerId: ${txnIdAndPidEpoch.producerId}. Found 
producerId: " +
-              s"${epochAndTxnMetadata.transactionMetadata.producerId}")
-            Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-          } else {
-            val transitMetadata: Either[Errors, TxnTransitMetadata] = 
txnMetadata.inLock {
-              if (txnMetadata.pendingTransitionInProgress)
-                Left(Errors.CONCURRENT_TRANSACTIONS)
-              else
-                Right(txnMetadata.prepareFenceProducerEpoch())
-            }
-            transitMetadata match {
-              case Right(txnTransitMetadata) =>
-                handleEndTransaction(txnMetadata.transactionalId,
-                  txnTransitMetadata.producerId,
-                  txnTransitMetadata.producerEpoch,
-                  TransactionResult.ABORT,
-                  {
-                    case Errors.NONE =>
-                      info(s"Completed rollback ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout")
-                    case e @ (Errors.INVALID_PRODUCER_ID_MAPPING |
-                              Errors.INVALID_PRODUCER_EPOCH |
-                              Errors.CONCURRENT_TRANSACTIONS) =>
-                      debug(s"Rolling back ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} has aborted due to 
${e.exceptionName}")
-                    case e =>
-                      warn(s"Rolling back ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} failed due to 
${e.exceptionName}")
-                  })
-                Right(txnTransitMetadata)
-              case (error) =>
-                Left(error)
+          val transitMetadata = txnMetadata.inLock {
+            if (txnMetadata.producerId != txnIdAndPidEpoch.producerId) {
+              error(s"Found incorrect producerId when expiring 
transactionalId: ${txnIdAndPidEpoch.transactionalId}. " +
+                s"Expected producerId: ${txnIdAndPidEpoch.producerId}. Found 
producerId: " +
+                s"${txnMetadata.producerId}")
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+            } else if (txnMetadata.pendingTransitionInProgress) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else {
+              Right(txnMetadata.prepareFenceProducerEpoch())
             }
-         }
+          }
+          transitMetadata match {
+            case Right(txnTransitMetadata) =>
+              handleEndTransaction(txnMetadata.transactionalId,
+                txnTransitMetadata.producerId,
+                txnTransitMetadata.producerEpoch,
+                TransactionResult.ABORT,
+                {
+                  case Errors.NONE =>
+                    info(s"Completed rollback ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout")
+                  case e @ (Errors.INVALID_PRODUCER_ID_MAPPING |
+                            Errors.INVALID_PRODUCER_EPOCH |
+                            Errors.CONCURRENT_TRANSACTIONS) =>
+                    debug(s"Rolling back ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} has aborted due to 
${e.exceptionName}")
+                  case e =>
+                    warn(s"Rolling back ongoing transaction of 
transactionalId: ${txnIdAndPidEpoch.transactionalId} failed due to 
${e.exceptionName}")
+                })
+              Right(txnTransitMetadata)
+            case (error) =>
+              Left(error)
+          }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 3eee7f1..bb6b520 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -482,14 +482,14 @@ class TransactionsTest extends KafkaServerTestHarness {
     // Verify that the first message was aborted and the second one was never 
written at all.
     val nonTransactionalConsumer = nonTransactionalConsumers(0)
     nonTransactionalConsumer.subscribe(List(topic1).asJava)
-    val records = TestUtils.consumeRemainingRecords(nonTransactionalConsumer, 
1000)
+    val records = TestUtils.consumeRecordsFor(nonTransactionalConsumer, 1000)
     assertEquals(1, records.size)
     assertEquals("1", TestUtils.recordValueAsString(records.head))
 
     val transactionalConsumer = transactionalConsumers.head
     transactionalConsumer.subscribe(List(topic1).asJava)
 
-    val transactionalRecords = 
TestUtils.consumeRemainingRecords(transactionalConsumer, 1000)
+    val transactionalRecords = 
TestUtils.consumeRecordsFor(transactionalConsumer, 1000)
     assertTrue(transactionalRecords.isEmpty)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 974a493..a99cd50 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1382,13 +1382,21 @@ object TestUtils extends Logging {
     records
   }
 
-  def consumeRemainingRecords[K, V](consumer: KafkaConsumer[K, V], timeout: 
Long): Seq[ConsumerRecord[K, V]] = {
+  /**
+    * Consumes records from the given consumer for the specified 
duration. If you want to drain all the
+    * remaining messages in the partitions the consumer is subscribed to, the 
duration should be set high enough so
+    * that the consumer has enough time to poll everything. The duration 
should be based on the number of expected messages left
+    * in the topic, and should not be too large (i.e., no more than a second) 
in our tests.
+    *
+    * @return All the records consumed by the consumer within the specified 
duration.
+    */
+  def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long): 
Seq[ConsumerRecord[K, V]] = {
     val startTime = System.currentTimeMillis()
     val records = new ArrayBuffer[ConsumerRecord[K, V]]()
     waitUntilTrue(() => {
       records ++= consumer.poll(50).asScala
-      System.currentTimeMillis() - startTime > timeout
-    }, s"The timeout $timeout was greater than the maximum wait time.")
+      System.currentTimeMillis() - startTime > duration
+    }, s"The timeout $duration was greater than the maximum wait time.")
     records
   }
 

Reply via email to