junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r484090694



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true
+
+    // There is a potential deadlock if we don't hold the lock while adding 
the operation to watch list and do the
+    // final tryComplete() check. For example,
+    // 1) thread_a holds lock_a
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds the op to watch list
+    // 4) thread_b holds lock of op to complete op
+    // 5) thread_b calls op's onComplete which requiring lock_a
+    // 6) thread_a requires lock of op to call safeTryComplete
+    if (inLock(operation.lock) {
+      var watchCreated = false
+      watchKeys.foreach { key =>
+        watchForOperation(key, operation)
+        if (!watchCreated) {
+          watchCreated = true
+          estimatedTotalOperations.incrementAndGet()
+        }
       }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+      operation.tryComplete()

Review comment:
       We should return if `tryComplete()` returns true.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true
+
+    // There is a potential deadlock if we don't hold the lock while adding 
the operation to watch list and do the
+    // final tryComplete() check. For example,
+    // 1) thread_a holds lock_a
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds the op to watch list
+    // 4) thread_b holds lock of op to complete op
+    // 5) thread_b calls op's onComplete which requiring lock_a

Review comment:
       requiring => requires

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true

Review comment:
       This is an existing issue. I am not sure if calling `tryComplete()` 
without holding the operation's lock guarantees visibility to another thread. 
For example, thread 1 changes the state in the operation in `tryComplete()`. It 
then calls `tryComplete()` holding the operation's lock but doesn't change the 
state in the operation. Thread 2 then calls `tryComplete()` holding the 
operation's lock. Is thread 2 guaranteed to see the changes made by thread 1, 
since the update was made without crossing a memory barrier that subsequent 
readers also cross?
   
   If this is an issue, we could extend the lock to cover the first 
`tryComplete()` check.
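   
   A minimal sketch of that option, assuming a hypothetical `SketchOperation` 
class (not the Kafka `DelayedOperation`) whose state is a plain non-volatile 
field. If every `tryComplete()` call, including the first one, goes through the 
same lock, the Java Memory Model rule that an unlock happens-before each later 
lock of the same lock covers visibility:

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for DelayedOperation; not the Kafka class.
class SketchOperation {
  val lock = new ReentrantLock()

  // Plain (non-volatile) state, mutated only inside tryComplete().
  private var pendingAcks = 2

  // Mutates state and reports whether the operation is now done.
  def tryComplete(): Boolean = {
    if (pendingAcks > 0) pendingAcks -= 1
    pendingAcks == 0
  }

  // Every caller goes through the lock, so each unlock happens-before
  // the next thread's lock and the state change is visible to it.
  def safeTryComplete(): Boolean = {
    lock.lock()
    try tryComplete()
    finally lock.unlock()
  }
}
```

   Here the first attempt would call `safeTryComplete()` rather than 
`tryComplete()`, at the cost of one uncontended lock acquisition.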

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##########
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
         }
       }
       val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-      watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+      watchKeys ++= producerRequestKeys

Review comment:
       Do we still need this change to avoid deadlocks?

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true
+
+    // There is a potential deadlock if we don't hold the lock while adding 
the operation to watch list and do the
+    // final tryComplete() check. For example,
+    // 1) thread_a holds lock_a
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds the op to watch list
+    // 4) thread_b holds lock of op to complete op
+    // 5) thread_b calls op's onComplete which requiring lock_a
+    // 6) thread_a requires lock of op to call safeTryComplete
+    if (inLock(operation.lock) {

Review comment:
       It seems that we don't need the `if` here?
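   
   One way to read this suggestion, assuming the locked block is the last 
thing the method does (the rest of the method is not shown in this hunk): a 
block that already yields a `Boolean` can be the method's result directly. 
`LockSketch`, its local `inLock` helper, and both method names are made up for 
illustration:

```scala
import java.util.concurrent.locks.Lock

object LockSketch {
  // Local stand-in for a lock helper, defined here so the sketch is self-contained.
  def inLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  // Shape with the explicit `if ... return true`.
  def withIf(lock: Lock, complete: () => Boolean): Boolean = {
    if (inLock(lock) { complete() }) return true
    false
  }

  // Equivalent shape: the locked block's value is the result.
  def withoutIf(lock: Lock, complete: () => Boolean): Boolean =
    inLock(lock) { complete() }
}
```

   If more work has to run after the locked block when it yields `false`, the 
`if` form is still the natural one.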

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1010,15 +1010,7 @@ class Partition(val topicPartition: TopicPartition,
       }
     }
 
-    // some delayed operations may be unblocked after HW changed
-    if (leaderHWIncremented)
-      tryCompleteDelayedRequests()
-    else {
-      // probably unblock some follower fetch requests since log end offset 
has been updated
-      delayedOperations.checkAndCompleteFetch()
-    }
-
-    info
+    info.copy(leaderHwChange = if (leaderHWIncremented) 
LeaderHwChange.Increased else LeaderHwChange.Same)

Review comment:
       With this change, `DelayedOperations.checkAndCompleteFetch()` is only 
used in tests. I am wondering if it can be removed. It's fine if we want to do 
this in a followup PR.
   
   Unrelated to this PR, `DelayedOperations.checkAndCompleteProduce` and 
`DelayedOperations.checkAndCompleteDeleteRecords` seem unused. We can probably 
remove them in a separate PR.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true
+
+    // There is a potential deadlock if we don't hold the lock while adding 
the operation to watch list and do the
+    // final tryComplete() check. For example,
+    // 1) thread_a holds lock_a
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds the op to watch list
+    // 4) thread_b holds lock of op to complete op
+    // 5) thread_b calls op's onComplete which requiring lock_a
+    // 6) thread_a requires lock of op to call safeTryComplete
+    if (inLock(operation.lock) {
+      var watchCreated = false
+      watchKeys.foreach { key =>
+        watchForOperation(key, operation)
+        if (!watchCreated) {
+          watchCreated = true
+          estimatedTotalOperations.incrementAndGet()
+        }
       }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+      operation.tryComplete()
+    }) return true

Review comment:
       This approach is fine but leaks `operation.lock` beyond tests. Another 
way to package this is to add a new method in `DelayedOperation` like 
`tryCompleteAndMaybeWatch()`. If that's not very clean, we can keep the current 
approach.
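   
   A rough sketch of what such a method could look like, under the assumption 
that the caller passes the watch-registration step in as a function. 
`SketchDelayedOperation` and the `watch` parameter are hypothetical; only the 
method name comes from the comment above:

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical simplification of DelayedOperation; only the locking shape matters.
abstract class SketchDelayedOperation {
  // Private, so callers such as the purgatory never touch the lock directly.
  private val lock = new ReentrantLock()

  def tryComplete(): Boolean

  // Runs `watch` (for example, registering the operation under its keys)
  // and the follow-up tryComplete() as one step under the operation's lock.
  def tryCompleteAndMaybeWatch(watch: () => Unit): Boolean = {
    lock.lock()
    try {
      watch()
      tryComplete()
    } finally lock.unlock()
  }
}
```

   With this shape the purgatory would call 
`operation.tryCompleteAndMaybeWatch(...)` and the lock could stay private to 
the operation.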




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

