This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset 
(#12360)
6495a0768ca is described below

commit 6495a0768cae1086d4b1dfa466967dfe178c1553
Author: YU <[email protected]>
AuthorDate: Thu Jul 7 04:21:28 2022 +0800

    KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)
    
    When building a forwarded request, we need to override the dequeue time of 
the underlying request to match the same value as the envelope. Otherwise, the 
field is left unset, which causes inaccurate reporting.
    
    Reviewers; Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/server/EnvelopeUtils.scala      |  5 ++++-
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 10 +++++++---
 core/src/test/scala/unit/kafka/utils/TestUtils.scala      |  5 ++++-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala 
b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
index a162ae5fe80..97c532ebb45 100644
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -84,7 +84,7 @@ object EnvelopeUtils {
     requestChannelMetrics: RequestChannel.Metrics
   ): RequestChannel.Request = {
     try {
-      new RequestChannel.Request(
+      val forwardedRequest = new RequestChannel.Request(
         processor = envelope.processor,
         context = forwardedContext,
         startTimeNanos = envelope.startTimeNanos,
@@ -93,6 +93,9 @@ object EnvelopeUtils {
         requestChannelMetrics,
         Some(envelope)
       )
+      // set the dequeue time of forwardedRequest as the value of envelope 
request
+      forwardedRequest.requestDequeueTimeNanos = 
envelope.requestDequeueTimeNanos
+      forwardedRequest
     } catch {
       case e: InvalidRequestException =>
         // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cc34cabe05a..d176f369f8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -307,8 +307,10 @@ class KafkaApisTest {
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
+    val startTimeNanos = time.nanoseconds()
+    val queueDurationNanos = 5 * 1000 * 1000
     val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
startTimeNanos, startTimeNanos + queueDurationNanos)
 
     val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = 
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
     val capturedRequest: ArgumentCaptor[RequestChannel.Request] = 
ArgumentCaptor.forClass(classOf[RequestChannel.Request])
@@ -321,6 +323,8 @@ class KafkaApisTest {
       any()
     )
     assertEquals(Some(request), capturedRequest.getValue.envelope)
+    // the dequeue time of forwarded request should equals to envelop request
+    assertEquals(request.requestDequeueTimeNanos, 
capturedRequest.getValue.requestDequeueTimeNanos)
     val innerResponse = capturedResponse.getValue
     val responseMap = innerResponse.data.responses().asScala.map { 
resourceResponse =>
       resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
@@ -397,7 +401,7 @@ class KafkaApisTest {
       .build(requestHeader.apiVersion)
 
     val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener)
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)
 
     val capturedResponse: ArgumentCaptor[AbstractResponse] = 
ArgumentCaptor.forClass(classOf[AbstractResponse])
     createKafkaApis(authorizer = Some(authorizer), enableForwarding = 
true).handle(request, RequestLocal.withThreadConfinedCaching)
@@ -1614,7 +1618,7 @@ class KafkaApisTest {
 
       assertEquals(1, response.data.responses.size)
       val topicProduceResponse = response.data.responses.asScala.head
-      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      assertEquals(1, topicProduceResponse.partitionResponses.size)
       val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
       assertEquals(Errors.INVALID_PRODUCER_EPOCH, 
Errors.forCode(partitionProduceResponse.errorCode))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 05610413e98..01a888c1667 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2182,6 +2182,7 @@ object TestUtils extends Logging {
     principalSerde: KafkaPrincipalSerde,
     requestChannelMetrics: RequestChannel.Metrics,
     startTimeNanos: Long,
+    dequeueTimeNanos: Long = -1,
     fromPrivilegedListener: Boolean = true
   ): RequestChannel.Request = {
     val clientId = "id"
@@ -2203,7 +2204,7 @@ object TestUtils extends Logging {
       KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, 
ClientInformation.EMPTY,
       fromPrivilegedListener, Optional.of(principalSerde))
 
-    new RequestChannel.Request(
+    val envelopRequest = new RequestChannel.Request(
       processor = 1,
       context = envelopeContext,
       startTimeNanos = startTimeNanos,
@@ -2212,6 +2213,8 @@ object TestUtils extends Logging {
       metrics = requestChannelMetrics,
       envelope = None
     )
+    envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
+    envelopRequest
   }
 
   def verifyNoUnexpectedThreads(context: String): Unit = {

Reply via email to