This is an automated email from the ASF dual-hosted git repository.

atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 780207869b3 Attach user identity to router request logs (#15126)
780207869b3 is described below

commit 780207869b3f3b0792cee253500fb205203c9c12
Author: Atul Mohan <[email protected]>
AuthorDate: Wed Oct 18 19:40:58 2023 -0700

    Attach user identity to router request logs (#15126)
    
    * Attach user identity to router request logs
    
    * Add test
    
    * More tests
---
 .../druid/server/AsyncQueryForwardingServlet.java  | 35 ++++++--
 .../server/AsyncQueryForwardingServletTest.java    | 95 +++++++++++++++++-----
 2 files changed, 102 insertions(+), 28 deletions(-)

diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 75b13a39f1f..5134a8109b8 100644
--- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -56,6 +56,7 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
+import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.sql.http.SqlQuery;
 import org.apache.druid.sql.http.SqlResource;
 import org.eclipse.jetty.client.HttpClient;
@@ -303,6 +304,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
 
   /**
    * Rebuilds the {@link SqlQuery} object with sqlQueryId and queryId context 
parameters if not present
+   *
    * @param sqlQuery the original SqlQuery
    * @return an updated sqlQuery object with sqlQueryId and queryId context 
parameters
    */
@@ -367,13 +369,16 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
     // Log the error message
     final String errorMessage = exceptionToReport.getMessage() == null
                                 ? "no error message" : 
exceptionToReport.getMessage();
+
+    AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(request);
+
     if (isNativeQuery) {
       requestLogger.logNativeQuery(
           RequestLogLine.forNative(
               null,
               DateTimes.nowUtc(),
               request.getRemoteAddr(),
-              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
+              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage, "identity", authenticationResult.getIdentity()))
           )
       );
     } else {
@@ -383,7 +388,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
               null,
               DateTimes.nowUtc(),
               request.getRemoteAddr(),
-              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
+              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage, "identity", authenticationResult.getIdentity()))
           )
       );
     }
@@ -744,6 +749,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       }
       emitQueryTime(requestTimeNs, success, sqlQueryId, queryId);
 
+      AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
+
       //noinspection VariableNotUsedInsideIf
       if (sqlQueryId != null) {
         // SQL query doesn't have a native query translation in router. Hence, 
not logging the native query.
@@ -761,7 +768,9 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
                             TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
                             "success",
                             success
-                            && result.getResponse().getStatus() == 
Status.OK.getStatusCode()
+                            && result.getResponse().getStatus() == 
Status.OK.getStatusCode(),
+                            "identity",
+                            authenticationResult.getIdentity()
                         )
                     )
                 )
@@ -787,7 +796,9 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
                         TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
                         "success",
                         success
-                        && result.getResponse().getStatus() == 
Status.OK.getStatusCode()
+                        && result.getResponse().getStatus() == 
Status.OK.getStatusCode(),
+                        "identity",
+                        authenticationResult.getIdentity()
                     )
                 )
             )
@@ -824,6 +835,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
 
       failedQueryCount.incrementAndGet();
       emitQueryTime(requestTimeNs, false, sqlQueryId, queryId);
+      AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
 
       //noinspection VariableNotUsedInsideIf
       if (sqlQueryId != null) {
@@ -841,7 +853,9 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
                             "success",
                             false,
                             "exception",
-                            errorMessage == null ? "no message" : errorMessage
+                            errorMessage == null ? "no message" : errorMessage,
+                            "identity",
+                            authenticationResult.getIdentity()
                         )
                     )
                 )
@@ -871,7 +885,9 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
                         "success",
                         false,
                         "exception",
-                        errorMessage == null ? "no message" : errorMessage
+                        errorMessage == null ? "no message" : errorMessage,
+                        "identity",
+                        authenticationResult.getIdentity()
                     )
                 )
             )
@@ -890,7 +906,12 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       super.onFailure(response, failure);
     }
 
-    private void emitQueryTime(long requestTimeNs, boolean success, @Nullable 
String sqlQueryId, @Nullable String queryId)
+    private void emitQueryTime(
+        long requestTimeNs,
+        boolean success,
+        @Nullable String sqlQueryId,
+        @Nullable String queryId
+    )
     {
       QueryMetrics queryMetrics;
       if (sqlQueryId != null) {
diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 6facaa54778..54238fe8cce 100644
--- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -65,6 +65,8 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.router.QueryHostFinder;
 import org.apache.druid.server.router.RendezvousHashAvaticaConnectionBalancer;
 import org.apache.druid.server.security.AllowAllAuthorizer;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -227,7 +229,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
 
     Properties properties = new Properties();
     properties.setProperty("druid.router.sql.enable", "true");
-    verifyServletCallsForQuery(query, true, false, hostFinder, properties);
+    verifyServletCallsForQuery(query, true, false, hostFinder, properties, 
false);
   }
 
   @Test
@@ -244,7 +246,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     EasyMock.expect(hostFinder.pickServer(query)).andReturn(new 
TestServer("http", "1.2.3.4", 9999)).once();
     EasyMock.replay(hostFinder);
 
-    verifyServletCallsForQuery(query, false, false, hostFinder, new 
Properties());
+    verifyServletCallsForQuery(query, false, false, hostFinder, new 
Properties(), false);
   }
 
   @Test
@@ -258,7 +260,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
             .once();
     EasyMock.replay(hostFinder);
 
-    verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new 
Properties());
+    verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new 
Properties(), false);
   }
 
   @Test
@@ -408,6 +410,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
         new Properties(),
         new ServerConfig()
     );
+    
Mockito.when(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).thenReturn(new
 AuthenticationResult("userA", "basic", "basic", null));
     IOException testException = new IOException(errorMessage);
     servlet.handleQueryParseException(request, response, mockMapper, 
testException, false);
     ArgumentCaptor<Exception> captor = 
ArgumentCaptor.forClass(Exception.class);
@@ -454,6 +457,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
           }
         }
     );
+    
Mockito.when(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).thenReturn(new
 AuthenticationResult("userA", "basic", "basic", null));
     IOException testException = new IOException(errorMessage);
     servlet.handleQueryParseException(request, response, mockMapper, 
testException, false);
     ArgumentCaptor<Exception> captor = 
ArgumentCaptor.forClass(Exception.class);
@@ -501,6 +505,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
           }
         }
     );
+    
Mockito.when(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).thenReturn(new
 AuthenticationResult("userA", "basic", "basic", null));
     IOException testException = new IOException(errorMessage);
     servlet.handleQueryParseException(request, response, mockMapper, 
testException, false);
     ArgumentCaptor<Exception> captor = 
ArgumentCaptor.forClass(Exception.class);
@@ -512,6 +517,46 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     Assert.assertNull(((QueryException) captor.getValue()).getHost());
   }
 
+  @Test
+  public void testNativeQueryProxyFailure() throws Exception
+  {
+    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                        .dataSource("foo")
+                                        .intervals("2000/P1D")
+                                        .granularity(Granularities.ALL)
+                                        .context(ImmutableMap.of("queryId", 
"dummy"))
+                                        .build();
+
+    final QueryHostFinder hostFinder = 
EasyMock.createMock(QueryHostFinder.class);
+    EasyMock.expect(hostFinder.pickServer(query)).andReturn(new 
TestServer("http", "1.2.3.4", 9999)).once();
+    EasyMock.replay(hostFinder);
+
+    verifyServletCallsForQuery(query, false, false, hostFinder, new 
Properties(), true);
+  }
+
+  @Test
+  public void testSqlQueryProxyFailure() throws Exception
+  {
+    final SqlQuery query = new SqlQuery(
+        "SELECT * FROM foo",
+        ResultFormat.ARRAY,
+        false,
+        false,
+        false,
+        ImmutableMap.of("sqlQueryId", "dummy"),
+        null
+    );
+    final QueryHostFinder hostFinder = 
EasyMock.createMock(QueryHostFinder.class);
+    EasyMock.expect(hostFinder.findServerSql(
+        query.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", 
"queryId", "dummy")))
+    ).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
+    EasyMock.replay(hostFinder);
+
+    Properties properties = new Properties();
+    properties.setProperty("druid.router.sql.enable", "true");
+    verifyServletCallsForQuery(query, true, false, hostFinder, properties, 
true);
+  }
+
   /**
    * Verifies that the Servlet calls the right methods the right number of 
times.
    */
@@ -520,7 +565,8 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
       boolean isNativeSql,
       boolean isJDBCSql,
       QueryHostFinder hostFinder,
-      Properties properties
+      Properties properties,
+      boolean isFailure
   ) throws Exception
   {
     final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
@@ -587,27 +633,30 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     EasyMock.expectLastCall();
     requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
     EasyMock.expectLastCall();
+    
EasyMock.expect(requestMock.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(new
 AuthenticationResult("userA", "basic", "basic", null));
+    if (isFailure) {
+      EasyMock.expect(requestMock.getRemoteAddr()).andReturn("0.0.0.0:0");
+    }
+
     EasyMock.replay(requestMock);
 
     final AtomicLong didService = new AtomicLong();
     final Request proxyRequestMock = Mockito.spy(Request.class);
-    final Result result = new Result(
-        proxyRequestMock,
-        new HttpResponse(proxyRequestMock, ImmutableList.of())
-        {
-          @Override
-          public HttpFields getHeaders()
-          {
-            HttpFields httpFields = new HttpFields();
-            if (isJDBCSql) {
-              httpFields.add(new HttpField("X-Druid-SQL-Query-Id", 
"jdbcDummy"));
-            } else if (isNativeSql) {
-              httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy"));
-            }
-            return httpFields;
-          }
+    HttpResponse response = new HttpResponse(proxyRequestMock, 
ImmutableList.of())
+    {
+      @Override
+      public HttpFields getHeaders()
+      {
+        HttpFields httpFields = new HttpFields();
+        if (isJDBCSql) {
+          httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "jdbcDummy"));
+        } else if (isNativeSql) {
+          httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy"));
         }
-    );
+        return httpFields;
+      }
+    };
+    final Result result = new Result(proxyRequestMock, response);
     final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", 
"");
     final AsyncQueryForwardingServlet servlet = new 
AsyncQueryForwardingServlet(
         new MapQueryToolChestWarehouse(ImmutableMap.of()),
@@ -640,7 +689,11 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     // partial state of the servlet. Hence, only catching the exact exception 
to avoid possible errors.
     // Further, the metric assertions are also done to ensure that the metrics 
have emitted.
     try {
-      servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+      if (isFailure) {
+        servlet.newProxyResponseListener(requestMock, 
null).onFailure(response, new Throwable("Proxy failed"));
+      } else {
+        servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+      }
     }
     catch (NullPointerException ignored) {
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to