junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r441177055



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,27 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it 
succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There were a lot of cases that a thread holds a group 
lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes 
deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the 
delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails 
to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to 
complete delayed requests does NOT hold lock.
+   * We introduces a flag "completeDelayedRequests" to 
ReplicaManager.appendRecords() (and related methods).
+   *
+   * 1) If "completeDelayedRequests" is true, the delayed requests may be 
completed as much as possible.

Review comment:
        the delayed requests may be completed as much as possible =>  the 
delayed requests may be completed inside the call with the expectation that no 
conflicting locks are held by the caller

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -560,19 +603,23 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to 
be replicated to other replicas;
    * the callback function will be triggered either when timeout or the 
required acks are satisfied;
    * if the callback function itself is already synchronized on some object 
then pass this object to avoid deadlock.
+   * @return Returning a map of successfully appended topic partitions and a 
flag indicting whether the HWM has been
+   *         incremented. If the caller no longer passed in 
completeDelayedRequests, the caller is expected to complete

Review comment:
       "If the caller no longer passed in completeDelayedRequests" The caller 
still passes this in, just as false.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,40 +99,27 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the 
lock can be acquired
-   * without blocking.
+   * Thread-safe variant of tryComplete() that attempts completion after it 
succeed to hold the lock.
    *
-   * If threadA acquires the lock and performs the check for completion before 
completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the 
lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted 
again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to 
acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this 
flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one 
invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There were a lot of cases that a thread holds a group 
lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes 
deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the 
delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails 
to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to 
complete delayed requests does NOT hold lock.
+   * We introduces a flag "completeDelayedRequests" to 
ReplicaManager.appendRecords() (and related methods).
+   *
+   * 1) If "completeDelayedRequests" is true, the delayed requests may be 
completed as much as possible.
+   * 2) If "completeDelayedRequests" is false, the delayed requests are NOT 
completed and the watch key (for example,
+   *    group key) are returned. Callers can complete the delayed requests 
manually.

Review comment:
       Callers can complete the delayed requests manually => Callers can 
complete the delayed requests after releasing any conflicting lock.
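
       To illustrate the wording suggested above, here is a minimal sketch of
       "complete after releasing any conflicting lock". SketchGroup,
       SketchPurgatory and handleLeave are hypothetical stand-ins, not the real
       GroupMetadata, DelayedOperationPurgatory or GroupCoordinator code. The
       method that runs under the group lock only returns the key to complete;
       the caller triggers completion once the lock has been released.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-in for a purgatory. It only records which key was checked
// and whether the calling thread still held the group lock at that moment.
final class SketchPurgatory(lock: ReentrantLock) {
  val checked = mutable.ListBuffer.empty[(String, Boolean)]

  def checkAndComplete(key: String): Unit =
    checked += ((key, lock.isHeldByCurrentThread))
}

// Hypothetical stand-in for a group guarded by its own lock.
final class SketchGroup(val groupId: String) {
  val lock = new ReentrantLock()
  var preparingRebalance = false

  def inLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }
}

object DeferredCompletionSketch {
  // Runs under the group lock and returns the key to complete later,
  // instead of completing delayed requests while the lock is held.
  def removeMember(group: SketchGroup): Option[String] = group.inLock {
    if (group.preparingRebalance) Some(group.groupId) else None
  }

  // The group lock is already released when removeMember returns,
  // so completion happens without holding it.
  def handleLeave(group: SketchGroup, purgatory: SketchPurgatory): Unit = {
    val keyToComplete = removeMember(group)
    keyToComplete.foreach(purgatory.checkAndComplete)
  }
}
```

       In this sketch the recorded flag is false for every checked key, which
       is the property the comment wording is meant to describe.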

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1108,7 +1157,11 @@ class GroupCoordinator(val brokerId: Int,
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
-  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: 
MemberMetadata, reason: String): Unit = {
+  /**
+   * @return the group which is preparing to rebalacne. Callers should use 
this group key to complete delayed requests

Review comment:
       Typo: "rebalacne" should be "rebalance".

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a 
flag indicting whether the HWM has been
+   *         incremented. The caller ought to to complete delayed requests for 
those returned partitions.

Review comment:
       Typo: "to to" => "to"

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: 
CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = 
random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself 
hasn't been appended to the log")=>

Review comment:
       Thanks for the great explanation. I understand the issue now. 
Essentially, this exposed a limitation of the existing test. The existing test 
happens to work because the producer callbacks are always completed in the same 
ReplicaManager.appendRecords() call under the group lock. However, this is not 
necessarily the general case.
   
   Your fix works, but it may hide other real problems. Another way to fix
this is to change the test a bit. For example, we expect CompleteTxnOperation
to happen after CommitTxnOffsetsOperation. So, instead of letting them run in
parallel, we can change the test so that CompleteTxnOperation runs only after
CommitTxnOffsetsOperation completes successfully. JoinGroupOperation and
SyncGroupOperation might need similar consideration.
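
   A rough sketch of the ordering idea, using plain Scala Futures. The names
   commitTxnOffsets, completeTxn and runMember are hypothetical stand-ins and
   are not the operations defined in GroupCoordinatorConcurrencyTest. Members
   still run concurrently with each other, but for a given member the
   completion step is chained after a successful commit.

```scala
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object SequencedOperationsSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Thread-safe event log so the resulting order can be inspected afterwards.
  private val events = mutable.ListBuffer.empty[String]
  private def record(event: String): Unit = events.synchronized { events += event }
  def recorded: List[String] = events.synchronized(events.toList)

  // Hypothetical stand-in for the offset commit step; returns whether it succeeded.
  def commitTxnOffsets(member: Int): Future[Boolean] = Future {
    record(s"commit-$member")
    true
  }

  // Hypothetical stand-in for the transaction completion step.
  def completeTxn(member: Int): Future[Unit] = Future {
    record(s"complete-$member")
  }

  // Completion is started only after the commit future has finished successfully.
  def runMember(member: Int): Future[Unit] =
    commitTxnOffsets(member).flatMap { succeeded =>
      if (succeeded) completeTxn(member) else Future.unit
    }

  // Different members may interleave; each member's own two steps stay ordered.
  def runAll(members: Seq[Int]): Unit =
    Await.result(Future.sequence(members.map(runMember)).map(_ => ()), 10.seconds)
}
```

   With this shape the test would no longer need to tolerate the
   IllegalStateException, because the completion never races with its own
   commit.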
   

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1118,33 +1171,38 @@ class GroupCoordinator(val brokerId: Int,
     group.removeStaticMember(member.groupInstanceId)
 
     group.currentState match {
-      case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
-      case PreparingRebalance => 
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Dead | Empty => None
+      case Stable | CompletingRebalance =>
+        maybePrepareRebalance(group, reason)
+        None
+      case PreparingRebalance => Some(GroupKey(group.groupId))
     }
   }
 
-  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, 
memberId: String): Unit = {
+  /**
+   * remove the pending member and then return the group key which is in 
PreparingRebalance,
+   * @param group group
+   * @param memberId member id
+   * @return group key if it is in PreparingRebalance. Otherwise, None
+   */
+  private def removePendingMemberAndUpdateGroup(group: GroupMetadata, 
memberId: String): Option[GroupKey] = {
     group.removePendingMember(memberId)
 
-    if (group.is(PreparingRebalance)) {
-      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-    }
-  }
-
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group.inLock {
-      if (group.hasAllMembersJoined)
-        forceComplete()
-      else false
-    }
+    if (group.is(PreparingRebalance)) Some(GroupKey(group.groupId))
+    else None
   }
 
   def onExpireJoin(): Unit = {
     // TODO: add metrics for restabilize timeouts
   }
 
-  def onCompleteJoin(group: GroupMetadata): Unit = {
+  /**
+   * @return Returning a map of successfully appended topic partitions and a 
flag indicting whether the HWM has been
+   *         incremented. If the caller no longer passed in 
completeDelayedRequests, the caller is expected to complete

Review comment:
       " If the caller no longer passed in completeDelayedRequests" => There is 
no completeDelayedRequests passed in. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

