[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708017#comment-16708017 ]

ASF GitHub Bot commented on KAFKA-7697:
---------------------------------------

rajinisivaram closed pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch
URL: https://github.com/apache/kafka/pull/5997
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..a5655c77e2d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -836,6 +836,20 @@ class Partition(val topicPartition: TopicPartition,
     localReplica.offsetSnapshot
   }
 
+  def maybeFetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
+                          fetchOnlyFromLeader: Boolean): Option[LogOffsetSnapshot] = {
+    if (leaderIsrUpdateLock.readLock().tryLock()) {
+      try {
+        // decide whether to only fetch from leader
+        val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+        Some(localReplica.offsetSnapshot)
+      } finally {
+        leaderIsrUpdateLock.readLock().unlock()
+      }
+    } else
+      None
+  }
+
   def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
                                  fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
     inReadLock(leaderIsrUpdateLock) {
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 90200991759..d6504e64de9 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -84,34 +84,35 @@ class DelayedFetch(delayMs: Long,
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val partition = replicaManager.getPartitionOrException(topicPartition,
               expectLeader = fetchMetadata.fetchOnlyLeader)
-            val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
+            partition.maybeFetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader).foreach { offsetSnapshot =>
 
-            val endOffset = fetchMetadata.fetchIsolation match {
-              case FetchLogEnd => offsetSnapshot.logEndOffset
-              case FetchHighWatermark => offsetSnapshot.highWatermark
-              case FetchTxnCommitted => offsetSnapshot.lastStableOffset
-            }
+              val endOffset = fetchMetadata.fetchIsolation match {
+                case FetchLogEnd => offsetSnapshot.logEndOffset
+                case FetchHighWatermark => offsetSnapshot.highWatermark
+                case FetchTxnCommitted => offsetSnapshot.lastStableOffset
+              }
 
-            // Go directly to the check for Case D if the message offsets are the same. If the log segment
-            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case C.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case C, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
-                return forceComplete()
-              } else if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case C, this can happen when the fetch operation is falling behind the current segment
-                // or the partition has just rolled a new segment
-                debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
-                // We will not force complete the fetch request if a replica should be throttled.
-                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+              // Go directly to the check for Case D if the message offsets are the same. If the log segment
+              // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+              // which would incorrectly be seen as an instance of Case C.
+              if (endOffset.messageOffset != fetchOffset.messageOffset) {
+                if (endOffset.onOlderSegment(fetchOffset)) {
+                  // Case C, this can happen when the new fetch operation is on a truncated leader
+                  debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
                   return forceComplete()
-              } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
-                // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
-                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
-                  accumulatedSize += bytesAvailable
+                } else if (fetchOffset.onOlderSegment(endOffset)) {
+                  // Case C, this can happen when the fetch operation is falling behind the current segment
+                  // or the partition has just rolled a new segment
+                  debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
+                  // We will not force complete the fetch request if a replica should be throttled.
+                  if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+                    return forceComplete()
+                } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+                  // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
+                  val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
+                  if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+                    accumulatedSize += bytesAvailable
+                }
               }
             }
           }


 

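The core of the Partition change above is a try-lock guard: the read lock is taken only when it is immediately available, and None is returned otherwise, so DelayedFetch.tryComplete skips this pass instead of parking behind a queued writer; the unsatisfied delayed fetch simply stays in the purgatory and is re-evaluated on a later trigger or satisfied on expiration. A minimal, generic sketch of the same idiom (the object and helper names are illustrative, not Kafka API):

import java.util.concurrent.locks.ReentrantReadWriteLock

object TryReadLock {
  // Run `body` under the read lock only if it can be acquired without blocking;
  // otherwise return None so the caller can back off and retry later.
  def withReadLockIfAvailable[T](lock: ReentrantReadWriteLock)(body: => T): Option[T] =
    if (lock.readLock().tryLock()) {
      try Some(body)
      finally lock.readLock().unlock()
    } else
      None
}
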
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Possible deadlock in kafka.cluster.Partition
> --------------------------------------------
>
>                 Key: KAFKA-7697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7697
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Gian Merlino
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.2.0, 2.1.1
>
>         Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x0000000708184b88 and 0x000000070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appears to have a writer that actually holds it (if only because no thread in the 
> dump is deep enough inside inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?
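
That is exactly the cycle the patch above breaks. As a standalone illustration (not from the issue or the patch; the object name, thread names, latch and timings are all made up), two non-fair ReentrantReadWriteLocks reproduce the described interleaving: each reader holds one lock's read lock and then requests the other's, and because a writer is already queued on that other lock, the non-fair writer-preference heuristic (a new reader blocks when a writer is first in the wait queue) parks the incoming reader, so neither the readers nor the writers can ever proceed.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantReadWriteLock

object ReadLockCycleSketch extends App {
  val lockA = new ReentrantReadWriteLock()   // non-fair by default, like leaderIsrUpdateLock
  val lockB = new ReentrantReadWriteLock()
  val readersHoldFirstLock = new CountDownLatch(2)

  def spawn(name: String)(body: => Unit): Thread = {
    val t = new Thread(() => body, name)
    t.setDaemon(true)                        // let the JVM exit even though the threads stay stuck
    t.start()
    t
  }

  // Each "request handler" holds one partition's read lock, then asks for the other partition's.
  def reader(name: String, first: ReentrantReadWriteLock, second: ReentrantReadWriteLock): Thread =
    spawn(name) {
      first.readLock().lock()
      try {
        readersHoldFirstLock.countDown()
        Thread.sleep(200)                    // give the writers below time to queue up
        second.readLock().lock()             // parks: a writer is already waiting on `second`
        second.readLock().unlock()
      } finally first.readLock().unlock()
    }

  val r1 = reader("request-handler-1", lockA, lockB)
  val r2 = reader("request-handler-4", lockB, lockA)
  readersHoldFirstLock.await()

  // Writers queue up behind the held read locks (think ISR update / leader change paths).
  spawn("kafka-scheduler-6") { lockA.writeLock().lock() }
  spawn("request-handler-2") { lockB.writeLock().lock() }

  r1.join(3000)
  println(if (r1.isAlive) "deadlocked: readers parked behind queued writers" else "completed")
}

With the patch, the DelayedFetch path takes the snapshot via tryLock() and backs off when the lock is contended, so it never parks while a request handler already holds another partition's read lock, which removes one edge of the cycle.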



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
