junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r484606811



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +212,33 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    //

Review comment:
       The above comment is a bit out of context now. Perhaps we could change 
"we do the check in the following way" to "we do the check in the following way 
through safeTryCompleteAndElse()".
   
   

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,42 +102,24 @@ abstract class DelayedOperation(override val delayMs: 
Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
-   * without blocking.
-   *
-   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
-   * the operation is actually completed.
+   * Thread-safe variant of tryComplete() and call extra function if first 
tryComplete returns false
+   * @param f else function to be executed after first tryComplete returns 
false
+   * @return result of tryComplete
    */
-  private[server] def maybeTryComplete(): Boolean = {
-    var retry = false
-    var done = false
-    do {
-      if (lock.tryLock()) {
-        try {
-          tryCompletePending.set(false)
-          done = tryComplete()
-        } finally {
-          lock.unlock()
-        }
-        // While we were holding the lock, another thread may have invoked 
`maybeTryComplete` and set
-        // `tryCompletePending`. In this case we should retry.
-        retry = tryCompletePending.get()
-      } else {
-        // Another thread is holding the lock. If `tryCompletePending` is 
already set and this thread failed to
-        // acquire the lock, then the thread that is holding the lock is 
guaranteed to see the flag and retry.
-        // Otherwise, we should set the flag and retry on this thread since 
the thread holding the lock may have
-        // released the lock and returned by the time the flag is set.
-        retry = !tryCompletePending.getAndSet(true)
-      }
-    } while (!isCompleted && retry)
-    done
+  private[server] def safeTryCompleteAndElse(f: => Unit): Boolean = 
inLock(lock) {

Review comment:
       safeTryCompleteAndElse => safeTryCompleteOrElse ?

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +212,33 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock between the callers to 
tryCompleteElseWatch() and checkAndComplete() in practice
+    // if we don't hold the lock while adding the operation to watch
+    // list and do the final tryComplete() check. For example,

Review comment:
       Perhaps change the above to the following?
   
   We hold the operation's lock while adding the operation to the watch list 
and doing the tryComplete() check. This is to avoid a potential deadlock 
between the callers to tryCompleteElseWatch() and checkAndComplete(). For 
example, the following deadlock can happen if the lock is only held for the 
final tryComplete() check.
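
   For illustration only, here is a minimal sketch of the locking pattern the suggested wording describes. It assumes a simplified stand-in for DelayedOperation: the class `SketchOperation`, its `ready` flag, and the body of `safeTryCompleteOrElse` are hypothetical and are not taken from the PR. The point is that one lock is held across the first check, the "else" action (such as adding the operation to the watch list), and the final check.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, simplified stand-in for DelayedOperation; not Kafka's actual class.
class SketchOperation {
  private val lock = new ReentrantLock()
  private val completed = new AtomicBoolean(false)

  // Hypothetical completion criterion, flipped by some other thread.
  @volatile var ready: Boolean = false

  def isCompleted: Boolean = completed.get()

  // Completes the operation at most once, and only if the criterion is met.
  def tryComplete(): Boolean = ready && completed.compareAndSet(false, true)

  // Holds the lock across the first check, the "else" action f
  // (e.g. adding the operation to the watch list), and the final check.
  def safeTryCompleteOrElse(f: => Unit): Boolean = {
    lock.lock()
    try {
      if (tryComplete()) true
      else {
        f
        tryComplete()
      }
    } finally {
      lock.unlock()
    }
  }
}
```

   In this sketch, a caller in the role of tryCompleteElseWatch() would pass the watch-list registration as `f`, so no other thread holding the same lock can run its own completion check between the registration and the final tryComplete().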

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##########
@@ -202,9 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
         }
       }
       val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-      watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
-      tryCompleteDelayedRequests()
+      watchKeys ++= producerRequestKeys

Review comment:
       Do we still need to change the ordering now that we always call 
tryComplete() while holding the lock in tryCompleteElseWatch()?

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -228,29 +212,33 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
     // if the operation is completed (by another thread) between the two 
tryComplete() calls, the
     // operation is unnecessarily added for watch. However, this is a less 
severe issue since the
     // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this 
current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    //
+    // ==============[story about lock]==============
+    // There is a potential deadlock between the callers to 
tryCompleteElseWatch() and checkAndComplete() in practice
+    // if we don't hold the lock while adding the operation to watch
+    // list and do the final tryComplete() check. For example,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from 
TransactionStateManager (blocked by thread_a)
+    // 5) thread_c calls checkAndComplete () and holds lock of op

Review comment:
       checkAndComplete () => checkAndComplete()




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

