Repository: kafka Updated Branches: refs/heads/trunk d985513b2 -> f8621b417
KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3956 from rajinisivaram/KAFKA-5970-delayedproduce-deadlock Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8621b41 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8621b41 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8621b41 Branch: refs/heads/trunk Commit: f8621b4174ddb14f9e8377da34e81e1b7ddd205f Parents: d985513 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Wed Oct 4 11:35:37 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Oct 4 11:35:37 2017 -0700 ---------------------------------------------------------------------- .../coordinator/group/DelayedHeartbeat.scala | 4 - .../kafka/coordinator/group/DelayedJoin.scala | 4 - .../group/GroupMetadataManager.scala | 3 +- .../transaction/DelayedTxnMarker.scala | 4 - .../transaction/TransactionStateManager.scala | 6 +- .../scala/kafka/server/DelayedOperation.scala | 29 ++++--- .../scala/kafka/server/DelayedProduce.scala | 11 +-- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../group/GroupCoordinatorTest.scala | 3 - .../group/GroupMetadataManagerTest.scala | 2 - .../TransactionStateManagerTest.scala | 2 - .../kafka/server/DelayedOperationTest.scala | 81 +++++++++++++++++++- .../scala/unit/kafka/server/KafkaApisTest.scala | 3 - 13 files changed, 105 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala index 2cbdf30..73d5d0f 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala @@ -30,10 +30,6 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - // overridden since tryComplete already synchronizes on the group. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat() http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala index 6a81242..5232287 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala @@ -35,10 +35,6 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator, group: GroupMetadata, rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) { - // overridden since tryComplete already synchronizes on the group. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _) override def onExpiration() = coordinator.onExpireJoin() override def onComplete() = coordinator.onCompleteJoin(group) http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index c818b57..7519dc4 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -243,8 +243,7 @@ class GroupMetadataManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, entriesPerPartition = records, - responseCallback = callback, - delayedProduceLock = Some(group)) + responseCallback = callback) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala index 82c4a8c..bc0f1b7 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala @@ -28,10 +28,6 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata, completionCallback: Errors => Unit) extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) { - // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to - // call purgatory operations while holding the group lock. - override def safeTryComplete(): Boolean = tryComplete() - override def tryComplete(): Boolean = { txnMetadata synchronized { if (txnMetadata.topicPartitions.isEmpty) http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 394817c..ad5d33b 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -196,8 +196,7 @@ class TransactionStateManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, recordsPerPartition, - removeFromCacheCallback, - None + removeFromCacheCallback ) } @@ -601,8 +600,7 @@ class TransactionStateManager(brokerId: Int, internalTopicsAllowed = true, isFromClient = false, recordsPerPartition, - updateCacheCallback, - delayedProduceLock = Some(newMetadata)) + updateCacheCallback) trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log") } http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 8997395..86bf1ff 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.concurrent._ import java.util.concurrent.atomic._ -import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup @@ -46,6 +46,8 @@ import scala.collection.mutable.ListBuffer abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) + // Visible for testing + private[server] val lock: ReentrantLock = new ReentrantLock /* * Force completing the delayed operation, if not already completed. @@ -96,13 +98,18 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi def tryComplete(): Boolean /** - * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its - * own synchronization. + * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired + * without blocking. */ - def safeTryComplete(): Boolean = { - synchronized { - tryComplete() - } + private[server] def maybeTryComplete(): Boolean = { + if (lock.tryLock()) { + try { + tryComplete() + } finally { + lock.unlock() + } + } else + false } /* @@ -196,7 +203,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri // operation is unnecessarily added for watch. However, this is a less severe issue since the // expire reaper will clean it up periodically. - var isCompletedByMe = operation.safeTryComplete() + // At this point the only thread that can attempt this operation is this current thread + // Hence it is safe to tryComplete() without a lock + var isCompletedByMe = operation.tryComplete() if (isCompletedByMe) return true @@ -213,7 +222,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri } } - isCompletedByMe = operation.safeTryComplete() + isCompletedByMe = operation.maybeTryComplete() if (isCompletedByMe) return true @@ -335,7 +344,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri if (curr.isCompleted) { // another thread has completed this operation, just remove it iter.remove() - } else if (curr.safeTryComplete()) { + } else if (curr.maybeTryComplete()) { iter.remove() completed += 1 } http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/DelayedProduce.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 0d452cc..ebbd9ee 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Meter import kafka.metrics.KafkaMetricsGroup import kafka.utils.Pool + import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -53,12 +54,9 @@ case class ProduceMetadata(produceRequiredAcks: Short, class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - lockOpt: Option[Object] = None) + responseCallback: Map[TopicPartition, PartitionResponse] => Unit) extends DelayedOperation(delayMs) { - val lock = lockOpt.getOrElse(this) - // first update the acks pending variable according to the error code produceMetadata.produceStatus.foreach { case (topicPartition, status) => if (status.responseStatus.error == Errors.NONE) { @@ -72,11 +70,6 @@ class DelayedProduce(delayMs: Long, trace("Initial partition status for %s is %s".format(topicPartition, status)) } - override def safeTryComplete(): Boolean = lock synchronized { - tryComplete() - } - - /** * The delayed produce operation can be completed if every partition * it produces to is satisfied by one of the following: http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3a4ecef..98a4be1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -450,7 +450,6 @@ class ReplicaManager(val config: KafkaConfig, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Object] = None, processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds @@ -470,7 +469,7 @@ class ReplicaManager(val config: KafkaConfig, if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) - val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 85d72c3..3fed45d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -1367,7 +1367,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -1451,7 +1450,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( @@ -1481,7 +1479,6 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 4a509ed..46a1878 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1306,7 +1306,6 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) @@ -1321,7 +1320,6 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index ed1636c..0a2b641 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -498,7 +498,6 @@ class TransactionStateManagerTest { EasyMock.eq(false), EasyMock.eq(recordsByPartition), EasyMock.capture(capturedArgument), - EasyMock.eq(None), EasyMock.anyObject() )).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { @@ -599,7 +598,6 @@ class TransactionStateManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject(), EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer(): Unit = capturedArgument.getValue.apply( http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 82cf642..fdfb582 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -17,6 +17,11 @@ package kafka.server +import java.util.concurrent.{Executors, Future} +import java.util.concurrent.locks.ReentrantLock + +import kafka.utils.CoreUtils.inLock + import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -117,9 +122,83 @@ class DelayedOperationTest { assertEquals(Nil, cancelledOperations) } + @Test + def testDelayedOperationLock() { + val key = "key" + val executorService = Executors.newSingleThreadExecutor + try { + def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = new MockDelayedOperation(100000L) + purgatory.tryCompleteElseWatch(op, Seq(key)) + assertFalse("Not completable", op.isCompleted) + op + } + } + + def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = { + (1 to count).map { _ => + val op = new MockDelayedOperation(100000L) + op.completable = true + op + } + } + + def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = { + val future = executorService.submit(new Runnable { + def run() = fun + }) + if (shouldComplete) + future.get() + else + assertFalse("Should not have completed", future.isDone) + future + } + + def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = { + completableOps.foreach(op => op.completable = true) + val completed = purgatory.checkAndComplete(key) + assertEquals(expectedComplete.size, completed) + expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted)) + val expectedNotComplete = completableOps.toSet -- expectedComplete + expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted)) + } + + // If locks are free all completable operations should complete + var ops = createDelayedOperations(2) + checkAndComplete(ops, ops) + + // Lock held by current thread, completable operations should complete + ops = createDelayedOperations(2) + inLock(ops(1).lock) { + checkAndComplete(ops, ops) + } + + // Lock held by another thread, should not block, only operations that can be + // locked without blocking on the current thread should complete + ops = createDelayedOperations(2) + runOnAnotherThread(ops(0).lock.lock(), true) + try { + checkAndComplete(ops, Seq(ops(1))) + } finally { + runOnAnotherThread(ops(0).lock.unlock(), true) + } + + // Immediately completable operations should complete without locking + ops = createCompletableOperations(2) + ops.foreach { op => + assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key))) + assertTrue("Should have completed", op.isCompleted) + } + + } finally { + executorService.shutdown() + } + } - class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { + class MockDelayedOperation(delayMs: Long) + extends DelayedOperation(delayMs) { var completable = false def awaitExpiration() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 508bc35..76ae35b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -180,7 +180,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), - EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -219,7 +218,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), - EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -250,7 +248,6 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.anyObject(), - EasyMock.anyObject(), EasyMock.anyObject())) EasyMock.replay(replicaManager)