artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1147156380
##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -456,9 +460,16 @@ class RequestChannel(val queueSize: Int,
}
}
- /** Get the next request or block until specified time has elapsed */
- def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
- requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
+ /** Get the next request or block until specified time has elapsed
+ * Check the callback queue and execute first if present since these
+ * requests have already waited in line. */
+ def receiveRequest(timeout: Long): RequestChannel.BaseRequest = {
+ val callbackRequest = callbackQueue.poll()
+ if (callbackRequest != null)
+ callbackRequest
+ else
+ requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
Review Comment:
We could probably handle WakeupRequest in this function, so that the wakeup
mechanism is encapsulated in RequestChannel (i.e. check if we got a wakeup
request from the requestQueue and poll the callbackQueue again in that case).
##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -481,6 +493,11 @@ class RequestChannel(val queueSize: Int,
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
+ def sendCallbackRequest(request: CallbackRequest): Unit = {
+ callbackQueue.put(request)
+ requestQueue.put(RequestChannel.WakeupRequest)
Review Comment:
This should be .offer -- we don't need to block if the request queue is
full, and it's ok if we don't have a wakeup request in a full queue -- the
queue would would contain a request (due to the fact that it's full) to wake up
the poll.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]