[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709777#comment-16709777 ]

ASF GitHub Bot commented on KAFKA-7697:
---------------------------------------

rajinisivaram closed pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock
URL: https://github.com/apache/kafka/pull/5999
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..1f52bd769cf 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -740,8 +740,6 @@ class Partition(val topicPartition: TopicPartition,
           }
 
           val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
-          // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderReplica))
 
@@ -754,6 +752,10 @@ class Partition(val topicPartition: TopicPartition,
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
+    else {
+      // probably unblock some follower fetch requests since log end offset has been updated
+      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+    }
 
     info
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6e38ca9575b..cfaa147f407 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,14 +19,14 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.{Optional, Properties}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
-import org.easymock.EasyMock
+import org.easymock.{Capture, EasyMock, IAnswer}
 
 import scala.collection.JavaConverters._
 
@@ -671,7 +671,95 @@ class PartitionTest {
     partition.updateReplicaLogReadResult(follower1Replica,
                                          readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
- }
+  }
+
+  /**
+   * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
+   * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
+   * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
+   * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
+   * read lock of one Partition while holding on to read lock of another Partition.
+   */
+  @Test
+  def testDelayedFetchAfterAppendRecords(): Unit = {
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val controllerId = 0
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
+    val isr = replicaIds
+    val logConfig = LogConfig(new Properties)
+
+    val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
+    val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
+    val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
+    val partitions = replicas.map { replica =>
+      val tp = replica.topicPartition
+      val partition = new Partition(tp,
+        isOffline = false,
+        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+        localBrokerId = brokerId,
+        time,
+        replicaManager,
+        logManager,
+        zkClient)
+      partition.addReplicaIfNotExists(replica)
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+        leaderEpoch, isr, 1, replicaIds, true), 0)
+      partition
+    }
+
+    // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+    val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
+          val partition = partitions(anotherPartition)
+          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+        }
+      }).anyTimes()
+    EasyMock.replay(replicaManager, zkClient)
+
+    def createRecords(baseOffset: Long): MemoryRecords = {
+      val records = List(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes))
+      val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+      val builder = MemoryRecords.builder(
+        buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
+        baseOffset, time.milliseconds, 0)
+      records.foreach(builder.append)
+      builder.build()
+    }
+
+    val done = new AtomicBoolean()
+    val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
+    try {
+      // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
+      executor.submit(CoreUtils.runnable {
+        while (!done.get) {
+          partitions.foreach(_.maybeShrinkIsr(10000))
+        }
+      })
+      // Append records to partitions, one partition-per-thread
+      val futures = partitions.map { partition =>
+        executor.submit(CoreUtils.runnable {
+          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+        })
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+      done.set(true)
+    } catch {
+      case e: TimeoutException =>
+        val allThreads = TestUtils.allThreadStackTraces()
+        fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
+    } finally {
+      executor.shutdownNow()
+      executor.awaitTermination(5, TimeUnit.SECONDS)
+    }
+  }
 
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bcb05816353..e5ea6a4baae 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -958,6 +958,12 @@ object TestUtils extends Logging {
     assertEquals(0, threadCount)
   }
 
+  def allThreadStackTraces(): String = {
+    Thread.getAllStackTraces.asScala.map { case (thread, stackTrace) =>
+      thread.getName + "\n\t" + stackTrace.toList.map(_.toString).mkString("\n\t")
+    }.mkString("\n")
+  }
+
   /**
    * Create new LogManager instance with default configuration for testing
    */
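
For readers skimming the diff above, the shape of the fix is: appendRecordsToLeader still performs the append and the high-watermark check while holding the leaderIsrUpdateLock read lock, but the call to replicaManager.tryCompleteDelayedFetch now runs only after that lock has been released (and only when the high watermark did not move, since tryCompleteDelayedRequests already covers the other branch). Below is a condensed, illustrative sketch of that locking pattern, not the real Partition code; the class name and the two callback parameters are made up:

import java.util.concurrent.locks.ReentrantReadWriteLock

// Sketch only: completing delayed operations can take read locks on other
// partitions, so it must not run while this partition's read lock is held.
class PartitionSketch(tryCompleteDelayedFetch: () => Unit,
                      tryCompleteDelayedRequests: () => Unit) {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  // `append` stands in for the real log append plus maybeIncrementLeaderHW and
  // reports whether the high watermark was incremented.
  def appendRecordsToLeader(append: () => Boolean): Unit = {
    leaderIsrUpdateLock.readLock().lock()
    val leaderHWIncremented =
      try append()                                  // work that needs the read lock
      finally leaderIsrUpdateLock.readLock().unlock()

    // Read lock released: now it is safe to complete delayed operations, which
    // may acquire the leaderIsrUpdateLock of other partitions.
    if (leaderHWIncremented) tryCompleteDelayedRequests()
    else tryCompleteDelayedFetch()
  }
}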


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Possible deadlock in kafka.cluster.Partition
> --------------------------------------------
>
>                 Key: KAFKA-7697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7697
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Gian Merlino
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.2.0, 2.1.1
>
>         Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x0000000708184b88 and 0x000000070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appears to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?
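
The lock interaction described in the report can be reproduced outside Kafka with two non-fair ReentrantReadWriteLocks. The standalone sketch below (illustrative only: the thread names merely echo the thread dump, the timing is best-effort, and Scala 2.12+ is assumed for the Runnable lambdas) has two reader threads that each hold one read lock and then try to take the other while a writer is queued on each lock. Because the default non-fair policy parks a new reader behind a queued writer, all four threads end up waiting forever; the resulting hang is the point of the example, so expect the program not to terminate.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantReadWriteLock

object ReadWriteLockDeadlockSketch extends App {
  val lockA = new ReentrantReadWriteLock()  // stands in for one partition's leaderIsrUpdateLock
  val lockB = new ReentrantReadWriteLock()  // stands in for another partition's lock
  val readersHoldFirstLock = new CountDownLatch(2)

  def reader(first: ReentrantReadWriteLock, second: ReentrantReadWriteLock, name: String) =
    new Thread(() => {
      first.readLock().lock()               // e.g. produce handling for one partition
      readersHoldFirstLock.countDown()
      readersHoldFirstLock.await()
      Thread.sleep(200)                     // give the writers time to queue on both locks
      second.readLock().lock()              // e.g. fetchOffsetSnapshot on the other partition
      second.readLock().unlock()
      first.readLock().unlock()
    }, name)

  def writer(lock: ReentrantReadWriteLock, name: String) =
    new Thread(() => {
      readersHoldFirstLock.await()
      lock.writeLock().lock()               // e.g. ISR shrink waiting for the write lock
      lock.writeLock().unlock()
    }, name)

  Seq(reader(lockA, lockB, "handler-1"), reader(lockB, lockA, "handler-4"),
      writer(lockA, "scheduler"), writer(lockB, "handler-2")).foreach(_.start())
  // handler-1 and handler-4 each park behind a queued writer that can never run,
  // because the other handler still holds the read lock that writer needs.
}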



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
