junrao commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1147813111


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -483,14 +493,10 @@ class RequestChannel(val queueSize: Int,
 
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 
-  // TODO: this is the most straightforward implementation, we may want to address fairness:
-  // The callback function is invoked as part of processing a request, so it already "stood in line"
-  // when the original request got accepted, so it may be unfair to make put to the end of the line.
-  // A simple solution would be to put it to the head of the line, but then multiple callbacks would
-  // be in LIFO order, which is weird.
-  // We may want to use a PriorityBlockingQueue or just have 2 queues and write customer wait / notify
-  // synchronization.
-  def sendCallbackRequest(request: CallbackRequest): Unit = requestQueue.put(request)
+  def sendCallbackRequest(request: CallbackRequest): Unit = {
+    callbackQueue.put(request)

Review Comment:
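   `tryCompleteActions` only clears the entries it sees at the beginning: it reads `queue.size()` once up front and polls at most that many times, so actions added while it is running are left for the next call.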
   
   ```
     def tryCompleteActions(): Unit = {
       val maxToComplete = queue.size()
       var count = 0
       var done = false
       while (!done && count < maxToComplete) {
         try {
           val action = queue.poll()
           if (action == null) done = true
           else action()
         } catch {
           case e: Throwable =>
             error("failed to complete delayed actions", e)
         } finally count += 1
       }
     }
   
   ```
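
To make the bound concrete, here is a minimal, self-contained sketch of the same snapshot-then-poll pattern. `BoundedDrainQueue` and `BoundedDrainDemo` are hypothetical names used only for illustration, not the actual Kafka classes, and the backing `ConcurrentLinkedQueue` is an assumption on my part.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.util.control.NonFatal

// Hypothetical stand-in for the queue in the snippet above; not Kafka's class.
class BoundedDrainQueue {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  def size: Int = queue.size()

  // Runs at most as many actions as were queued when the call started.
  // Returns how many actions completed without throwing.
  def tryCompleteActions(): Int = {
    val maxToComplete = queue.size() // snapshot taken once, up front
    var completed = 0
    var count = 0
    var done = false
    while (!done && count < maxToComplete) {
      try {
        val action = queue.poll()
        if (action == null) done = true
        else {
          action()
          completed += 1
        }
      } catch {
        case NonFatal(e) =>
          System.err.println(s"failed to complete delayed action: $e")
      } finally count += 1
    }
    completed
  }
}

object BoundedDrainDemo {
  // Returns (actions completed by the first drain, entries left afterwards).
  def run(): (Int, Int) = {
    val q = new BoundedDrainQueue
    q.add(() => q.add(() => ())) // enqueues a follow-up while the drain is running
    q.add(() => ())
    val first = q.tryCompleteActions()
    (first, q.size)
  }
}
```

In this sketch the first drain runs the two actions that were present at the start, and the follow-up enqueued by the first action stays in the queue until the next call. That is what keeps a single call from looping indefinitely when actions keep enqueuing more actions.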


