chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r484097161



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -231,26 +214,27 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this 
current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
+    if (operation.tryComplete()) return true
+
+    // There is a potential deadlock if we don't hold the lock while adding 
the operation to watch list and do the
+    // final tryComplete() check. For example,
+    // 1) thread_a holds lock_a
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds the op to watch list
+    // 4) thread_b holds lock of op to complete op
+    // 5) thread_b calls op's onComplete which requiring lock_a
+    // 6) thread_a requires lock of op to call safeTryComplete
+    if (inLock(operation.lock) {
+      var watchCreated = false
+      watchKeys.foreach { key =>
+        watchForOperation(key, operation)
+        if (!watchCreated) {
+          watchCreated = true
+          estimatedTotalOperations.incrementAndGet()
+        }
       }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+      operation.tryComplete()

Review comment:
       Pardon me, the latest commit does it. If the tryComplete return true, 
this method return true also.
   
   Or you mean we can do return in the lambda function directly? If so, the 
reason I don’t address it is the impl of return in lambda is to throws and then 
catch exception.That is anti-pattern in scala and we should avoid it from our 
hot methods.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to