This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f981bb6e0b Fix extra router statusCode metric edge cases (#18699)
1f981bb6e0b is described below

commit 1f981bb6e0b0148579662ca08c7102ef6fd646a5
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Oct 28 11:13:00 2025 -0700

    Fix extra router statusCode metric edge cases (#18699)
    
    The `statusCode` dimension on `query/time` metric will misreport if 
connection exceptions occur while router/broker are communicating (e.g. there's 
no HTTP status code yet – Jetty defaults this to 0) or when a client 
manually closes the connection while a downstream query is still in-flight. We 
cannot perfectly map every error in this case (nor do I think we should), but 
it's better than misreporting incorrect codes. Since we need to report a metric 
event anyways, maintaining a f [...]
---
 .../druid/server/AsyncQueryForwardingServlet.java  |  33 +++++-
 .../server/AsyncQueryForwardingServletTest.java    | 127 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 4 deletions(-)

diff --git 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 3eedf19a4b2..85abd832fef 100644
--- 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -107,6 +107,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
   private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false";
 
   private static final long CANCELLATION_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(5);
+  // Jetty-specific default (un-assigned) status code
+  private static final int UNASSIGNED_DEFAULT_STATUS_CODE = 0;
 
   private final AtomicLong successfulQueryCount = new AtomicLong();
   private final AtomicLong failedQueryCount = new AtomicLong();
@@ -760,8 +762,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
         return;
       }
 
-      final int statusCode = result.getResponse().getStatus();
-      boolean success = result.isSucceeded() && statusCode == 
Status.OK.getStatusCode();
+      final boolean success = result.isSucceeded() && 
result.getResponse().getStatus() == Status.OK.getStatusCode();
+      final int statusCode = determineStatusCode(success, 
result.getResponse().getStatus());
       if (success) {
         successfulQueryCount.incrementAndGet();
       } else {
@@ -770,6 +772,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
 
       // As router is simply a proxy, we don't make an effort to construct the 
error code from the exception ourselves.
       // We rely on broker to set this for us if the error occurs downstream.
+      // Otherwise, if there's a router/client error, we log this as an 
unknown error.
       emitQueryTime(requestTimeNs, success, sqlQueryId, queryId, statusCode);
 
       AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
@@ -857,8 +860,10 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       failedQueryCount.incrementAndGet();
 
       // As router is simply a proxy, we don't make an effort to construct the 
error code from the exception ourselves.
-      // We rely on broker to set this for us if the error occurs downstream.
-      emitQueryTime(requestTimeNs, false, sqlQueryId, queryId, 
response.getStatus());
+      // We rely on broker to set this for us if the error occurs downstream. 
+      // Otherwise, if there's a router/client error, we log this as an 
unknown error.
+      final int statusCode = determineStatusCode(false, response.getStatus());
+      emitQueryTime(requestTimeNs, false, sqlQueryId, queryId, statusCode);
       AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
 
       //noinspection VariableNotUsedInsideIf
@@ -960,4 +965,24 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
     }
   }
+
+  /**
+   * Helper method to assign reasonable status codes in ambiguous cases like 
client/broker connection errors.
+   *
+   * @param success Whether the query was successful
+   * @param statusCode Status code reported by the broker (or {@value 
#UNASSIGNED_DEFAULT_STATUS_CODE})
+   */
+  private static int determineStatusCode(boolean success, int statusCode)
+  {
+    if (success) {
+      if (statusCode == UNASSIGNED_DEFAULT_STATUS_CODE) {
+        statusCode = Status.OK.getStatusCode();
+      }
+    } else {
+      if (statusCode == UNASSIGNED_DEFAULT_STATUS_CODE || statusCode == 
Status.OK.getStatusCode()) {
+        statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode();
+      }
+    }
+    return statusCode;
+  }
 }
diff --git 
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
 
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index bd5487c6474..372ad96174c 100644
--- 
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ 
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -92,6 +92,7 @@ import org.eclipse.jetty.ee8.servlet.ServletHolder;
 import org.eclipse.jetty.http.HttpField;
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.EofException;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.junit.Assert;
@@ -530,6 +531,132 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     Assert.assertEquals("false", 
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
   }
 
+  @Test
+  public void testOnCompleteWithClosedException()
+  {
+    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                        .dataSource("foo")
+                                        .intervals("2000/P1D")
+                                        .granularity(Granularities.ALL)
+                                        .context(ImmutableMap.of("queryId", 
"closed-test"))
+                                        .build();
+
+    final HttpServletRequest requestMock = 
Mockito.mock(HttpServletRequest.class);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery")).thenReturn(null);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.query")).thenReturn(query);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery")).thenReturn(null);
+    Mockito.when(requestMock.getRemoteAddr()).thenReturn("127.0.0.1");
+    
Mockito.when(requestMock.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+           .thenReturn(new AuthenticationResult("testUser", "basic", "basic", 
null));
+
+    final Request proxyRequestMock = Mockito.mock(Request.class);
+    final Response responseMock = Mockito.mock(Response.class);
+    Mockito.when(responseMock.getStatus()).thenReturn(200); // Status OK, although the result below reports failure
+    Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build());
+    Mockito.when(responseMock.getRequest()).thenReturn(proxyRequestMock);
+
+    // Result where connection is closed prematurely
+    final Result result = new Result(proxyRequestMock, responseMock)
+    {
+      @Override
+      public boolean isSucceeded()
+      {
+        return false;
+      }
+
+      @Override
+      public Throwable getFailure()
+      {
+        return new EofException("Stream closed");
+      }
+    };
+
+    final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", 
"");
+    final AsyncQueryForwardingServlet servlet = new 
AsyncQueryForwardingServlet(
+        new MapQueryToolChestWarehouse(ImmutableMap.of()),
+        TestHelper.makeJsonMapper(),
+        TestHelper.makeSmileMapper(),
+        null,
+        null,
+        null,
+        stubServiceEmitter,
+        NoopRequestLogger.instance(),
+        new DefaultGenericQueryMetricsFactory(),
+        new AuthenticatorMapper(ImmutableMap.of()),
+        new Properties(),
+        new ServerConfig()
+    );
+
+    try {
+      servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+    }
+    catch (NullPointerException ignored) { // NOTE(review): presumably triggered by the null second argument to newProxyResponseListener - confirm
+    }
+
+    stubServiceEmitter.verifyEmitted("query/time", 1);
+    Assert.assertEquals("closed-test", 
stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+    Assert.assertEquals(
+        500, // Failed result with status 200 should be reported as 500
+        
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get(DruidMetrics.STATUS_CODE)
+    );
+    Assert.assertEquals("false", 
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
+  }
+
+  @Test
+  public void testOnFailureWithExceptionAndUnassignedStatusCode()
+  {
+    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                        .dataSource("foo")
+                                        .intervals("2000/P1D")
+                                        .granularity(Granularities.ALL)
+                                        .context(ImmutableMap.of("queryId", 
"zero-status-test"))
+                                        .build();
+
+    final HttpServletRequest requestMock = 
Mockito.mock(HttpServletRequest.class);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery")).thenReturn(null);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.query")).thenReturn(query);
+    
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery")).thenReturn(null);
+    Mockito.when(requestMock.getRemoteAddr()).thenReturn("127.0.0.1");
+    
Mockito.when(requestMock.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+           .thenReturn(new AuthenticationResult("testUser", "basic", "basic", 
null));
+
+    final Response responseMock = Mockito.mock(Response.class);
+    Mockito.when(responseMock.getStatus()).thenReturn(0); // Test unassigned 
http status code case from server
+    Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build());
+
+    final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", 
"");
+    final AsyncQueryForwardingServlet servlet = new 
AsyncQueryForwardingServlet(
+        new MapQueryToolChestWarehouse(ImmutableMap.of()),
+        TestHelper.makeJsonMapper(),
+        TestHelper.makeSmileMapper(),
+        null,
+        null,
+        null,
+        stubServiceEmitter,
+        NoopRequestLogger.instance(),
+        new DefaultGenericQueryMetricsFactory(),
+        new AuthenticatorMapper(ImmutableMap.of()),
+        new Properties(),
+        new ServerConfig()
+    );
+
+    try {
+      servlet.newProxyResponseListener(requestMock, null)
+             .onFailure(responseMock, new IOException("Connection reset by 
peer"));
+    }
+    catch (NullPointerException ignored) { // NOTE(review): presumably triggered by the null second argument to newProxyResponseListener - confirm
+    }
+
+    stubServiceEmitter.verifyEmitted("query/time", 1);
+    Assert.assertEquals("zero-status-test", 
stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+    Assert.assertEquals(
+        500, // Should default to 500 when status is 0
+        
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get(DruidMetrics.STATUS_CODE)
+    );
+    Assert.assertEquals("false", 
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
+  }
+
+
   @Test
   public void testNoParseExceptionOnGroupByWithFilteredAggregationOnLookups() 
throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to