Repository: kafka
Updated Branches:
  refs/heads/0.11.0 51ea8e76b -> ab554caee


KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3956 from rajinisivaram/KAFKA-5970-delayedproduce-deadlock

(cherry picked from commit f8621b4174ddb14f9e8377da34e81e1b7ddd205f)
Signed-off-by: Jason Gustafson <ja...@confluent.io>
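
For readers skimming the diff below: the core change replaces the overridable, blocking safeTryComplete() (a plain synchronized call) with a package-private maybeTryComplete() that attempts completion only when a per-operation ReentrantLock can be acquired without blocking. A minimal standalone sketch of that pattern, with simplified names (not the exact Kafka code):

    import java.util.concurrent.locks.ReentrantLock

    // Sketch only: attempt completion without ever blocking the calling thread.
    abstract class DelayedOpSketch {
      val lock = new ReentrantLock

      // Operation-specific check; returns true if it completed the operation.
      def tryComplete(): Boolean

      def maybeTryComplete(): Boolean = {
        if (lock.tryLock()) {     // acquire only if free; never waits
          try tryComplete()
          finally lock.unlock()
        } else
          false                   // contended: leave the operation watched and retry later
      }
    }

Returning false on contention is acceptable because the operation stays in the purgatory watch lists, so a later checkAndComplete() call or the expiration reaper will attempt it again.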


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab554cae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab554cae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab554cae

Branch: refs/heads/0.11.0
Commit: ab554caee561e522e3eb63163177a68088960169
Parents: 51ea8e7
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed Oct 4 11:35:37 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Oct 4 12:16:25 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/DelayedHeartbeat.scala    |  4 -
 .../kafka/coordinator/group/DelayedJoin.scala   |  4 -
 .../group/GroupMetadataManager.scala            |  3 +-
 .../transaction/DelayedTxnMarker.scala          |  4 -
 .../transaction/TransactionStateManager.scala   |  6 +-
 .../scala/kafka/server/DelayedOperation.scala   | 29 ++++---
 .../scala/kafka/server/DelayedProduce.scala     | 11 +--
 .../scala/kafka/server/ReplicaManager.scala     |  5 +-
 .../group/GroupCoordinatorTest.scala            |  9 +--
 .../group/GroupMetadataManagerTest.scala        |  6 +-
 .../TransactionStateManagerTest.scala           |  6 +-
 .../kafka/server/DelayedOperationTest.scala     | 81 +++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  7 +-
 13 files changed, 115 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 2cbdf30..73d5d0f 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -30,10 +30,6 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
 
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 6a81242..5232287 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -35,10 +35,6 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
 
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
   override def onComplete() = coordinator.onCompleteJoin(group)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 9322ff2..7c0fa6b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -242,8 +242,7 @@ class GroupMetadataManager(brokerId: Int,
       internalTopicsAllowed = true,
       isFromClient = false,
       entriesPerPartition = records,
-      responseCallback = callback,
-      delayedProduceLock = Some(group))
+      responseCallback = callback)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 82c4a8c..bc0f1b7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -28,10 +28,6 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
                                            completionCallback: Errors => Unit)
   extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
 
-  // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = {
     txnMetadata synchronized {
       if (txnMetadata.topicPartitions.isEmpty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e0d5076..ed63620 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -196,8 +196,7 @@ class TransactionStateManager(brokerId: Int,
           internalTopicsAllowed = true,
           isFromClient = false,
           recordsPerPartition,
-          removeFromCacheCallback,
-          None
+          removeFromCacheCallback
         )
       }
 
@@ -600,8 +599,7 @@ class TransactionStateManager(brokerId: Int,
                 internalTopicsAllowed = true,
                 isFromClient = false,
                 recordsPerPartition,
-                updateCacheCallback,
-                delayedProduceLock = Some(newMetadata))
+                updateCacheCallback)
 
              trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 4ae1b13..2188923 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -46,6 +46,8 @@ import scala.collection.mutable.ListBuffer
 abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
+  // Visible for testing
+  private[server] val lock: ReentrantLock = new ReentrantLock
 
   /*
    * Force completing the delayed operation, if not already completed.
@@ -96,13 +98,18 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its
-   * own synchronization.
+   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
+   * without blocking.
    */
-  def safeTryComplete(): Boolean = {
-    synchronized {
-      tryComplete()
-    }
+  private[server] def maybeTryComplete(): Boolean = {
+    if (lock.tryLock()) {
+      try {
+        tryComplete()
+      } finally {
+        lock.unlock()
+      }
+    } else
+      false
   }
 
   /*
@@ -196,7 +203,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    // operation is unnecessarily added for watch. However, this is a less severe issue since the
     // expire reaper will clean it up periodically.
 
-    var isCompletedByMe = operation.safeTryComplete()
+    // At this point the only thread that can attempt this operation is this current thread
+    // Hence it is safe to tryComplete() without a lock
+    var isCompletedByMe = operation.tryComplete()
     if (isCompletedByMe)
       return true
 
@@ -213,7 +222,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       }
     }
 
-    isCompletedByMe = operation.safeTryComplete()
+    isCompletedByMe = operation.maybeTryComplete()
     if (isCompletedByMe)
       return true
 
@@ -335,7 +344,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
         if (curr.isCompleted) {
           // another thread has completed this operation, just remove it
           iter.remove()
-        } else if (curr.safeTryComplete()) {
+        } else if (curr.maybeTryComplete()) {
           iter.remove()
           completed += 1
         }
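
Note on the DelayedOperation.scala changes above: unlike the old synchronized-based safeTryComplete(), ReentrantLock.tryLock() returns immediately when another thread holds the lock, which is what lets purgatory threads avoid the blocking/deadlock scenario tracked by KAFKA-5970. A small self-contained illustration of that property (assumed demo code, not part of this patch):

    import java.util.concurrent.locks.ReentrantLock

    // Demo: tryLock() on a contended lock returns false right away instead of waiting.
    object TryLockDemo extends App {
      val lock = new ReentrantLock
      val holder = new Thread(new Runnable {
        def run(): Unit = { lock.lock(); Thread.sleep(500); lock.unlock() }
      })
      holder.start()
      Thread.sleep(100)              // give the holder thread time to acquire the lock
      val acquired = lock.tryLock()  // returns false here; this thread never blocks
      println(s"acquired=$acquired")
      if (acquired) lock.unlock()
      holder.join()
    }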

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 0ff8d34..9ae5a72 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
+
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -53,12 +54,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                     lockOpt: Option[Object] = None)
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
   extends DelayedOperation(delayMs) {
 
-  val lock = lockOpt.getOrElse(this)
-
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
     if (status.responseStatus.error == Errors.NONE) {
@@ -72,11 +70,6 @@ class DelayedProduce(delayMs: Long,
    trace("Initial partition status for %s is %s".format(topicPartition, status))
   }
 
-  override def safeTryComplete(): Boolean = lock synchronized {
-    tryComplete()
-  }
-
-
   /**
    * The delayed produce operation can be completed if every partition
    * it produces to is satisfied by one of the following:

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9b898e9..dc01788 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -366,8 +366,7 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                    delayedProduceLock: Option[Object] = None) {
+                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
@@ -384,7 +383,7 @@ class ReplicaManager(val config: KafkaConfig,
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
 
        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 592e343..19a475f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1350,8 +1350,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
@@ -1433,8 +1432,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.capture(capturedArgument))
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
          Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1462,8 +1460,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.capture(capturedArgument))
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a8ce17e..aacfbbc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1305,8 +1305,7 @@ class GroupMetadataManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.capture(capturedArgument))
    )
    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     capturedArgument
@@ -1319,8 +1318,7 @@ class GroupMetadataManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.capture(capturedArgument))
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 6a35d41..524f971 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -497,8 +497,7 @@ class TransactionStateManagerTest {
           EasyMock.eq(true),
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
-          EasyMock.capture(capturedArgument),
-          EasyMock.eq(None)
+          EasyMock.capture(capturedArgument)
         )).andAnswer(new IAnswer[Unit] {
           override def answer(): Unit = {
             capturedArgument.getValue.apply(
@@ -597,8 +596,7 @@ class TransactionStateManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
-      EasyMock.anyObject())
+      EasyMock.capture(capturedArgument))
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(
           Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 82cf642..fdfb582 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,6 +17,11 @@
 
 package kafka.server
 
+import java.util.concurrent.{Executors, Future}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.CoreUtils.inLock
+
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
@@ -117,9 +122,83 @@ class DelayedOperationTest {
     assertEquals(Nil, cancelledOperations)
   }
 
+  @Test
+  def testDelayedOperationLock() {
+    val key = "key"
+    val executorService = Executors.newSingleThreadExecutor
+    try {
+      def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
+        (1 to count).map { _ =>
+          val op = new MockDelayedOperation(100000L)
+          purgatory.tryCompleteElseWatch(op, Seq(key))
+          assertFalse("Not completable", op.isCompleted)
+          op
+        }
+      }
+
+      def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
+        (1 to count).map { _ =>
+          val op = new MockDelayedOperation(100000L)
+          op.completable = true
+          op
+        }
+      }
+
+      def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = {
+        val future = executorService.submit(new Runnable {
+          def run() = fun
+        })
+        if (shouldComplete)
+          future.get()
+        else
+          assertFalse("Should not have completed", future.isDone)
+        future
+      }
+
+      def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = {
+        completableOps.foreach(op => op.completable = true)
+        val completed = purgatory.checkAndComplete(key)
+        assertEquals(expectedComplete.size, completed)
+        expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted))
+        val expectedNotComplete = completableOps.toSet -- expectedComplete
+        expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted))
+      }
+
+      // If locks are free all completable operations should complete
+      var ops = createDelayedOperations(2)
+      checkAndComplete(ops, ops)
+
+      // Lock held by current thread, completable operations should complete
+      ops = createDelayedOperations(2)
+      inLock(ops(1).lock) {
+        checkAndComplete(ops, ops)
+      }
+
+      // Lock held by another thread, should not block, only operations that can be
+      // locked without blocking on the current thread should complete
+      ops = createDelayedOperations(2)
+      runOnAnotherThread(ops(0).lock.lock(), true)
+      try {
+        checkAndComplete(ops, Seq(ops(1)))
+      } finally {
+        runOnAnotherThread(ops(0).lock.unlock(), true)
+      }
+
+      // Immediately completable operations should complete without locking
+      ops = createCompletableOperations(2)
+      ops.foreach { op =>
+        assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key)))
+        assertTrue("Should have completed", op.isCompleted)
+      }
+
+    } finally {
+      executorService.shutdown()
+    }
+  }
 
 
-  class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) {
+  class MockDelayedOperation(delayMs: Long)
+    extends DelayedOperation(delayMs) {
     var completable = false
 
     def awaitExpiration() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab554cae/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 86bd135..d4718ec 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -178,8 +178,7 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
-      EasyMock.capture(responseCallback),
-      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(responseCallback))).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
        responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
       }
@@ -216,8 +215,7 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
-      EasyMock.capture(responseCallback),
-      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(responseCallback))).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
        responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
       }
@@ -246,7 +244,6 @@ class KafkaApisTest {
       EasyMock.eq(true),
       EasyMock.eq(false),
       EasyMock.anyObject(),
-      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)
