Repository: kafka
Updated Branches:
  refs/heads/0.11.0 4e532c485 -> 4701d4cc7


KAFKA-6042: Avoid deadlock between two groups with delayed operations

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #4103 from rajinisivaram/KAFKA-6042-group-deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4701d4cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4701d4cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4701d4cc

Branch: refs/heads/0.11.0
Commit: 4701d4cc7cc281a6a88856d85d97dafa12994885
Parents: 4e532c4
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sat Oct 21 20:17:58 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 23 11:48:45 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/DelayedHeartbeat.scala    |  2 +-
 .../kafka/coordinator/group/DelayedJoin.scala   |  4 +-
 .../coordinator/group/GroupCoordinator.scala    | 34 +++++++------
 .../kafka/coordinator/group/GroupMetadata.scala |  7 ++-
 .../group/GroupMetadataManager.scala            | 17 +++----
 .../transaction/DelayedTxnMarker.scala          |  8 ++--
 .../transaction/TransactionCoordinator.scala    |  8 ++--
 .../TransactionMarkerChannelManager.scala       |  4 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  2 +-
 .../transaction/TransactionMetadata.scala       |  8 +++-
 .../transaction/TransactionStateManager.scala   | 38 ++++++++++-----
 .../scala/kafka/server/DelayedOperation.scala   |  7 +--
 .../scala/kafka/server/DelayedProduce.scala     |  6 ++-
 .../scala/kafka/server/ReplicaManager.scala     |  6 ++-
 .../group/GroupCoordinatorTest.scala            | 12 +++--
 .../group/GroupMetadataManagerTest.scala        |  9 ++--
 .../TransactionMarkerChannelManagerTest.scala   |  7 ++-
 .../TransactionStateManagerTest.scala           |  9 ++--
 .../kafka/server/DelayedOperationTest.scala     | 50 ++++++++++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala |  7 ++-
 20 files changed, 170 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
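
At a high level, the patch replaces the `group synchronized { ... }` monitors with an explicit ReentrantLock stored in GroupMetadata, and the same lock can now be handed to the delayed-operation purgatory, so coordinator code and delayed-operation completion serialize on a single lock. The following is a minimal sketch of that pattern, using hypothetical names rather than the real Kafka classes:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    // Illustrative sketch only: GroupState and Delayed are stand-ins for
    // GroupMetadata and DelayedOperation, reduced to the locking behaviour.
    class GroupState(val groupId: String) {
      val lock = new ReentrantLock
      // run `body` while holding the group lock (mirrors GroupMetadata.inLock)
      def inLock[T](body: => T): T = {
        lock.lock()
        try body finally lock.unlock()
      }
    }

    abstract class Delayed(val delayMs: Long, lockOpt: Option[Lock] = None) {
      // use the caller-supplied lock if there is one, otherwise a private lock
      val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
      def tryComplete(): Boolean
      def onComplete(): Unit
    }

    // a delayed join for `group` is completed under the same lock the
    // coordinator holds while mutating the group
    class DelayedJoinSketch(group: GroupState) extends Delayed(30000L, Some(group.lock)) {
      def tryComplete(): Boolean = false
      def onComplete(): Unit = ()
    }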


http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala 
b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 73d5d0f..5f16acb 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       member: MemberMetadata,
                                       heartbeatDeadline: Long,
                                       sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
+  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala 
b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 5232287..c75c0d4 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -33,7 +33,7 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
@@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
   override def tryComplete(): Boolean = false
 
   override def onComplete(): Unit = {
-    group synchronized  {
+    group.inLock  {
       if (group.newMemberAdded && remainingMs != 0) {
         group.newMemberAdded = false
         val delay = min(configuredRebalanceDelay, remainingMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 42bc3c3..6d4da03 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -40,6 +40,12 @@ import scala.math.max
  *
 * Each Kafka server instantiates a coordinator which is responsible for a set of
  * groups. Groups are assigned to coordinators based on their group names.
+ * <p>
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in GroupCoordinator use `group` as the delayed operation
+ * lock. ReplicaManager.appendRecords may be invoked while holding the group lock
+ * used by its callback. The delayed callback may acquire the group lock
+ * since the delayed operation is completed only if the group lock can be acquired.
  */
 class GroupCoordinator(val brokerId: Int,
                        val groupConfig: GroupConfig,
@@ -142,7 +148,7 @@ class GroupCoordinator(val brokerId: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
                           responseCallback: JoinCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || 
!group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
@@ -245,7 +251,7 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
       } else if (generationId != group.generationId) {
@@ -270,7 +276,7 @@ class GroupCoordinator(val brokerId: Int,
               val assignment = groupAssignment ++ missing.map(_ -> 
Array.empty[Byte]).toMap
 
               groupManager.storeGroup(group, assignment, (error: Errors) => {
-                group synchronized {
+                group.inLock {
                   // another member may have joined the group while we were 
awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state 
and the same generation
                   // when it gets invoked. if we have transitioned to another 
state, then do nothing
@@ -314,7 +320,7 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             if (group.is(Dead) || !group.has(memberId)) {
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
             } else {
@@ -346,7 +352,7 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             group.currentState match {
               case Dead =>
                 // if the group is marked as dead, it means some other thread 
has just removed the group
@@ -446,7 +452,7 @@ class GroupCoordinator(val brokerId: Int,
                               producerEpoch: Short,
                               offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit) {
-    group synchronized {
+    group.inLock {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => 
Errors.UNKNOWN_MEMBER_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != 
NO_PRODUCER_ID)) {
@@ -503,7 +509,7 @@ class GroupCoordinator(val brokerId: Int,
       groupManager.getGroup(groupId) match {
         case None => (Errors.NONE, GroupCoordinator.DeadGroup)
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             (Errors.NONE, group.summary)
           }
       }
@@ -526,7 +532,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupUnloaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Unloading group metadata for ${group.groupId} with generation 
${group.generationId}")
       val previousState = group.currentState
       group.transitionTo(Dead)
@@ -555,7 +561,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupLoaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Loading group metadata for ${group.groupId} with generation 
${group.generationId}")
       assert(group.is(Stable) || group.is(Empty))
       
group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group,
 _))
@@ -663,7 +669,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def maybePrepareRebalance(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       if (group.canRebalance)
         prepareRebalance(group)
     }
@@ -703,7 +709,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (group.notYetRejoinedMembers.isEmpty)
         forceComplete()
       else false
@@ -715,7 +721,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onCompleteJoin(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
         group.remove(failedMember.memberId)
@@ -765,7 +771,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, 
heartbeatDeadline: Long, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
@@ -773,7 +779,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, 
heartbeatDeadline: Long) {
-    group synchronized {
+    group.inLock {
       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, 
removing it from the group")
         removeMemberAndUpdateGroup(group, member)

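The locking note added in this file is the crux of KAFKA-6042: the coordinator may call ReplicaManager.appendRecords while already holding the group lock, and the produce callback also needs that lock. Because the lock is now a ReentrantLock shared with the delayed operation, the callback can run on the same thread by re-entering the lock, or later from the purgatory once the lock is free, instead of two handler threads blocking on each other's group monitors. Below is a small illustration of the reentrant case, under assumed names (not the actual coordinator code):

    import java.util.concurrent.locks.ReentrantLock

    object ReentrantCallbackSketch {
      val groupLock = new ReentrantLock

      def inGroupLock[T](body: => T): T = {
        groupLock.lock()
        try body finally groupLock.unlock()
      }

      // a callback that itself needs the group lock, like the offset-commit callback
      def callback(result: String): Unit = inGroupLock { println(s"commit result: $result") }

      def storeOffsets(): Unit = inGroupLock {
        // appendRecords is invoked while the group lock is held; if the append
        // completes immediately, the callback runs on this thread and simply
        // re-enters the ReentrantLock instead of deadlocking on a monitor
        callback("NONE")
      }
    }
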
http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 35a1fc7..aa4887d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -17,9 +17,10 @@
 package kafka.coordinator.group
 
 import java.util.UUID
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.OffsetAndMetadata
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, immutable, mutable}
@@ -154,6 +155,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   private var state: GroupState = initialState
 
+  private[group] val lock = new ReentrantLock
+
   private val members = new mutable.HashMap[String, MemberMetadata]
 
   private val offsets = new mutable.HashMap[TopicPartition, 
CommitRecordMetadataAndOffset]
@@ -172,6 +175,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   var protocol: String = null
   var newMemberAdded: Boolean = false
 
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def is(groupState: GroupState) = state == groupState
   def not(groupState: GroupState) = state != groupState
   def has(memberId: String) = members.contains(memberId)

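GroupMetadata now owns the ReentrantLock and exposes inLock, delegating to CoreUtils.inLock, so every former `group synchronized` block becomes `group.inLock`. For reference, the helper amounts to the usual lock/try/finally shape; the sketch below is a plausible reading, the real kafka.utils.CoreUtils version may differ in minor details:

    import java.util.concurrent.locks.Lock

    object LockUtils {
      // acquire the lock, run the thunk, and always release in a finally block
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try fun
        finally lock.unlock()
      }
    }

    // usage mirroring the new GroupMetadata API: group.inLock { group.numOffsets }
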
http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7c0fa6b..b1c2d0c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -85,7 +85,7 @@ class GroupMetadataManager(brokerId: Int,
   newGauge("NumOffsets",
     new Gauge[Int] {
       def value = groupMetadataCache.values.map(group => {
-        group synchronized { group.numOffsets }
+        group.inLock { group.numOffsets }
       }).sum
     }
   )
@@ -242,6 +242,7 @@ class GroupMetadataManager(brokerId: Int,
       internalTopicsAllowed = true,
       isFromClient = false,
       entriesPerPartition = records,
+      delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
@@ -259,7 +260,7 @@ class GroupMetadataManager(brokerId: Int,
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
-    group synchronized {
+    group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderId} has 
received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
@@ -308,7 +309,7 @@ class GroupMetadataManager(brokerId: Int,
             // the offset and metadata to cache if the append status has no 
error
             val status = responseStatus(offsetTopicPartition)
 
-            val responseError = group synchronized {
+            val responseError = group.inLock {
               if (status.error == Errors.NONE) {
                 if (!group.is(Dead)) {
                   filteredOffsetMetadata.foreach { case (topicPartition, 
offsetAndMetadata) =>
@@ -367,12 +368,12 @@ class GroupMetadataManager(brokerId: Int,
           }
 
           if (isTxnOffsetCommit) {
-            group synchronized {
+            group.inLock {
               addProducerGroup(producerId, group.groupId)
               group.prepareTxnOffsetCommit(producerId, offsetMetadata)
             }
           } else {
-            group synchronized {
+            group.inLock {
               group.prepareOffsetCommit(offsetMetadata)
             }
           }
@@ -401,7 +402,7 @@ class GroupMetadataManager(brokerId: Int,
         (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", 
Errors.NONE))
       }.toMap
     } else {
-      group synchronized {
+      group.inLock {
         if (group.is(Dead)) {
           topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
             (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", 
Errors.NONE))
@@ -673,7 +674,7 @@ class GroupMetadataManager(brokerId: Int,
     var offsetsRemoved = 0
 
     groupMetadataCache.foreach { case (groupId, group) =>
-      val (removedOffsets, groupIsDead, generation) = group synchronized {
+      val (removedOffsets, groupIsDead, generation) = group.inLock {
         val removedOffsets = deletedTopicPartitions match {
           case Some(topicPartitions) => group.removeOffsets(topicPartitions)
           case None => group.removeExpiredOffsets(startMs)
@@ -746,7 +747,7 @@ class GroupMetadataManager(brokerId: Int,
     val pendingGroups = groupsBelongingToPartitions(producerId, 
completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
-        case Some(group) => group synchronized {
+        case Some(group) => group.inLock {
           if (!group.is(Dead)) {
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)

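With the lock in place, GroupMetadataManager forwards it to ReplicaManager.appendRecords as delayedProduceLock = Some(group.lock), so a DelayedProduce parked for a group-metadata or offset-commit write is completed under the same lock its callback acquires. A simplified caller-side sketch follows; the signature below is an assumption that keeps only the locking-relevant parameters:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    object AppendSketch {
      // stand-in for ReplicaManager.appendRecords, reduced to the parts that
      // matter here; the real method also takes timeout, acks and MemoryRecords
      def appendRecords(records: Map[String, Array[Byte]],
                        responseCallback: Map[String, String] => Unit,
                        delayedProduceLock: Option[Lock] = None): Unit = {
        // the real code would build a DelayedProduce with delayedProduceLock when
        // the write must wait for replication; this sketch completes immediately
        responseCallback(records.map { case (tp, _) => tp -> "NONE" })
      }

      // caller side, in the spirit of appendForGroup: forward the group's lock
      // so the delayed produce is completed under it
      def appendForGroup(groupLock: ReentrantLock,
                         records: Map[String, Array[Byte]],
                         callback: Map[String, String] => Unit): Unit =
        appendRecords(records, callback, delayedProduceLock = Some(groupLock))
    }
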
http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala 
b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index bc0f1b7..cf18b81 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import kafka.server.DelayedOperation
 import org.apache.kafka.common.protocol.Errors
@@ -25,11 +26,12 @@ import org.apache.kafka.common.protocol.Errors
  * Delayed transaction state change operations that are added to the purgatory without timeout (i.e. these operations should never time out)
   */
 private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
-                                           completionCallback: Errors => Unit)
-  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
+                                           completionCallback: Errors => Unit,
+                                           lock: Lock)
+  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {
 
   override def tryComplete(): Boolean = {
-    txnMetadata synchronized {
+    txnMetadata.inLock {
       if (txnMetadata.topicPartitions.isEmpty)
         forceComplete()
       else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 85c19c5..74be79c 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -131,7 +131,7 @@ class TransactionCoordinator(brokerId: Int,
           val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
           val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, 
coordinatorEpoch, txnMetadata)
           }
       }
@@ -235,7 +235,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndMetadata.transactionMetadata
 
           // generate the new transaction metadata with added partitions
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
@@ -298,7 +298,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (txnMetadata.producerEpoch != producerEpoch)
@@ -362,7 +362,7 @@ class TransactionCoordinator(brokerId: Int,
                 case Some(epochAndMetadata) =>
                   if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                     val txnMetadata = epochAndMetadata.transactionMetadata
-                    txnMetadata synchronized {
+                    txnMetadata.inLock {
                       if (txnMetadata.producerId != producerId)
                         Left(Errors.INVALID_PRODUCER_ID_MAPPING)
                       else if (txnMetadata.producerEpoch != producerEpoch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 9c3ffd9..c3086e3 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -263,7 +263,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       }
     }
 
-    val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
+    val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
     txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, 
Seq(transactionalId))
 
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, 
txnMetadata.producerEpoch, txnResult, coordinatorEpoch, 
txnMetadata.topicPartitions.toSet)
@@ -340,7 +340,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
                 val txnMetadata = epochAndMetadata.transactionMetadata
 
-                txnMetadata synchronized {
+                txnMetadata.inLock {
                   topicPartitions.foreach(txnMetadata.removePartition)
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 54960b9..f01f314 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -131,7 +131,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: 
Int,
               txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
               abortSending = true
             } else {
-              txnMetadata synchronized {
+              txnMetadata.inLock {
                 for ((topicPartition: TopicPartition, error: Errors) <- 
errors) {
                   error match {
                     case Errors.NONE =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 405e639..486a887 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -16,7 +16,9 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.utils.{Logging, nonthreadsafe}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.RecordBatch
 
@@ -156,6 +158,10 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
   // initialized as the same as the current state
   var pendingState: Option[TransactionState] = None
 
+  private[transaction] val lock = new ReentrantLock
+
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def addPartitions(partitions: collection.Set[TopicPartition]): Unit = {
     topicPartitions ++= partitions
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index ed63620..b0ec067 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -56,6 +56,16 @@ object TransactionStateManager {
  * 1. the transaction log, which is a special internal topic.
  * 2. the transaction metadata including its ongoing transaction status.
  * 3. the background expiration of the transaction as well as the 
transactional id.
+ *
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in TransactionStateManager use `stateLock.readLock` as the delayed operation
+ * lock. Delayed operations are completed only if `stateLock.readLock` can be acquired.
+ * Delayed callbacks may acquire `stateLock.readLock` or any of the `txnMetadata` locks.
+ * <ul>
+ * <li>`stateLock.readLock` must never be acquired while holding `txnMetadata` lock.</li>
+ * <li>`txnMetadata` lock must never be acquired while holding `stateLock.writeLock`.</li>
+ * <li>`ReplicaManager.appendRecords` should never be invoked while holding a `txnMetadata` lock.</li>
+ * </ul>
  */
 class TransactionStateManager(brokerId: Int,
                               zkUtils: ZkUtils,
@@ -95,6 +105,7 @@ class TransactionStateManager(brokerId: Int,
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
   }
+  private[transaction] def stateReadLock = stateLock.readLock
 
   // this is best-effort expiration of an ongoing transaction which has been 
open for more than its
   // txn timeout value, we do not need to grab the lock on the metadata object 
upon checking its state
@@ -136,7 +147,7 @@ class TransactionStateManager(brokerId: Int,
             }.filter { case (_, txnMetadata) =>
               txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
             }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata synchronized {
+              val txnMetadataTransition = txnMetadata.inLock {
                 txnMetadata.prepareDead()
               }
               TransactionalIdCoordinatorEpochAndMetadata(transactionalId, 
entry.coordinatorEpoch, txnMetadataTransition)
@@ -166,7 +177,7 @@ class TransactionStateManager(brokerId: Int,
                     .foreach { txnMetadataCacheEntry =>
                       toRemove.foreach { idCoordinatorEpochAndMetadata =>
                         val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                        txnMetadata synchronized {
+                        txnMetadata.inLock {
                           if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
                             && txnMetadata.pendingState.contains(Dead)
                             && txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
@@ -196,7 +207,8 @@ class TransactionStateManager(brokerId: Int,
           internalTopicsAllowed = true,
           isFromClient = false,
           recordsPerPartition,
-          removeFromCacheCallback
+          removeFromCacheCallback,
+          Some(stateLock.readLock)
         )
       }
 
@@ -376,7 +388,7 @@ class TransactionStateManager(brokerId: Int,
           val transactionsPendingForCompletion = new 
mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata]
           loadedTransactions.foreach {
             case (transactionalId, txnMetadata) =>
-              txnMetadata synchronized {
+              txnMetadata.inLock {
                 // if state is PrepareCommit or PrepareAbort we need to 
complete the transaction
                 txnMetadata.state match {
                   case PrepareAbort =>
@@ -508,7 +520,7 @@ class TransactionStateManager(brokerId: Int,
           case Right(Some(epochAndMetadata)) =>
             val metadata = epochAndMetadata.transactionMetadata
 
-            metadata synchronized {
+            metadata.inLock {
               if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                 // the cache may have been changed due to txn topic partition 
emigration and immigration,
                 // in this case directly return NOT_COORDINATOR to client and 
let it to re-discover the transaction coordinator
@@ -535,7 +547,7 @@ class TransactionStateManager(brokerId: Int,
         getTransactionState(transactionalId) match {
           case Right(Some(epochAndTxnMetadata)) =>
             val metadata = epochAndTxnMetadata.transactionMetadata
-            metadata synchronized {
+            metadata.inLock {
               if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
                 if (retryOnError(responseError)) {
                   info(s"TransactionalId ${metadata.transactionalId} append 
transaction log for $newMetadata transition failed due to $responseError, " +
@@ -585,24 +597,28 @@ class TransactionStateManager(brokerId: Int,
         case Right(Some(epochAndMetadata)) =>
           val metadata = epochAndMetadata.transactionMetadata
 
-          metadata synchronized {
+          val append: Boolean = metadata.inLock {
             if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
               // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
               responseCallback(Errors.NOT_COORDINATOR)
+              false
             } else {
               // do not need to check the metadata object itself since no concurrent thread should be able to modify it
               // under the same coordinator epoch, so directly append to txn log now
-
-              replicaManager.appendRecords(
+              true
+            }
+          }
+          if (append) {
+            replicaManager.appendRecords(
                 newMetadata.txnTimeoutMs.toLong,
                 TransactionLog.EnforcedRequiredAcks,
                 internalTopicsAllowed = true,
                 isFromClient = false,
                 recordsPerPartition,
-                updateCacheCallback)
+                updateCacheCallback,
+                delayedProduceLock = Some(stateLock.readLock))
 
               trace(s"Appending new metadata $newMetadata for transaction id 
$transactionalId with coordinator epoch $coordinatorEpoch to the local 
transaction log")
-            }
           }
       }
     }

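The rules in the new class comment are what drive the restructuring of the append path above: the coordinator-epoch check happens inside metadata.inLock, the lock is released, and only then is appendRecords called, with stateLock.readLock supplied as the delayed-produce lock. A condensed sketch of that ordering, using illustrative names rather than the real method:

    import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}

    object TxnAppendSketch {
      final class TxnMetadata {
        val lock = new ReentrantLock
        var coordinatorEpoch: Int = 0
        def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
      }

      val stateLock = new ReentrantReadWriteLock

      // stand-in for the append call; only the lock parameter matters here
      def appendRecords(doWrite: () => Unit, delayedProduceLock: Option[Lock]): Unit = doWrite()

      def appendToTxnLog(metadata: TxnMetadata, expectedEpoch: Int,
                         onError: () => Unit, write: () => Unit): Unit = {
        // 1. decide while holding only the txnMetadata lock
        val append = metadata.inLock {
          if (metadata.coordinatorEpoch != expectedEpoch) { onError(); false } else true
        }
        // 2. append only after releasing it (appendRecords must not run under a
        //    txnMetadata lock), using the state read lock for the delayed produce
        if (append)
          appendRecords(write, delayedProduceLock = Some(stateLock.readLock))
      }
    }
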
http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2188923..9b35165 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -43,11 +43,12 @@ import scala.collection.mutable.ListBuffer
  *
 * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
-abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
+abstract class DelayedOperation(override val delayMs: Long,
+    lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
   // Visible for testing
-  private[server] val lock: ReentrantLock = new ReentrantLock
+  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
   /*
    * Force completing the delayed operation, if not already completed.

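The only change to DelayedOperation itself is the injectable lock, but it matters because completion attempts from other threads do not block on that lock: the DelayedOperationTest changes further down assert that an operation whose lock is held elsewhere is skipped rather than waited for. The sketch below captures that shape under assumed method names; the real class has additional purgatory bookkeeping:

    import java.util.concurrent.atomic.AtomicBoolean
    import java.util.concurrent.locks.{Lock, ReentrantLock}

    // Assumed shape of the completion path: tryLock rather than lock, so a thread
    // that cannot take another group's lock skips the operation instead of waiting.
    abstract class DelayedOpSketch(lockOpt: Option[Lock] = None) {
      val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
      private val completed = new AtomicBoolean(false)

      def tryComplete(): Boolean      // subclasses: complete if possible
      def onComplete(): Unit

      def forceComplete(): Boolean =
        if (completed.compareAndSet(false, true)) { onComplete(); true } else false

      def safeTryComplete(): Boolean =
        if (!lock.tryLock()) false    // skip instead of blocking; retried later
        else try tryComplete() finally lock.unlock()
    }
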
http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 9ae5a72..8b5c120 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
@@ -54,8 +55,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
-  extends DelayedOperation(delayMs) {
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                     lockOpt: Option[Lock] = None)
+  extends DelayedOperation(delayMs, lockOpt) {
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index dc01788..13d1d93 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -366,7 +367,8 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
+                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                    delayedProduceLock: Option[Lock] = None) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
@@ -383,7 +385,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
 
         // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new 
TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 19a475f..ac728f2 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, 
OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
@@ -1350,7 +1351,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
@@ -1432,8 +1434,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))
-    ).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
           Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId) ->
             new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
@@ -1460,8 +1462,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))
-    ).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupCoordinator.partitionFor(groupId)) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index aacfbbc..df4a15a 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.internals.Topic
 
 import scala.collection.JavaConverters._
 import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
 
 class GroupMetadataManagerTest {
 
@@ -1305,8 +1306,8 @@ class GroupMetadataManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))
-    )
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]]))
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     capturedArgument
   }
@@ -1318,8 +1319,8 @@ class GroupMetadataManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))
-    ).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 9e7fb13..d537bb0 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.coordinator.transaction
 
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.TestUtils
@@ -86,7 +88,10 @@ class TransactionMarkerChannelManagerTest {
     
EasyMock.expect(txnStateManager.getTransactionState(EasyMock.eq(transactionalId2)))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata2))))
       .anyTimes()
-
+    val stateLock = new ReentrantReadWriteLock
+    EasyMock.expect(txnStateManager.stateReadLock)
+      .andReturn(stateLock.readLock)
+      .anyTimes()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 524f971..7b10c80 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
@@ -497,8 +498,8 @@ class TransactionStateManagerTest {
           EasyMock.eq(true),
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
-          EasyMock.capture(capturedArgument)
-        )).andAnswer(new IAnswer[Unit] {
+          EasyMock.capture(capturedArgument),
+          
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
           override def answer(): Unit = {
             capturedArgument.getValue.apply(
               Map(partition ->
@@ -596,8 +597,8 @@ class TransactionStateManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))
-    ).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]])).andAnswer(new 
IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(
           Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
             new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index fdfb582..d4d79e5 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -124,12 +124,28 @@ class DelayedOperationTest {
 
   @Test
   def testDelayedOperationLock() {
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L), 
mismatchedLocks = false)
+  }
+
+  @Test
+  def testDelayedOperationLockOverride() {
+    def newMockOperation = {
+      val lock = new ReentrantLock
+      new MockDelayedOperation(100000L, Some(lock), Some(lock))
+    }
+    verifyDelayedOperationLock(newMockOperation, mismatchedLocks = false)
+
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L, None, 
Some(new ReentrantLock)),
+        mismatchedLocks = true)
+  }
+
+  def verifyDelayedOperationLock(mockDelayedOperation: => 
MockDelayedOperation, mismatchedLocks: Boolean) {
     val key = "key"
     val executorService = Executors.newSingleThreadExecutor
     try {
       def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           purgatory.tryCompleteElseWatch(op, Seq(key))
           assertFalse("Not completable", op.isCompleted)
           op
@@ -138,7 +154,7 @@ class DelayedOperationTest {
 
       def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = 
{
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           op.completable = true
           op
         }
@@ -182,6 +198,27 @@ class DelayedOperationTest {
         checkAndComplete(ops, Seq(ops(1)))
       } finally {
         runOnAnotherThread(ops(0).lock.unlock(), true)
+        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+      }
+
+      // Lock acquired by response callback held by another thread, should not 
block
+      // if the response lock is used as operation lock, only operations
+      // that can be locked without blocking on the current thread should 
complete
+      ops = createDelayedOperations(2)
+      ops(0).responseLockOpt.foreach { lock =>
+        runOnAnotherThread(lock.lock(), true)
+        try {
+          try {
+            checkAndComplete(ops, Seq(ops(1)))
+            assertFalse("Should have failed with mismatched locks", 
mismatchedLocks)
+          } catch {
+            case e: IllegalStateException =>
+              assertTrue("Should not have failed with valid locks", 
mismatchedLocks)
+          }
+        } finally {
+          runOnAnotherThread(lock.unlock(), true)
+          checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+        }
       }
 
       // Immediately completable operations should complete without locking
@@ -197,8 +234,9 @@ class DelayedOperationTest {
   }
 
 
-  class MockDelayedOperation(delayMs: Long)
-    extends DelayedOperation(delayMs) {
+  class MockDelayedOperation(delayMs: Long,
+      lockOpt: Option[ReentrantLock] = None,
+      val responseLockOpt: Option[ReentrantLock] = None) extends 
DelayedOperation(delayMs, lockOpt) {
     var completable = false
 
     def awaitExpiration() {
@@ -219,6 +257,10 @@ class DelayedOperationTest {
     }
 
     override def onComplete() {
+      responseLockOpt.foreach { lock =>
+        if (!lock.tryLock())
+          throw new IllegalStateException("Response callback lock could not be 
acquired in callback")
+      }
       synchronized {
         notify()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4701d4cc/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d4718ec..86bd135 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -178,7 +178,8 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
-      EasyMock.capture(responseCallback))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new 
PartitionResponse(Errors.NONE)))
       }
@@ -215,7 +216,8 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
-      EasyMock.capture(responseCallback))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new 
PartitionResponse(Errors.NONE)))
       }
@@ -244,6 +246,7 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)
