[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709777#comment-16709777 ]
ASF GitHub Bot commented on KAFKA-7697: --------------------------------------- rajinisivaram closed pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock URL: https://github.com/apache/kafka/pull/5999 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 745c89a393b..1f52bd769cf 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -740,8 +740,6 @@ class Partition(val topicPartition: TopicPartition, } val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient) - // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 (info, maybeIncrementLeaderHW(leaderReplica)) @@ -754,6 +752,10 @@ class Partition(val topicPartition: TopicPartition, // some delayed operations may be unblocked after HW changed if (leaderHWIncremented) tryCompleteDelayedRequests() + else { + // probably unblock some follower fetch requests since log end offset has been updated + replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition)) + } info } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 6e38ca9575b..cfaa147f407 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -19,14 +19,14 @@ package kafka.cluster import java.io.File import 
java.nio.ByteBuffer import java.util.{Optional, Properties} -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} import java.util.concurrent.atomic.AtomicBoolean import kafka.api.Request import kafka.common.UnexpectedAppendOffsetException import kafka.log.{Defaults => _, _} import kafka.server._ -import kafka.utils.{MockScheduler, MockTime, TestUtils} +import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.ReplicaNotAvailableException @@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li import org.junit.{After, Before, Test} import org.junit.Assert._ import org.scalatest.Assertions.assertThrows -import org.easymock.EasyMock +import org.easymock.{Capture, EasyMock, IAnswer} import scala.collection.JavaConverters._ @@ -671,7 +671,95 @@ class PartitionTest { partition.updateReplicaLogReadResult(follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica)) assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId)) - } + } + + /** + * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks. + * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they + * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers + * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for + * read lock of one Partition while holding on to read lock of another Partition. 
+ */ + @Test + def testDelayedFetchAfterAppendRecords(): Unit = { + val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) + val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) + val controllerId = 0 + val controllerEpoch = 0 + val leaderEpoch = 5 + val replicaIds = List[Integer](brokerId, brokerId + 1).asJava + val isr = replicaIds + val logConfig = LogConfig(new Properties) + + val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) } + val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) } + val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) } + val partitions = replicas.map { replica => + val tp = replica.topicPartition + val partition = new Partition(tp, + isOffline = false, + replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + localBrokerId = brokerId, + time, + replicaManager, + logManager, + zkClient) + partition.addReplicaIfNotExists(replica) + partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, + leaderEpoch, isr, 1, replicaIds, true), 0) + partition + } + + // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch + val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture() + EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey))) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size + val partition = partitions(anotherPartition) + partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true) + } + }).anyTimes() + EasyMock.replay(replicaManager, zkClient) + + def createRecords(baseOffset: Long): MemoryRecords = { + val records = List( + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes)) + val buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val builder = MemoryRecords.builder( + buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, + baseOffset, time.milliseconds, 0) + records.foreach(builder.append) + builder.build() + } + + val done = new AtomicBoolean() + val executor = Executors.newFixedThreadPool(topicPartitions.size + 1) + try { + // Invoke some operation that acquires leaderIsrUpdate write lock on one thread + executor.submit(CoreUtils.runnable { + while (!done.get) { + partitions.foreach(_.maybeShrinkIsr(10000)) + } + }) + // Append records to partitions, one partition-per-thread + val futures = partitions.map { partition => + executor.submit(CoreUtils.runnable { + (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) } + }) + } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + done.set(true) + } catch { + case e: TimeoutException => + val allThreads = TestUtils.allThreadStackTraces() + fail(s"Test timed out with exception $e, thread stack traces: $allThreads") + } finally { + executor.shutdownNow() + executor.awaitTermination(5, TimeUnit.SECONDS) + } + } def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index bcb05816353..e5ea6a4baae 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -958,6 +958,12 @@ object TestUtils extends Logging { assertEquals(0, threadCount) } + def allThreadStackTraces(): String = { + Thread.getAllStackTraces.asScala.map { case (thread, stackTrace) => + thread.getName + "\n\t" + stackTrace.toList.map(_.toString).mkString("\n\t") + }.mkString("\n") + } + /** * Create new 
LogManager instance with default configuration for testing */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Possible deadlock in kafka.cluster.Partition > -------------------------------------------- > > Key: KAFKA-7697 > URL: https://issues.apache.org/jira/browse/KAFKA-7697 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.1.0 > Reporter: Gian Merlino > Assignee: Rajini Sivaram > Priority: Blocker > Fix For: 2.2.0, 2.1.1 > > Attachments: threaddump.txt > > > After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up > within a few minutes (by "locked up" I mean that all request handler threads > were busy, and other brokers reported that they couldn't communicate with > it). I restarted it a few times and it did the same thing each time. After > downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from > the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads > trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition. > It jumps out that there are two threads that already have some read lock > (can't tell which one) and are trying to acquire a second one (on two > different read locks: 0x0000000708184b88 and 0x000000070821f188): > kafka-request-handler-1 and kafka-request-handler-4. Both are handling a > produce request, and in the process of doing so, are calling > Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the > same time, both of those locks have writers from other threads waiting on > them (kafka-request-handler-2 and kafka-scheduler-6). 
Neither of those locks > appears to have writers that hold them (if only because no threads in the dump > are deep enough in inWriteLock to indicate that). > ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over > readers. Is it possible that kafka-request-handler-1 and > kafka-request-handler-4 are each trying to read-lock the partition that is > currently locked by the other one, and they're both parked waiting for > kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they > never will, because the former two threads own read locks and aren't giving > them up? -- This message was sent by Atlassian JIRA (v7.6.3#76005)