This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new bb47d39  KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
bb47d39 is described below

commit bb47d39ab3de9494acade26cf4a7ecce2248d144
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Fri May 18 21:30:12 2018 +0100

    KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++----
 .../coordinator/group/GroupMetadataManager.scala   | 30 ++++++++++++++++------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../coordinator/group/GroupCoordinatorTest.scala   | 23 +++++++++++------
 4 files changed, 43 insertions(+), 23 deletions(-)
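
For context on the fix: handleTxnCompletion must acquire one or more group
metadata locks, yet it could be invoked from a request thread that already
held a group lock (for instance while delayed operations were completed
during an offset append), which allowed lock-ordering deadlocks. The patch
breaks the cycle by deferring the completion to the scheduler thread, so no
group lock is held when the work actually runs. Below is a minimal sketch of
that hand-off pattern; the names (TxnCompleterSketch, groupALock, groupBLock)
and the bare single-thread executor are illustrative stand-ins, not the
actual Kafka types.

    import java.util.concurrent.{Executors, ScheduledExecutorService}

    object TxnCompleterSketch {
      private val scheduler: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor()
      private val groupALock = new Object // hypothetical stand-ins for two
      private val groupBLock = new Object // groups' metadata locks

      // Before the fix: a request thread already inside groupALock.synchronized
      // would run the completion inline and block on groupBLock, while another
      // thread holding groupBLock blocked on groupALock, forming a deadlock
      // cycle. After: the lock-taking work runs later on the scheduler thread,
      // once the request thread has released its own group lock.
      def scheduleHandleTxnCompletion(producerId: Long): Unit =
        scheduler.execute(() => handleTxnCompletion(producerId))

      private def handleTxnCompletion(producerId: Long): Unit =
        groupBLock.synchronized {
          // materialize (commit) or drop (abort) pending transactional offsets
          println(s"completed txn for producer $producerId")
        }
    }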

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bea4e83..25b5780 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -80,8 +80,7 @@ class GroupCoordinator(val brokerId: Int,
    */
   def startup(enableMetadataExpiration: Boolean = true) {
     info("Starting up.")
-    if (enableMetadataExpiration)
-      groupManager.enableMetadataExpiration()
+    groupManager.startup(enableMetadataExpiration)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -441,12 +440,12 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleTxnCompletion(producerId: Long,
-                          offsetsPartitions: Iterable[TopicPartition],
-                          transactionResult: TransactionResult) {
+  def scheduleHandleTxnCompletion(producerId: Long,
+                                  offsetsPartitions: Iterable[TopicPartition],
+                                  transactionResult: TransactionResult) {
     require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
-    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+    groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index fae08e2..e22623f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -96,13 +96,14 @@ class GroupMetadataManager(brokerId: Int,
     }
   )
 
-  def enableMetadataExpiration() {
+  def startup(enableMetadataExpiration: Boolean) {
     scheduler.startup()
-
-    scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata,
-      period = config.offsetsRetentionCheckIntervalMs,
-      unit = TimeUnit.MILLISECONDS)
+    if (enableMetadataExpiration) {
+      scheduler.schedule(name = "delete-expired-group-metadata",
+        fun = cleanupGroupMetadata,
+        period = config.offsetsRetentionCheckIntervalMs,
+        unit = TimeUnit.MILLISECONDS)
+    }
   }
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -746,7 +747,20 @@ class GroupMetadataManager(brokerId: Int,
     info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - 
startMs} milliseconds.")
   }
 
-  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
+  /**
+   * Complete pending transactional offset commits of the groups of `producerId` from the provided
+   * `completedPartitions`. This method is invoked when a commit or abort marker is fully written
+   * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed
+   * operations are completed while appending offsets for a group. Since we need to acquire one or
+   * more group metadata locks to handle transaction completion, this operation is scheduled on
+   * the scheduler thread to avoid deadlocks.
+   */
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
+    scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  }
+
+  private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
@@ -755,7 +769,7 @@ class GroupMetadataManager(brokerId: Int,
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)
           }
-       }
+        }
         case _ =>
           info(s"Group $groupId has moved away from $brokerId after 
transaction marker was written but before the " +
             s"cache was updated. The cache on the new group owner will be 
updated instead.")
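
(A note on the scheduler call above: scheduler is the manager's
KafkaScheduler, and schedule is invoked with only a name and a function.
Assuming the usual KafkaScheduler defaults (delay = 0, period = -1, where a
negative period means the task runs only once), the call is equivalent to the
fully spelled-out one-shot form:

    scheduler.schedule(name = s"handleTxnCompletion-$producerId",
      fun = () => handleTxnCompletion(producerId, completedPartitions, isCommit),
      delay = 0, period = -1, unit = TimeUnit.MILLISECONDS)

i.e. the completion runs once on the background thread, not periodically.)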
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d6b478d..ee004f5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1564,7 +1564,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // as soon as the end transaction marker has been written for a transactional offset commit,
         // call to the group coordinator to materialize the offsets into the cache
         try {
-          groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+          groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
         } catch {
           case e: Exception =>
             error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", e)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index aa44d14..3592d6a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -839,7 +839,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Send commit marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     // Validate that committed offset is materialized.
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -864,7 +864,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Validate that the pending commit is discarded.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
@@ -886,14 +886,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
 
     // Ignore spurious commit.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, thirdReqError)
@@ -930,7 +930,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -956,7 +956,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset))
 
     // Now we receive the other marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
     errors.clear()
     partitionData.clear()
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
@@ -1006,7 +1006,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1021,7 +1021,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset))
 
     // producer1 now commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
 
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
@@ -1540,4 +1540,11 @@ class GroupCoordinatorTest extends JUnitSuite {
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
+  def handleTxnCompletion(producerId: Long,
+                          offsetsPartitions: Iterable[TopicPartition],
+                          transactionResult: TransactionResult): Unit = {
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+  }
+
 }
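
A closing note on the test changes: scheduleHandleTxnCompletion only enqueues
the completion on the scheduler, so a test that fetched offsets immediately
after calling it could race the background thread. The helper above therefore
invokes the internal (now private[group]) GroupMetadataManager.handleTxnCompletion
synchronously. The toy program below illustrates the race the helper avoids;
it is a sketch with hypothetical names (AsyncTestRace, materialized, done),
not code from the patch.

    import java.util.concurrent.{CountDownLatch, Executors}

    object AsyncTestRace extends App {
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      @volatile var materialized = false // stands in for the offsets cache
      val done = new CountDownLatch(1)

      // Async hand-off, as in scheduleHandleTxnCompletion: enqueue and return.
      scheduler.execute(() => { materialized = true; done.countDown() })

      // Racy: the scheduled task may not have run yet, so this can print false.
      println(s"right after scheduling: materialized = $materialized")

      // Deterministic: wait for the work to finish. The tests sidestep the
      // race entirely by calling handleTxnCompletion directly, which runs
      // synchronously.
      done.await()
      println(s"after the task ran: materialized = $materialized")
      scheduler.shutdown()
    }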

-- 
To stop receiving notification emails like this one, please contact j...@apache.org.
