artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1160875392


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -108,11 +108,10 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return

-        case request: RequestChannel.CallbackRequest =>
+        case callback: RequestChannel.CallbackRequest =>
           try {
-            request.originalRequest.callbackRequestDequeTimeNanos = Some(time.nanoseconds())
-            request.fun()
-            request.originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
+            callback.originalRequest.callbackRequestDequeTimeNanos = Some(time.nanoseconds())
+            callback.fun()

Review Comment:
   I think it's still good to keep the `request.originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())` here as well. Looking at the code, a request can end in many different ways: some happen before the callback completes, some after. There are also error cases, such as connection disconnects, that may not go through the success path.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2334,6 +2334,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }

+    // If this callback was executed after KafkaApis returned we will need to adjust the callback completion time here.
+    if (request.callbackRequestDequeTimeNanos.isDefined)
+      request.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
+

Review Comment:
   Instead of doing this, I would put the `request.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())` back after the `callback.fun()` call and just update the place where we update metrics to take the current time if `callbackRequestCompleteTimeNanos` is empty. Then we don't need to chase all the code paths that we could go through before updating the metrics.

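A minimal sketch of the suggestion above, not the actual Kafka code: `TimedRequest`, `runCallback`, `completeTimeOrNow` and the injected `nowNanos` clock are hypothetical stand-ins for `RequestChannel.Request`, the handler loop and `time.nanoseconds()`. The handler stamps both times around the callback, and the metrics side falls back to the current time only when the completion stamp is missing.

```scala
object CallbackTimingSketch {
  // Hypothetical stand-in for RequestChannel.Request; only the two fields discussed here.
  final class TimedRequest {
    @volatile var callbackRequestDequeTimeNanos: Option[Long] = None
    @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
  }

  // Handler side: stamp the deque time, run the callback, then stamp completion.
  // If fun() throws, the completion stamp stays empty, as in the original handler.
  def runCallback(request: TimedRequest, nowNanos: () => Long)(fun: () => Unit): Unit = {
    request.callbackRequestDequeTimeNanos = Some(nowNanos())
    fun()
    request.callbackRequestCompleteTimeNanos = Some(nowNanos())
  }

  // Metrics side: use the recorded completion time if present, otherwise "now".
  // getOrElse takes its argument by name, so the clock is read only when needed.
  def completeTimeOrNow(request: TimedRequest, nowNanos: () => Long): Long =
    request.callbackRequestCompleteTimeNanos.getOrElse(nowNanos())
}
```

With this shape, code paths that reach the metrics update before the callback finishes (or that never finish it, such as a disconnect) need no special handling of their own.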


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -227,8 +233,9 @@ object RequestChannel extends Logging {
       }

       val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-      val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-      val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos)
+      val callbackRequestTimeNanos = callbackRequestDequeTimeNanos.getOrElse(0L) - callbackRequestCompleteTimeNanos.getOrElse(0L)

Review Comment:
   1. Isn't this backwards? The complete time should be later than the deque time.
   2. With the following logic, it should work regardless of whether we get here before or after `callbackRequestCompleteTimeNanos` is set.
   
   ```
   val callbackRequestTimeNanos = callbackRequestCompleteTimeNanos.getOrElse(endTimeNanos) - callbackRequestDequeTimeNanos.getOrElse(endTimeNanos)
   ```
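To illustrate how the `getOrElse(endTimeNanos)` fallback in the suggested expression behaves, here is a small self-contained sketch. `callbackTimeNanos` is a hypothetical helper, not part of `RequestChannel`; it only mirrors the suggested subtraction so that the three possible states can be checked, and the timestamps are made-up values.

```scala
object CallbackTimeFallbackSketch {
  // Mirrors the suggested expression: complete minus deque,
  // with both sides falling back to endTimeNanos when unset.
  def callbackTimeNanos(dequeTimeNanos: Option[Long],
                        completeTimeNanos: Option[Long],
                        endTimeNanos: Long): Long =
    completeTimeNanos.getOrElse(endTimeNanos) - dequeTimeNanos.getOrElse(endTimeNanos)

  def main(args: Array[String]): Unit = {
    // No callback was ever scheduled: both fall back, so the difference is 0.
    println(callbackTimeNanos(None, None, 1000L))             // prints 0
    // Callback dequeued but completion not yet recorded: charged up to endTimeNanos.
    println(callbackTimeNanos(Some(400L), None, 1000L))       // prints 600
    // Callback finished: the recorded stamps are used as-is.
    println(callbackTimeNanos(Some(400L), Some(700L), 1000L)) // prints 300
  }
}
```

The result is never negative as long as the deque stamp is not later than `endTimeNanos`, which is what the ordering question in point 1 is about.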