[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708017#comment-16708017 ]
ASF GitHub Bot commented on KAFKA-7697: --------------------------------------- rajinisivaram closed pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch URL: https://github.com/apache/kafka/pull/5997 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 745c89a393b..a5655c77e2d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -836,6 +836,20 @@ class Partition(val topicPartition: TopicPartition, localReplica.offsetSnapshot } + def maybeFetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer], + fetchOnlyFromLeader: Boolean): Option[LogOffsetSnapshot] = { + if (leaderIsrUpdateLock.readLock().tryLock()) { + try { + // decide whether to only fetch from leader + val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader) + Some(localReplica.offsetSnapshot) + } finally { + leaderIsrUpdateLock.readLock().unlock() + } + } else + None + } + def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer], fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = { inReadLock(leaderIsrUpdateLock) { diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 90200991759..d6504e64de9 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -84,34 +84,35 @@ class DelayedFetch(delayMs: Long, if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition, expectLeader = fetchMetadata.fetchOnlyLeader) - 
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader) + partition.maybeFetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader).foreach { offsetSnapshot => - val endOffset = fetchMetadata.fetchIsolation match { - case FetchLogEnd => offsetSnapshot.logEndOffset - case FetchHighWatermark => offsetSnapshot.highWatermark - case FetchTxnCommitted => offsetSnapshot.lastStableOffset - } + val endOffset = fetchMetadata.fetchIsolation match { + case FetchLogEnd => offsetSnapshot.logEndOffset + case FetchHighWatermark => offsetSnapshot.highWatermark + case FetchTxnCommitted => offsetSnapshot.lastStableOffset + } - // Go directly to the check for Case D if the message offsets are the same. If the log segment - // has just rolled, then the high watermark offset will remain the same but be on the old segment, - // which would incorrectly be seen as an instance of Case C. - if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { - // Case C, this can happen when the new fetch operation is on a truncated leader - debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.") - return forceComplete() - } else if (fetchOffset.onOlderSegment(endOffset)) { - // Case C, this can happen when the fetch operation is falling behind the current segment - // or the partition has just rolled a new segment - debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.") - // We will not force complete the fetch request if a replica should be throttled. - if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) + // Go directly to the check for Case D if the message offsets are the same. If the log segment + // has just rolled, then the high watermark offset will remain the same but be on the old segment, + // which would incorrectly be seen as an instance of Case C. 
+ if (endOffset.messageOffset != fetchOffset.messageOffset) { + if (endOffset.onOlderSegment(fetchOffset)) { + // Case C, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.") return forceComplete() - } else if (fetchOffset.messageOffset < endOffset.messageOffset) { - // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) - val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) - if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) - accumulatedSize += bytesAvailable + } else if (fetchOffset.onOlderSegment(endOffset)) { + // Case C, this can happen when the fetch operation is falling behind the current segment + // or the partition has just rolled a new segment + debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.") + // We will not force complete the fetch request if a replica should be throttled. + if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) + return forceComplete() + } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) + val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) + if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) + accumulatedSize += bytesAvailable + } } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Possible deadlock in kafka.cluster.Partition > -------------------------------------------- > > Key: KAFKA-7697 > URL: https://issues.apache.org/jira/browse/KAFKA-7697 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.1.0 > Reporter: Gian Merlino > Assignee: Rajini Sivaram > Priority: Blocker > Fix For: 2.2.0, 2.1.1 > > Attachments: threaddump.txt > > > After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up > within a few minutes (by "locked up" I mean that all request handler threads > were busy, and other brokers reported that they couldn't communicate with > it). I restarted it a few times and it did the same thing each time. After > downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from > the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads > trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition. > It jumps out that there are two threads that already have some read lock > (can't tell which one) and are trying to acquire a second one (on two > different read locks: 0x0000000708184b88 and 0x000000070821f188): > kafka-request-handler-1 and kafka-request-handler-4. Both are handling a > produce request, and in the process of doing so, are calling > Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the > same time, both of those locks have writers from other threads waiting on > them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks > appears to have writers that hold them (if only because no threads in the dump > are deep enough in inWriteLock to indicate that). > ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over > readers. 
Is it possible that kafka-request-handler-1 and > kafka-request-handler-4 are each trying to read-lock the partition that is > currently locked by the other one, and they're both parked waiting for > kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they > never will, because the former two threads own read locks and aren't giving > them up? -- This message was sent by Atlassian JIRA (v7.6.3#76005)