This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
     new be58d73  KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
be58d73 is described below

commit be58d7370667e8f0670a3e480674794e6cbb8231
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Fri May 18 21:30:12 2018 +0100

    KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)

    Reviewers: Guozhang Wang <wangg...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++----
 .../coordinator/group/GroupMetadataManager.scala   | 30 ++++++++++++++++------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    | 24 +++++++++--------
 .../coordinator/group/GroupCoordinatorTest.scala   | 23 +++++++++++------
 5 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5ae8552..6cc0a41 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -81,8 +81,7 @@ class GroupCoordinator(val brokerId: Int,
    */
   def startup(enableMetadataExpiration: Boolean = true) {
     info("Starting up.")
-    if (enableMetadataExpiration)
-      groupManager.enableMetadataExpiration()
+    groupManager.startup(enableMetadataExpiration)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -488,12 +487,12 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleTxnCompletion(producerId: Long,
-                          offsetsPartitions: Iterable[TopicPartition],
-                          transactionResult: TransactionResult) {
+  def scheduleHandleTxnCompletion(producerId: Long,
+                                  offsetsPartitions: Iterable[TopicPartition],
+                                  transactionResult: TransactionResult) {
     require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
-    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+    groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7d4fe03..44a9369 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -135,13 +135,14 @@ class GroupMetadataManager(brokerId: Int,
     })
   })
 
-  def enableMetadataExpiration() {
+  def startup(enableMetadataExpiration: Boolean) {
     scheduler.startup()
-
-    scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata,
-      period = config.offsetsRetentionCheckIntervalMs,
-      unit = TimeUnit.MILLISECONDS)
+    if (enableMetadataExpiration) {
+      scheduler.schedule(name = "delete-expired-group-metadata",
+        fun = cleanupGroupMetadata,
+        period = config.offsetsRetentionCheckIntervalMs,
+        unit = TimeUnit.MILLISECONDS)
+    }
   }
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -794,7 +795,20 @@ class GroupMetadataManager(brokerId: Int,
     info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
   }
 
-  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
+  /**
+   * Complete pending transactional offset commits of the groups of `producerId` from the provided
+   * `completedPartitions`. This method is invoked when a commit or abort marker is fully written
+   * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed
+   * operations are completed while appending offsets for a group. Since we need to acquire one or
+   * more group metadata locks to handle transaction completion, this operation is scheduled on
+   * the scheduler thread to avoid deadlocks.
+   */
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
+    scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  }
+
+  private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
@@ -803,7 +817,7 @@ class GroupMetadataManager(brokerId: Int,
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)
           }
-       }
+        }
         case _ =>
           info(s"Group $groupId has moved away from $brokerId after transaction marker was written but before the " +
             s"cache was updated. The cache on the new group owner will be updated instead.")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e79afa..5d68c98 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1623,7 +1623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // as soon as the end transaction marker has been written for a transactional offset commit,
           // call to the group coordinator to materialize the offsets into the cache
           try {
-            groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+            groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
           } catch {
             case e: Exception =>
               error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 44e1356..befd22a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -27,7 +27,7 @@ import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ JoinGroupRequest, TransactionResult }
+import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{ After, Before, Test }
@@ -117,7 +117,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
   }
 
-
   abstract class GroupOperation[R, C] extends Operation {
     val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]()
 
@@ -228,8 +227,17 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
       val producerId = 1000L
       val producerEpoch : Short = 2
+      // When transaction offsets are appended to the log, transactions may be scheduled for
+      // completion. Since group metadata locks are acquired for transaction completion, include
+      // this in the callback to test that there are no deadlocks.
+      def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit = {
+        val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
+        groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
+          offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+        responseCallback(errors)
+      }
       groupCoordinator.handleTxnCommitOffsets(member.group.groupId,
-        producerId, producerEpoch, offsets, responseCallback)
+        producerId, producerEpoch, offsets, callbackWithTxnCompletion)
     }
   }
 
@@ -241,19 +249,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.handleTxnCompletion(producerId, offsetsPartitions, transactionResult(member.group.groupId))
+      groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
       responseCallback(Errors.NONE)
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
       val error = await(member, 500)
       assertEquals(Errors.NONE, error)
     }
-    // Test both commit and abort. Group ids used in the test have the format <prefix><index>
-    // Use the last digit of the index to decide between commit and abort.
-    private def transactionResult(groupId: String): TransactionResult = {
-      val lastDigit = groupId(groupId.length - 1).toInt
-      if (lastDigit % 2 == 0) TransactionResult.COMMIT else TransactionResult.ABORT
-    }
   }
 
   class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams, LeaveGroupCallback] {
@@ -273,7 +276,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
 
 object GroupCoordinatorConcurrencyTest {
 
-
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
   type SyncGroupCallback = (Array[Byte], Errors) => Unit
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2c9e81d..8529bf9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -841,7 +841,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Send commit marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     // Validate that committed offset is materialized.
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -866,7 +866,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Validate that the pending commit is discarded.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
@@ -888,14 +888,14 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, secondReqError)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
 
     // Ignore spurious commit.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, thirdReqError)
@@ -932,7 +932,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -958,7 +958,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset))
 
     // Now we receive the other marker.
-    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
     errors.clear()
     partitionData.clear()
     groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
@@ -1008,7 +1008,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
         errors.append(error)
@@ -1023,7 +1023,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset))
 
     // producer1 now commits its transaction.
-    groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+    handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
 
     groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
       case (error, partData) =>
@@ -1629,4 +1629,11 @@ class GroupCoordinatorTest extends JUnitSuite {
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
+  def handleTxnCompletion(producerId: Long,
+                          offsetsPartitions: Iterable[TopicPartition],
+                          transactionResult: TransactionResult): Unit = {
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+  }
+
 }

--
To stop receiving notification emails like this one, please contact
j...@apache.org.