Repository: kafka
Updated Branches:
  refs/heads/trunk 9be71f7bd -> efefb452d
KAFKA-6042: Avoid deadlock between two groups with delayed operations

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #4103 from rajinisivaram/KAFKA-6042-group-deadlock

(cherry picked from commit 5ee157126d595b913761cf1887963460bbe12855)
Signed-off-by: Guozhang Wang <wangg...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efefb452
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efefb452
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efefb452

Branch: refs/heads/trunk
Commit: efefb452df0d60ae427c708cc7da312217bbe647
Parents: 9be71f7
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sat Oct 21 20:17:58 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Sat Oct 21 20:18:20 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/DelayedHeartbeat.scala    |  2 +-
 .../kafka/coordinator/group/DelayedJoin.scala   |  4 +-
 .../coordinator/group/GroupCoordinator.scala    | 34 +++++++------
 .../kafka/coordinator/group/GroupMetadata.scala |  7 ++-
 .../group/GroupMetadataManager.scala            | 17 ++++---
 .../transaction/DelayedTxnMarker.scala          |  8 +--
 .../transaction/TransactionCoordinator.scala    |  8 +--
 .../TransactionMarkerChannelManager.scala       |  4 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  2 +-
 .../transaction/TransactionMetadata.scala       |  8 ++-
 .../transaction/TransactionStateManager.scala   | 38 ++++++++++-----
 .../scala/kafka/server/DelayedOperation.scala   |  7 +--
 .../scala/kafka/server/DelayedProduce.scala     |  6 ++-
 .../scala/kafka/server/ReplicaManager.scala     |  4 +-
 .../group/GroupCoordinatorTest.scala            |  4 ++
 .../group/GroupMetadataManagerTest.scala        |  3 ++
 .../TransactionMarkerChannelManagerTest.scala   |  7 ++-
 .../TransactionStateManagerTest.scala           |  3 ++
 .../kafka/server/DelayedOperationTest.scala     | 51 ++++++++++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala |  3 ++
 20 files changed, 161 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 73d5d0f..5f16acb 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       member: MemberMetadata,
                                       heartbeatDeadline: Long,
                                       sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
+  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 5232287..c75c0d4 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -33,7 +33,7 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
@@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
   override def tryComplete(): Boolean = false
 
   override def onComplete(): Unit = {
-    group synchronized {
+    group.inLock {
       if (group.newMemberAdded && remainingMs != 0) {
         group.newMemberAdded = false
         val delay = min(configuredRebalanceDelay, remainingMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index dd4d52d..ed13a08 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -40,6 +40,12 @@ import scala.math.max
  *
  * Each Kafka server instantiates a coordinator which is responsible for a set of
  * groups. Groups are assigned to coordinators based on their group names.
+ * <p>
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in GroupCoordinator use `group` as the delayed operation
+ * lock. ReplicaManager.appendRecords may be invoked while holding the group lock
+ * used by its callback. The delayed callback may acquire the group lock
+ * since the delayed operation is completed only if the group lock can be acquired.
  */
 class GroupCoordinator(val brokerId: Int,
                        val groupConfig: GroupConfig,
@@ -142,7 +148,7 @@ class GroupCoordinator(val brokerId: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
@@ -248,7 +254,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
       } else if (generationId != group.generationId) {
@@ -273,7 +279,7 @@ class GroupCoordinator(val brokerId: Int,
             val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
             groupManager.storeGroup(group, assignment, (error: Errors) => {
-              group synchronized {
+              group.inLock {
                 // another member may have joined the group while we were awaiting this callback,
                 // so we must ensure we are still in the CompletingRebalance state and the same generation
                 // when it gets invoked. if we have transitioned to another state, then do nothing
@@ -317,7 +323,7 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
       case Some(group) =>
-        group synchronized {
+        group.inLock {
           if (group.is(Dead) || !group.has(memberId)) {
             responseCallback(Errors.UNKNOWN_MEMBER_ID)
           } else {
@@ -349,7 +355,7 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
       case Some(group) =>
-        group synchronized {
+        group.inLock {
           group.currentState match {
             case Dead =>
               // if the group is marked as dead, it means some other thread has just removed the group
@@ -449,7 +455,7 @@ class GroupCoordinator(val brokerId: Int,
                               producerEpoch: Short,
                               offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    group synchronized {
+    group.inLock {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
@@ -506,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
       groupManager.getGroup(groupId) match {
         case None => (Errors.NONE, GroupCoordinator.DeadGroup)
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             (Errors.NONE, group.summary)
           }
       }
@@ -529,7 +535,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupUnloaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
       val previousState = group.currentState
       group.transitionTo(Dead)
@@ -558,7 +564,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupLoaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
       assert(group.is(Stable) || group.is(Empty))
       group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
@@ -666,7 +672,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def maybePrepareRebalance(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       if (group.canRebalance)
         prepareRebalance(group)
     }
@@ -706,7 +712,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (group.notYetRejoinedMembers.isEmpty)
         forceComplete()
       else false
@@ -718,7 +724,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onCompleteJoin(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
         group.remove(failedMember.memberId)
@@ -768,7 +774,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
         forceComplete()
      else false
@@ -776,7 +782,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
-    group synchronized {
+    group.inLock {
       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
         removeMemberAndUpdateGroup(group, member)
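[The locking notes above describe the heart of the fix: DelayedHeartbeat and DelayedJoin now pass the group's own lock into DelayedOperation, so the purgatory and the coordinator callbacks contend on a single lock per group. Below is a minimal, self-contained sketch of that pattern; the names are illustrative and not code from this commit, and it assumes the tryLock-style completion check that the DelayedOperationTest changes further down exercise:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    // A delayed operation that can share the lock of the object it guards.
    abstract class SharedLockOperation(lockOpt: Option[Lock] = None) {
      // Caller-supplied lock (e.g. group.lock) or a private one, as before.
      protected val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

      def tryComplete(): Boolean  // invoked with `lock` held

      // Attempt completion only if the lock is free or already held by this
      // thread (ReentrantLock.tryLock succeeds reentrantly). A thread that
      // holds a different group's lock backs off instead of blocking, so two
      // groups' completion attempts can never wait on each other in a cycle.
      def safeTryComplete(): Boolean =
        if (lock.tryLock()) {
          try tryComplete()
          finally lock.unlock()
        } else false
    }

Reentrancy is also what lets ReplicaManager.appendRecords be called while the group lock is held, as the notes above state: the resulting delayed produce uses the same group lock, so its callback can re-acquire it without deadlocking.]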
http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index c4e071d..9019461 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -17,9 +17,10 @@
 package kafka.coordinator.group
 
 import java.util.UUID
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.OffsetAndMetadata
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, immutable, mutable}
@@ -154,6 +155,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   private var state: GroupState = initialState
 
+  private[group] val lock = new ReentrantLock
+
   private val members = new mutable.HashMap[String, MemberMetadata]
   private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
 
@@ -172,6 +175,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   var protocol: String = null
   var newMemberAdded: Boolean = false
 
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def is(groupState: GroupState) = state == groupState
   def not(groupState: GroupState) = state != groupState
   def has(memberId: String) = members.contains(memberId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 8ef4894..67a048d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -90,7 +90,7 @@ class GroupMetadataManager(brokerId: Int,
 
   recreateGauge("NumOffsets",
     new Gauge[Int] {
       def value = groupMetadataCache.values.map(group => {
-        group synchronized { group.numOffsets }
+        group.inLock { group.numOffsets }
       }).sum
     })
@@ -281,6 +281,7 @@ class GroupMetadataManager(brokerId: Int,
       internalTopicsAllowed = true,
       isFromClient = false,
       entriesPerPartition = records,
+      delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
@@ -298,7 +299,7 @@ class GroupMetadataManager(brokerId: Int,
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
-    group synchronized {
+    group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
@@ -347,7 +348,7 @@ class GroupMetadataManager(brokerId: Int,
           // the offset and metadata to cache if the append status has no error
           val status = responseStatus(offsetTopicPartition)
 
-          val responseError = group synchronized {
+          val responseError = group.inLock {
             if (status.error == Errors.NONE) {
               if (!group.is(Dead)) {
                 filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
@@ -407,12 +408,12 @@ class GroupMetadataManager(brokerId: Int,
       }
 
       if (isTxnOffsetCommit) {
-        group synchronized {
+        group.inLock {
           addProducerGroup(producerId, group.groupId)
           group.prepareTxnOffsetCommit(producerId, offsetMetadata)
         }
       } else {
-        group synchronized {
+        group.inLock {
           group.prepareOffsetCommit(offsetMetadata)
         }
       }
@@ -441,7 +442,7 @@ class GroupMetadataManager(brokerId: Int,
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
       }.toMap
     } else {
-      group synchronized {
+      group.inLock {
         if (group.is(Dead)) {
           topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
             (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
@@ -713,7 +714,7 @@ class GroupMetadataManager(brokerId: Int,
     var offsetsRemoved = 0
 
     groupMetadataCache.foreach { case (groupId, group) =>
-      val (removedOffsets, groupIsDead, generation) = group synchronized {
+      val (removedOffsets, groupIsDead, generation) = group.inLock {
         val removedOffsets = deletedTopicPartitions match {
           case Some(topicPartitions) => group.removeOffsets(topicPartitions)
           case None => group.removeExpiredOffsets(startMs)
@@ -785,7 +786,7 @@ class GroupMetadataManager(brokerId: Int,
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
-        case Some(group) => group synchronized {
+        case Some(group) => group.inLock {
           if (!group.is(Dead)) {
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index bc0f1b7..cf18b81 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import kafka.server.DelayedOperation
 import org.apache.kafka.common.protocol.Errors
@@ -25,11 +26,12 @@ import org.apache.kafka.common.protocol.Errors
   * Delayed transaction state change operations that are added to the purgatory without timeout (i.e. these operations should never time out)
   */
 private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
-                                            completionCallback: Errors => Unit)
-  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
+                                            completionCallback: Errors => Unit,
+                                            lock: Lock)
+  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {
 
   override def tryComplete(): Boolean = {
-    txnMetadata synchronized {
+    txnMetadata.inLock {
       if (txnMetadata.topicPartitions.isEmpty)
         forceComplete()
       else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 6b9b7ef..0b38dbc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -137,7 +137,7 @@ class TransactionCoordinator(brokerId: Int,
           val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
           val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
            prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
           }
       }
@@ -241,7 +241,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndMetadata.transactionMetadata
 
           // generate the new transaction metadata with added partitions
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
@@ -304,7 +304,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (txnMetadata.producerEpoch != producerEpoch)
@@ -368,7 +368,7 @@ class TransactionCoordinator(brokerId: Int,
       case Some(epochAndMetadata) =>
         if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
           val txnMetadata = epochAndMetadata.transactionMetadata
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (txnMetadata.producerEpoch != producerEpoch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 6c13de4..4fc9db2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -263,7 +263,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
         }
       }
 
-      val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
+      val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
       txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
 
       addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
@@ -340,7 +340,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
         val txnMetadata = epochAndMetadata.transactionMetadata
 
-        txnMetadata synchronized {
+        txnMetadata.inLock {
           topicPartitions.foreach(txnMetadata.removePartition)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index bfa25be..fefe767 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -129,7 +129,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
             txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
             abortSending = true
           } else {
-            txnMetadata synchronized {
+            txnMetadata.inLock {
               for ((topicPartition, error) <- errors.asScala) {
                 error match {
                   case Errors.NONE =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 405e639..486a887 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -16,7 +16,9 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.utils.{Logging, nonthreadsafe}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.RecordBatch
 
@@ -156,6 +158,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
   // initialized as the same as the current state
   var pendingState: Option[TransactionState] = None
 
+  private[transaction] val lock = new ReentrantLock
+
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def addPartitions(partitions: collection.Set[TopicPartition]): Unit = {
     topicPartitions ++= partitions
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index ad5d33b..f2e25c4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -56,6 +56,16 @@ object TransactionStateManager {
  * 1. the transaction log, which is a special internal topic.
  * 2. the transaction metadata including its ongoing transaction status.
  * 3. the background expiration of the transaction as well as the transactional id.
+ *
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in TransactionStateManager use `stateLock.readLock` as the delayed operation
+ * lock. Delayed operations are completed only if `stateLock.readLock` can be acquired.
+ * Delayed callbacks may acquire `stateLock.readLock` or any of the `txnMetadata` locks.
+ * <ul>
+ * <li>`stateLock.readLock` must never be acquired while holding `txnMetadata` lock.</li>
+ * <li>`txnMetadata` lock must never be acquired while holding `stateLock.writeLock`.</li>
+ * <li>`ReplicaManager.appendRecords` should never be invoked while holding a `txnMetadata` lock.</li>
+ * </ul>
 */
 class TransactionStateManager(brokerId: Int,
                               zkUtils: ZkUtils,
@@ -95,6 +105,7 @@ class TransactionStateManager(brokerId: Int,
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
   }
+  private[transaction] def stateReadLock = stateLock.readLock
 
   // this is best-effort expiration of an ongoing transaction which has been open for more than its
   // txn timeout value, we do not need to grab the lock on the metadata object upon checking its state
@@ -136,7 +147,7 @@ class TransactionStateManager(brokerId: Int,
         }.filter { case (_, txnMetadata) =>
           txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
         }.map { case (transactionalId, txnMetadata) =>
-          val txnMetadataTransition = txnMetadata synchronized {
+          val txnMetadataTransition = txnMetadata.inLock {
            txnMetadata.prepareDead()
           }
           TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
@@ -166,7 +177,7 @@ class TransactionStateManager(brokerId: Int,
             .foreach { txnMetadataCacheEntry =>
               toRemove.foreach { idCoordinatorEpochAndMetadata =>
                 val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                txnMetadata synchronized {
+                txnMetadata.inLock {
                   if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
                     && txnMetadata.pendingState.contains(Dead)
                     && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
@@ -196,7 +207,8 @@ class TransactionStateManager(brokerId: Int,
         internalTopicsAllowed = true,
         isFromClient = false,
         recordsPerPartition,
-        removeFromCacheCallback
+        removeFromCacheCallback,
+        Some(stateLock.readLock)
       )
     }
@@ -376,7 +388,7 @@ class TransactionStateManager(brokerId: Int,
           val transactionsPendingForCompletion = new mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata]
 
           loadedTransactions.foreach { case (transactionalId, txnMetadata) =>
-            txnMetadata synchronized {
+            txnMetadata.inLock {
               // if state is PrepareCommit or PrepareAbort we need to complete the transaction
               txnMetadata.state match {
                 case PrepareAbort =>
@@ -509,7 +521,7 @@ class TransactionStateManager(brokerId: Int,
           case Right(Some(epochAndMetadata)) =>
             val metadata = epochAndMetadata.transactionMetadata
 
-            metadata synchronized {
+            metadata.inLock {
               if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                 // the cache may have been changed due to txn topic partition emigration and immigration,
                 // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator
@@ -536,7 +548,7 @@ class TransactionStateManager(brokerId: Int,
           getTransactionState(transactionalId) match {
             case Right(Some(epochAndTxnMetadata)) =>
               val metadata = epochAndTxnMetadata.transactionMetadata
-              metadata synchronized {
+              metadata.inLock {
                 if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
                   if (retryOnError(responseError)) {
                     info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
@@ -586,24 +598,28 @@ class TransactionStateManager(brokerId: Int,
       case Right(Some(epochAndMetadata)) =>
         val metadata = epochAndMetadata.transactionMetadata
 
-        metadata synchronized {
+        val append: Boolean = metadata.inLock {
           if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
             // the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
             responseCallback(Errors.NOT_COORDINATOR)
+            false
           } else {
             // do not need to check the metadata object itself since no concurrent thread should be able to modify it
             // under the same coordinator epoch, so directly append to txn log now
-
-            replicaManager.appendRecords(
+            true
+          }
+        }
+        if (append) {
+          replicaManager.appendRecords(
             newMetadata.txnTimeoutMs.toLong,
             TransactionLog.EnforcedRequiredAcks,
             internalTopicsAllowed = true,
             isFromClient = false,
             recordsPerPartition,
-            updateCacheCallback)
+            updateCacheCallback,
+            delayedProduceLock = Some(stateLock.readLock))
 
           trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
-          }
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 86bf1ff..894d30e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -43,11 +43,12 @@ import scala.collection.mutable.ListBuffer
  *
 * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
-abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
+abstract class DelayedOperation(override val delayMs: Long,
+                                lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
   // Visible for testing
-  private[server] val lock: ReentrantLock = new ReentrantLock
+  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
   /*
    * Force completing the delayed operation, if not already completed.
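[Both new inLock methods above, GroupMetadata.inLock and TransactionMetadata.inLock, delegate to kafka.utils.CoreUtils.inLock. For reference, that helper is the standard lock/try/finally wrapper; the sketch below shows its shape as found in CoreUtils of this era and is not part of the diff itself:

    import java.util.concurrent.locks.Lock

    // Acquire the lock, evaluate the thunk, and always release the lock,
    // returning the thunk's result.
    def inLock[T](lock: Lock)(fun: => T): T = {
      lock.lock()
      try {
        fun
      } finally {
        lock.unlock()
      }
    }

The switch from `group synchronized { ... }` to an explicit ReentrantLock matters because a JVM monitor cannot be handed to another component, whereas a Lock object can be shared with DelayedOperation through the new lockOpt parameter above.]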
http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index ebbd9ee..718ed24 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
@@ -54,8 +55,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
-  extends DelayedOperation(delayMs) {
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                     lockOpt: Option[Lock] = None)
+  extends DelayedOperation(delayMs, lockOpt) {
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 625b352..eee442a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -462,6 +463,7 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                    delayedProduceLock: Option[Lock] = None,
                     processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
@@ -481,7 +483,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c9f2ec6..22efb33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
@@ -1367,6 +1368,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1450,6 +1452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -1479,6 +1482,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index ceee8b2..b437405 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.internals.Topic
 
 import scala.collection.JavaConverters._
 import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
 
 class GroupMetadataManagerTest {
 
@@ -1308,6 +1309,7 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     )
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1322,6 +1324,7 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 0bc1c9f..a039c53 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.coordinator.transaction
 
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.TestUtils
@@ -86,7 +88,10 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(txnStateManager.getTransactionState(EasyMock.eq(transactionalId2)))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
       .anyTimes()
-
+    val stateLock = new ReentrantReadWriteLock
+    EasyMock.expect(txnStateManager.stateReadLock)
+      .andReturn(stateLock.readLock)
+      .anyTimes()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0a2b641..7973b9a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
@@ -498,6 +499,7 @@ class TransactionStateManagerTest {
       EasyMock.eq(false),
       EasyMock.eq(recordsByPartition),
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject()
     )).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
@@ -598,6 +600,7 @@ class TransactionStateManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 5b1daff..d4d79e5 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util.concurrent.{Executors, Future}
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.utils.CoreUtils.inLock
 
@@ -123,12 +124,28 @@ class DelayedOperationTest {
 
   @Test
   def testDelayedOperationLock() {
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false)
+  }
+
+  @Test
+  def testDelayedOperationLockOverride() {
+    def newMockOperation = {
+      val lock = new ReentrantLock
+      new MockDelayedOperation(100000L, Some(lock), Some(lock))
+    }
+    verifyDelayedOperationLock(newMockOperation, mismatchedLocks = false)
+
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L, None, Some(new ReentrantLock)),
+      mismatchedLocks = true)
+  }
+
+  def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean) {
     val key = "key"
     val executorService = Executors.newSingleThreadExecutor
     try {
       def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           purgatory.tryCompleteElseWatch(op, Seq(key))
           assertFalse("Not completable", op.isCompleted)
           op
@@ -137,7 +154,7 @@ class DelayedOperationTest {
 
       def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           op.completable = true
           op
         }
@@ -181,6 +198,27 @@
         checkAndComplete(ops, Seq(ops(1)))
       } finally {
         runOnAnotherThread(ops(0).lock.unlock(), true)
+        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+      }
+
+      // Lock acquired by response callback held by another thread, should not block
+      // if the response lock is used as operation lock, only operations
+      // that can be locked without blocking on the current thread should complete
+      ops = createDelayedOperations(2)
+      ops(0).responseLockOpt.foreach { lock =>
+        runOnAnotherThread(lock.lock(), true)
+        try {
+          try {
+            checkAndComplete(ops, Seq(ops(1)))
+            assertFalse("Should have failed with mismatched locks", mismatchedLocks)
+          } catch {
+            case e: IllegalStateException =>
+              assertTrue("Should not have failed with valid locks", mismatchedLocks)
+          }
+        } finally {
+          runOnAnotherThread(lock.unlock(), true)
+          checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+        }
       }
 
       // Immediately completable operations should complete without locking
@@ -196,8 +234,9 @@
   }
 
 
-  class MockDelayedOperation(delayMs: Long)
-    extends DelayedOperation(delayMs) {
+  class MockDelayedOperation(delayMs: Long,
+                             lockOpt: Option[ReentrantLock] = None,
+                             val responseLockOpt: Option[ReentrantLock] = None) extends DelayedOperation(delayMs, lockOpt) {
     var completable = false
 
     def awaitExpiration() {
@@ -218,6 +257,10 @@ class DelayedOperationTest {
     }
 
     override def onComplete() {
+      responseLockOpt.foreach { lock =>
+        if (!lock.tryLock())
+          throw new IllegalStateException("Response callback lock could not be acquired in callback")
+      }
       synchronized {
         notify()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efefb452/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a2a831a..a86c160 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -180,6 +180,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -218,6 +219,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -248,6 +250,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)
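[To see the two-group deadlock named in the commit title, consider this self-contained illustration (hypothetical code, not from the commit): each thread holds one group's lock while trying to take the other's, which is exactly the shape that can arise when a delayed operation for one group is completed from a callback still holding another group's lock.

    import java.util.concurrent.locks.ReentrantLock

    object TwoGroupDeadlockSketch {
      val groupA = new ReentrantLock
      val groupB = new ReentrantLock

      // Old shape: block on the second lock while holding the first.
      def completeWhileHolding(held: ReentrantLock, wanted: ReentrantLock): Unit = {
        held.lock()
        try {
          Thread.sleep(10)  // widen the race window for the demo
          wanted.lock()     // blocking acquire -> potential deadlock
          wanted.unlock()
        } finally held.unlock()
      }

      def main(args: Array[String]): Unit = {
        new Thread(() => completeWhileHolding(groupA, groupB)).start()
        new Thread(() => completeWhileHolding(groupB, groupA)).start()
        // With blocking lock() the two threads can wait on each other forever;
        // with a tryLock-based completion check the second acquisition backs
        // off and the delayed operation is simply attempted again later.
      }
    }
]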