junrao commented on pull request #6915:
URL: https://github.com/apache/kafka/pull/6915#issuecomment-626031375


   @chia7712 : Thanks for your interest. My understanding of the issue is the following. We added retry logic based on `tryCompletePending` in `DelayedOperation.maybeTryComplete()`, as shown below.
   
   ```
     private[server] def maybeTryComplete(): Boolean = {
       var retry = false
       var done = false
       do {
         if (lock.tryLock()) {
           try {
             tryCompletePending.set(false)
             done = tryComplete()
           } finally {
             lock.unlock()
           }
           // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
           // `tryCompletePending`. In this case we should retry.
           retry = tryCompletePending.get()
         } else {
           // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
           // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
           // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
           // released the lock and returned by the time the flag is set.
           retry = !tryCompletePending.getAndSet(true)
         }
       } while (!isCompleted && retry)
       done
     }
   ```
   
   This is causing the issue in KAFKA-8334 since the logic expects that when a caller of `DelayedOperation.maybeTryComplete()` can't get the lock, the caller currently holding the lock will try to complete the operation on its behalf. That is not the case when the first caller is handling a replication fetch request on the internal offsets topic and the lock-holding caller is handling a Heartbeat request.
   
   If we remove the logic based on `tryCompletePending` and revert to the previous logic of simply blocking on the lock, we can avoid the above issue.
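   For reference, the shape of that simpler, blocking variant could look roughly like this. This is a minimal sketch only, not the actual patch, and it assumes the same `lock`, `tryComplete()`, and `isCompleted` members of `DelayedOperation`:
   
   ```
     // Sketch only: block on the operation's lock and attempt completion once.
     private[server] def maybeTryComplete(): Boolean = {
       lock.lock()
       try {
         if (isCompleted) false else tryComplete()
       } finally {
         lock.unlock()
       }
     }
   ```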
   
   The reason that we added the `tryCompletePending` logic is to avoid a deadlock in the following situation. We hold a group lock while calling ReplicaManager.appendRecords(). After it appends to the local log, it may call ReplicaManager.tryCompleteDelayedProduce(), which may need to acquire a different group's lock (since the key of the operation is a topic partition, on which many groups can reside). Acquiring a second group lock while holding the first can deadlock, as sketched below.
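   To make the ordering concrete, here is a hypothetical sketch of the problematic call pattern. The names (`groupLockA`, `groupLockB`, `handleGroupA`, etc.) are illustrative stand-ins, not the actual Kafka classes:
   
   ```
   import java.util.concurrent.locks.ReentrantLock
   
   // Hypothetical sketch: two per-group locks stand in for the GroupCoordinator's group locks.
   object DeadlockSketch {
     val groupLockA = new ReentrantLock()
     val groupLockB = new ReentrantLock()
   
     // Thread 1: handles a commit for group A. It holds group A's lock across the
     // append; the subsequent tryCompleteDelayedProduce() walks the watchers for
     // the topic-partition key and needs group B's lock.
     def handleGroupA(): Unit = {
       groupLockA.lock()
       try {
         // append to the local log ...
         groupLockB.lock() // tryComplete of a delayed operation belonging to group B
         try { /* tryComplete for group B */ } finally { groupLockB.unlock() }
       } finally {
         groupLockA.unlock()
       }
     }
   
     // Thread 2: the mirror image for group B, taking the locks in the opposite
     // order. If both threads run concurrently, each can end up waiting on the
     // lock the other holds.
     def handleGroupB(): Unit = {
       groupLockB.lock()
       try {
         // append to the local log ...
         groupLockA.lock() // tryComplete of a delayed operation belonging to group A
         try { /* tryComplete for group A */ } finally { groupLockA.unlock() }
       } finally {
         groupLockB.unlock()
       }
     }
   }
   ```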
   
   The above seems to be the only case where a deadlock can occur. The two other purgatories used in GroupCoordinator, Heartbeat and Join, follow a different pattern from the produce purgatory. In these two cases, the caller holds a group-level lock when calling checkAndComplete, and inside the DelayedOperation, tryComplete()/onComplete() take the same group lock. The key associated with a Join/Heartbeat delayed operation is the group or group + member, so only pending operations on the same group can have their watcher callbacks triggered.
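   A hypothetical sketch of that group-scoped pattern, with illustrative names only. The point is that the caller and the operation's tryComplete() share a single per-group lock, so a completion attempt never has to cross into another group's lock:
   
   ```
   import java.util.concurrent.locks.ReentrantLock
   
   object GroupScopedSketch {
     val groupLock = new ReentrantLock()
   
     def tryCompleteHeartbeat(): Boolean = {
       groupLock.lock() // same lock the caller already holds; ReentrantLock allows re-entry
       try {
         // check member liveness and complete if satisfied
         true
       } finally {
         groupLock.unlock()
       }
     }
   
     def handleHeartbeatRequest(): Unit = {
       groupLock.lock() // caller takes the group-level lock
       try {
         // update member state, then trigger checkAndComplete for this group's key
         tryCompleteHeartbeat()
       } finally {
         groupLock.unlock()
       }
     }
   }
   ```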
   
   By splitting ReplicaManager.appendRecords() into two methods, appendToLocal() and checkPurgatory(), we hold a group lock when calling the former (for serialization) but not when calling the latter. This way, there is no room for a deadlock, and we can get rid of the `tryCompletePending` logic in `DelayedOperation.maybeTryComplete()`, which addresses the issue in KAFKA-8334. It also simplifies the logic in DelayedOperation rather than making it more complicated.
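   A rough sketch of the proposed shape follows. The method names appendToLocal()/checkPurgatory() come from the description above; the surrounding handler, its signature, and the lock handling are made up for the sketch:
   
   ```
   import java.util.concurrent.locks.ReentrantLock
   
   object SplitAppendSketch {
     def appendToLocal(): Unit = { /* append records to the local log */ }
     def checkPurgatory(): Unit = { /* check/complete delayed produce operations */ }
   
     def handleCommit(groupLock: ReentrantLock): Unit = {
       groupLock.lock()
       try {
         appendToLocal()   // the append is serialized under the group lock
       } finally {
         groupLock.unlock()
       }
       // No group lock is held here, so tryComplete() inside the purgatory can take
       // whichever group lock it needs without creating a cycle.
       checkPurgatory()
     }
   }
   ```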
   

