junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479684163



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *                            this field is updated after appending record so the default value is None.

Review comment:
       Perhaps we can make this a bit clearer. Something like the following:
   
   leaderHWIncremented has 3 possible values: (1) if records are not appended 
due to an error, the value is None; (2) if records are appended successfully 
and the HWM is advanced, the value is Some(true); (3) if records are appended 
successfully and the HWM is not advanced, the value is Some(false).
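
   To illustrate the three states above, here is a minimal sketch of how a 
caller might branch on the field. `LeaderHwState` and `describe` are 
hypothetical names used only for this example; they are not part of the PR.

```scala
// Hypothetical helper (not part of Kafka): maps each of the three
// leaderHWIncremented states described above to a short label.
object LeaderHwState {
  def describe(leaderHWIncremented: Option[Boolean]): String =
    leaderHWIncremented match {
      case None        => "not appended"               // append failed with an error
      case Some(true)  => "appended, HWM advanced"     // append succeeded, HWM moved
      case Some(false) => "appended, HWM not advanced" // append succeeded, HWM unchanged
    }
}
```

   Using Option[Boolean] keeps "no append happened" distinct from "append 
happened but the HWM did not move", which a plain Boolean could not express.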

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
       }
+
+      // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+      // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+      // expiration thread for certain delayed operations (e.g. DelayedJoin)
+      actionQueue.tryCompleteAction()

Review comment:
       Yes, if actionQueue.tryCompleteAction() throws an exception, we can just 
catch it and log a warning in the finally block, since the response has been 
sent by then.
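
   A minimal sketch of that shape, assuming stand-in types: `HandleSketch`, 
its simplified `ActionQueue`, the `dispatch` parameter, and the `warnings` 
buffer (used here in place of a real logger) are all hypothetical and only 
mirror the structure being discussed.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object HandleSketch {
  // Hypothetical stand-in for the PR's ActionQueue: runs one supplied action.
  final class ActionQueue(action: () => Unit) {
    def tryCompleteAction(): Unit = action()
  }

  // Stand-in for a logger so the example stays self-contained.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def handle(dispatch: () => Unit, actionQueue: ActionQueue): Unit = {
    try {
      dispatch() // request handling; the response is sent in here
    } finally {
      // The response has already been sent, so a failure here is only logged.
      try actionQueue.tryCompleteAction()
      catch {
        case NonFatal(e) =>
          warnings += s"Failed to complete delayed action: ${e.getMessage}"
      }
    }
  }
}
```

   Catching only NonFatal exceptions is a choice made for this sketch; fatal 
errors such as OutOfMemoryError still propagate.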

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
       Note that the action queue is called not only by request threads, but 
also by the expiration thread for certain delayed operations.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
       I was thinking of adding a comment so that anyone who adds a future 
delayed operation that calls ReplicaManager.appendRecords() in onComplete(), 
as DelayedJoin does, is aware that the operation's onExpiration() needs to call 
actionQueue.tryCompleteAction().
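
   A minimal sketch of the pattern such a comment would describe. Everything 
here is a hypothetical stand-in: `ExpirationSketch`, the simplified 
`ActionQueue`, and `DelayedJoinLike` only imitate the shape of an operation 
whose onComplete() enqueues follow-up work, as an appendRecords() call would.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.Buffer

object ExpirationSketch {
  // Hypothetical stand-in for the PR's ActionQueue.
  final class ActionQueue {
    private val queue = new ConcurrentLinkedQueue[() => Unit]()
    def add(action: () => Unit): Unit = queue.add(action)
    def tryCompleteAction(): Unit = {
      val action = queue.poll() // null when the queue is empty
      if (action != null) action()
    }
  }

  // Hypothetical delayed operation in the style of DelayedJoin.
  final class DelayedJoinLike(actionQueue: ActionQueue, log: Buffer[String]) {
    // Imitates an append that leaves a deferred completion check in the queue.
    def onComplete(): Unit = {
      log += "appended"
      actionQueue.add(() => log += "delayed requests checked")
    }

    // Runs on the expiration thread, outside KafkaApis.handle(), so the
    // queued action has to be drained here or it would be left pending.
    def onExpiration(): Unit = actionQueue.tryCompleteAction()
  }
}
```

   The sketch assumes onExpiration() is invoked after onComplete() when the 
operation times out.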

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
       Good question. It's based on the assumption that each KafkaApis.handle() 
call only calls ReplicaManager.appendRecords() once. I'm not sure this will 
always hold in the future. Perhaps a safer approach is to have 
ActionQueue.tryCompleteAction() get the current size of the queue and complete 
all of those actions.
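
   A minimal sketch of that safer approach, assuming the queue is backed by a 
ConcurrentLinkedQueue. `ActionQueueSketch`, `tryCompleteActions`, and 
`pending` are hypothetical names and do not reflect the PR's actual class.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical variant of the action queue that drains every action present
// when the call starts, rather than taking at most one item.
class ActionQueueSketch {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  def tryCompleteActions(): Unit = {
    // Snapshot the size first so that actions enqueued while draining are
    // left for the next call and this call's work stays bounded.
    var remaining = queue.size()
    while (remaining > 0) {
      val action = queue.poll()
      if (action == null) remaining = 0 // another thread drained the rest
      else {
        action()
        remaining -= 1
      }
    }
  }

  def pending: Int = queue.size()
}
```

   With this shape, a handle() call that appends more than once still 
completes all of the actions it queued, while an action that enqueues further 
work cannot keep the calling thread looping indefinitely.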





