artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1160875392


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -108,11 +108,10 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return
 
-        case request: RequestChannel.CallbackRequest =>
+        case callback: RequestChannel.CallbackRequest =>
           try {
-            request.originalRequest.callbackRequestDequeTimeNanos = 
Some(time.nanoseconds())
-            request.fun() 
-            request.originalRequest.callbackRequestCompleteTimeNanos = 
Some(time.nanoseconds())
+            callback.originalRequest.callbackRequestDequeTimeNanos = 
Some(time.nanoseconds())
+            callback.fun()

Review Comment:
   I think it's still good to keep the 
request.originalRequest.callbackRequestCompleteTimeNanos = 
Some(time.nanoseconds()) here as well.  Looking at the code, request could end 
in many different ways, some could happen before callback completion, some may 
happen after callback completion.  There are some error cases, like connection 
disconnects that may not go through the success path.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2334,6 +2334,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
+      // If this callback was executed after KafkaApis returned we will need 
to adjust the callback completion time here.
+      if (request.callbackRequestDequeTimeNanos.isDefined)
+        request.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
+

Review Comment:
   Instead of doing this, I would put the 
request.callbackRequestCompleteTimeNanos = Some(time.nanoseconds()) back after 
the callback.fun() call and just update the place where we update metrics to 
take the current time if callbackRequestCompleteTimeNanos is empty.  Then we 
don't need chase all the code paths that we could go through before updating 
the metrics.



##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -227,8 +233,9 @@ object RequestChannel extends Logging {
       }
 
       val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - 
startTimeNanos)
-      val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - 
requestDequeueTimeNanos)
-      val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - 
apiLocalCompleteTimeNanos)
+      val callbackRequestTimeNanos = 
callbackRequestDequeTimeNanos.getOrElse(0L) - 
callbackRequestCompleteTimeNanos.getOrElse(0L)

Review Comment:
   1. Isn't it backwards?  The complete time should be larger than deque time.
   2.  If we do the following logic, then it should work regardless of whether 
we arrive here before setting callbackRequestCompleteTimeNanos or after setting 
callbackRequestCompleteTimeNanos.
   
   ```
   val callbackRequestTimeNanos = 
callbackRequestCompleteTimeNanos.getOrElse(endTimeNanos) - 
callbackRequestDequeTimeNanos.getOrElse(endTimeNanos)
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to