This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 469b4d7a5bd Add `SqlResourceQueryResultPusherFactory` and `SqlResourceQueryResultPusher` class (#18381)
469b4d7a5bd is described below

commit 469b4d7a5bd58ac7b2b15845fc34e84feeb39449
Author: Cece Mei <[email protected]>
AuthorDate: Fri Aug 8 00:15:22 2025 -0700

    Add `SqlResourceQueryResultPusherFactory` and `SqlResourceQueryResultPusher` class (#18381)
    
    * pusher
    
    * test
---
 .../dart/controller/http/DartSqlResourceTest.java  |  14 +-
 .../org/apache/druid/sql/http/SqlResource.java     | 204 +----------------
 .../sql/http/SqlResourceQueryResultPusher.java     | 189 ++++++++++++++++
 .../http/SqlResourceQueryResultPusherFactory.java  |  77 +++++++
 .../org/apache/druid/sql/http/SqlResourceTest.java | 249 +++++++++++----------
 5 files changed, 415 insertions(+), 318 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 2c898e1c227..eba37c273a7 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -89,6 +89,7 @@ import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.sql.http.SqlEngineRegistry;
 import org.apache.druid.sql.http.SqlQuery;
 import org.apache.druid.sql.http.SqlResource;
+import org.apache.druid.sql.http.SqlResourceQueryResultPusherFactory;
 import org.apache.druid.sql.http.SupportedEnginesResponse;
 import org.hamcrest.CoreMatchers;
 import org.junit.jupiter.api.AfterEach;
@@ -218,7 +219,6 @@ public class DartSqlResourceTest extends MSQTestBase
         lifecycleManager
     );
 
-
     final DartSqlEngine engine = new DartSqlEngine(
         new MSQTestControllerContext(
             "did2",
@@ -265,14 +265,16 @@ public class DartSqlResourceTest extends MSQTestBase
     );
 
     sqlResource = new SqlResource(
-        objectMapper,
         CalciteTests.TEST_AUTHORIZER_MAPPER,
-        new ServerConfig() /* currently only used for error transform strategy 
*/,
         lifecycleManager,
         new SqlEngineRegistry(Set.of(engine)),
-        ResponseContextConfig.newConfig(false),
-        DefaultQueryConfig.NIL,
-        SELF_NODE
+        new SqlResourceQueryResultPusherFactory(
+            objectMapper,
+            new ServerConfig(),
+            ResponseContextConfig.newConfig(false),
+            SELF_NODE
+        ),
+        DefaultQueryConfig.NIL
     );
 
     // Setup mocks
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index e38dea70f30..3b35669ac77 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -19,27 +19,19 @@
 
 package org.apache.druid.sql.http;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.sun.jersey.api.core.HttpContext;
-import org.apache.druid.common.exception.SanitizableException;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DefaultQueryConfig;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryResource;
-import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.QueryResultPusher;
-import org.apache.druid.server.ResponseContextConfig;
-import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.AuthorizationResult;
@@ -47,12 +39,10 @@ import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.DirectStatement.ResultSet;
 import org.apache.druid.sql.HttpStatement;
 import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
 import org.apache.druid.sql.SqlQueryPlus;
-import org.apache.druid.sql.SqlRowTransformer;
 import org.apache.druid.sql.calcite.run.SqlEngine;
 
 import javax.annotation.Nullable;
@@ -68,12 +58,9 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,38 +75,30 @@ public class SqlResource
   public static final String SQL_HEADER_VALUE = "yes";
 
   private static final Logger log = new Logger(SqlResource.class);
-  private static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = 
new SqlResourceQueryMetricCounter();
+  public static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = new 
SqlResourceQueryMetricCounter();
 
-  private final ObjectMapper jsonMapper;
   private final AuthorizerMapper authorizerMapper;
-  private final ServerConfig serverConfig;
-  private final ResponseContextConfig responseContextConfig;
-  private final DefaultQueryConfig defaultQueryConfig;
-  private final DruidNode selfNode;
+  private final SqlResourceQueryResultPusherFactory resultPusherFactory;
   private final SqlLifecycleManager sqlLifecycleManager;
   private final SqlEngineRegistry sqlEngineRegistry;
+  private final DefaultQueryConfig defaultQueryConfig;
 
   @VisibleForTesting
   @Inject
   public SqlResource(
-      final ObjectMapper jsonMapper,
       final AuthorizerMapper authorizerMapper,
-      final ServerConfig serverConfig,
       final SqlLifecycleManager sqlLifecycleManager,
       final SqlEngineRegistry sqlEngineRegistry,
-      final ResponseContextConfig responseContextConfig,
-      final DefaultQueryConfig defaultQueryConfig,
-      @Self final DruidNode selfNode
+      final SqlResourceQueryResultPusherFactory resultPusherFactory,
+      final DefaultQueryConfig defaultQueryConfig
   )
   {
+    this.resultPusherFactory = resultPusherFactory;
     this.sqlEngineRegistry = Preconditions.checkNotNull(sqlEngineRegistry, 
"sqlEngineRegistry");
-    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
     this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper, 
"authorizerMapper");
-    this.serverConfig = Preconditions.checkNotNull(serverConfig, 
"serverConfig");
-    this.responseContextConfig = responseContextConfig;
-    this.defaultQueryConfig = Preconditions.checkNotNull(defaultQueryConfig, 
"defaultQueryConfig");
-    this.selfNode = selfNode;
     this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, 
"sqlLifecycleManager");
+    this.defaultQueryConfig = Preconditions.checkNotNull(defaultQueryConfig, 
"defaultQueryConfig");
+
   }
 
   @GET
@@ -207,7 +186,9 @@ public class SqlResource
     final String currThreadName = Thread.currentThread().getName();
     try {
       Thread.currentThread().setName(StringUtils.format("sql[%s]", 
stmt.sqlQueryId()));
-      return makePusher(req, stmt, sqlQuery, queryContext).push();
+
+      QueryResultPusher pusher = resultPusherFactory.factorize(req, stmt, 
sqlQuery);
+      return pusher.push();
     }
     finally {
       Thread.currentThread().setName(currThreadName);
@@ -268,169 +249,6 @@ public class SqlResource
     }
   }
 
-  private SqlResourceQueryResultPusher makePusher(
-      HttpServletRequest req,
-      HttpStatement stmt,
-      SqlQuery sqlQuery,
-      QueryContext queryContext
-  )
-  {
-    final String sqlQueryId = stmt.sqlQueryId();
-    Map<String, String> headers = new LinkedHashMap<>();
-    headers.put(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
-
-    if (sqlQuery.includeHeader()) {
-      headers.put(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
-    }
-
-    return new SqlResourceQueryResultPusher(req, sqlQueryId, stmt, sqlQuery, 
queryContext, headers);
-  }
-
-  private class SqlResourceQueryResultPusher extends QueryResultPusher
-  {
-    private final String sqlQueryId;
-    private final HttpStatement stmt;
-    private final SqlQuery sqlQuery;
-
-    /**
-     * Context to use for pushing results. May be different from the context 
in SqlQuery due to SET statements.
-     */
-    private final QueryContext queryContext;
-
-    public SqlResourceQueryResultPusher(
-        HttpServletRequest req,
-        String sqlQueryId,
-        HttpStatement stmt,
-        SqlQuery sqlQuery,
-        QueryContext queryContext,
-        Map<String, String> headers
-    )
-    {
-      super(
-          req,
-          jsonMapper,
-          responseContextConfig,
-          selfNode,
-          SqlResource.QUERY_METRIC_COUNTER,
-          sqlQueryId,
-          MediaType.APPLICATION_JSON_TYPE,
-          headers
-      );
-      this.sqlQueryId = sqlQueryId;
-      this.stmt = stmt;
-      this.queryContext = queryContext;
-      this.sqlQuery = sqlQuery;
-    }
-
-    @Override
-    public ResultsWriter start()
-    {
-      return new ResultsWriter()
-      {
-        private QueryResponse<Object[]> queryResponse;
-        private ResultSet thePlan;
-
-        @Override
-        @Nullable
-        public Response.ResponseBuilder start()
-        {
-          thePlan = stmt.plan();
-          queryResponse = thePlan.run();
-          return null;
-        }
-
-        @Override
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        public QueryResponse<Object> getQueryResponse()
-        {
-          return (QueryResponse) queryResponse;
-        }
-
-        @Override
-        public Writer makeWriter(OutputStream out) throws IOException
-        {
-          ResultFormat.Writer writer = 
sqlQuery.getResultFormat().createFormatter(out, jsonMapper);
-          final SqlRowTransformer rowTransformer = 
thePlan.createRowTransformer();
-
-          return new Writer()
-          {
-
-            @Override
-            public void writeResponseStart() throws IOException
-            {
-              writer.writeResponseStart();
-
-              if (sqlQuery.includeHeader()) {
-                writer.writeHeader(
-                    rowTransformer.getRowType(),
-                    sqlQuery.includeTypesHeader(),
-                    sqlQuery.includeSqlTypesHeader()
-                );
-              }
-            }
-
-            @Override
-            public void writeRow(Object obj) throws IOException
-            {
-              Object[] row = (Object[]) obj;
-
-              writer.writeRowStart();
-              for (int i = 0; i < rowTransformer.getFieldList().size(); i++) {
-                final Object value = rowTransformer.transform(row, i);
-                writer.writeRowField(rowTransformer.getFieldList().get(i), 
value);
-              }
-              writer.writeRowEnd();
-            }
-
-            @Override
-            public void writeResponseEnd() throws IOException
-            {
-              writer.writeResponseEnd();
-            }
-
-            @Override
-            public void close() throws IOException
-            {
-              writer.close();
-            }
-          };
-        }
-
-        @Override
-        public void recordSuccess(long numBytes)
-        {
-          stmt.reporter().succeeded(numBytes);
-        }
-
-        @Override
-        public void recordFailure(Exception e)
-        {
-          if (QueryLifecycle.shouldLogStackTrace(e, queryContext)) {
-            log.warn(e, "Exception while processing sqlQueryId[%s]", 
sqlQueryId);
-          } else {
-            log.noStackTrace().warn(e, "Exception while processing 
sqlQueryId[%s]", sqlQueryId);
-          }
-          stmt.reporter().failed(e);
-        }
-
-        @Override
-        public void close()
-        {
-          stmt.close();
-        }
-      };
-    }
-
-    @Override
-    public void writeException(Exception ex, OutputStream out) throws 
IOException
-    {
-      if (ex instanceof SanitizableException) {
-        ex = 
serverConfig.getErrorResponseTransformStrategy().transformIfNeeded((SanitizableException)
 ex);
-      }
-      out.write(jsonMapper.writeValueAsBytes(ex));
-    }
-  }
-
   /**
    * Authorize a query cancellation operation.
    * <p>
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
new file mode 100644
index 00000000000..1cdbbf3a723
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.exception.SanitizableException;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.QueryLifecycle;
+import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.QueryResultPusher;
+import org.apache.druid.server.ResponseContextConfig;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.sql.DirectStatement;
+import org.apache.druid.sql.HttpStatement;
+import org.apache.druid.sql.SqlRowTransformer;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+class SqlResourceQueryResultPusher extends QueryResultPusher
+{
+
+  private static final Logger log = new 
Logger(SqlResourceQueryResultPusher.class);
+
+  private final ObjectMapper jsonMapper;
+  private final String sqlQueryId;
+  private final HttpStatement stmt;
+  private final SqlQuery sqlQuery;
+  private final ServerConfig serverConfig;
+
+  public SqlResourceQueryResultPusher(
+      final ObjectMapper jsonMapper,
+      final ResponseContextConfig responseContextConfig,
+      final DruidNode selfNode,
+      final ServerConfig serverConfig,
+      final HttpServletRequest req,
+      final HttpStatement stmt,
+      final SqlQuery sqlQuery,
+      final Map<String, String> headers
+  )
+  {
+    super(
+        req,
+        jsonMapper,
+        responseContextConfig,
+        selfNode,
+        SqlResource.QUERY_METRIC_COUNTER,
+        stmt.sqlQueryId(),
+        MediaType.APPLICATION_JSON_TYPE,
+        headers
+    );
+    this.serverConfig = serverConfig;
+    this.jsonMapper = jsonMapper;
+    this.sqlQueryId = stmt.sqlQueryId();
+    this.stmt = stmt;
+    this.sqlQuery = sqlQuery;
+  }
+
+  @Override
+  public ResultsWriter start()
+  {
+    return new ResultsWriter()
+    {
+      private QueryResponse<Object[]> queryResponse;
+      private DirectStatement.ResultSet thePlan;
+
+      @Override
+      @Nullable
+      public Response.ResponseBuilder start()
+      {
+        thePlan = stmt.plan();
+        queryResponse = thePlan.run();
+        return null;
+      }
+
+      @Override
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      public QueryResponse<Object> getQueryResponse()
+      {
+        return (QueryResponse) queryResponse;
+      }
+
+      @Override
+      public Writer makeWriter(OutputStream out) throws IOException
+      {
+        ResultFormat.Writer writer = 
sqlQuery.getResultFormat().createFormatter(out, jsonMapper);
+        final SqlRowTransformer rowTransformer = 
thePlan.createRowTransformer();
+
+        return new Writer()
+        {
+
+          @Override
+          public void writeResponseStart() throws IOException
+          {
+            writer.writeResponseStart();
+
+            if (sqlQuery.includeHeader()) {
+              writer.writeHeader(
+                  rowTransformer.getRowType(),
+                  sqlQuery.includeTypesHeader(),
+                  sqlQuery.includeSqlTypesHeader()
+              );
+            }
+          }
+
+          @Override
+          public void writeRow(Object obj) throws IOException
+          {
+            Object[] row = (Object[]) obj;
+
+            writer.writeRowStart();
+            for (int i = 0; i < rowTransformer.getFieldList().size(); i++) {
+              final Object value = rowTransformer.transform(row, i);
+              writer.writeRowField(rowTransformer.getFieldList().get(i), 
value);
+            }
+            writer.writeRowEnd();
+          }
+
+          @Override
+          public void writeResponseEnd() throws IOException
+          {
+            writer.writeResponseEnd();
+          }
+
+          @Override
+          public void close() throws IOException
+          {
+            writer.close();
+          }
+        };
+      }
+
+      @Override
+      public void recordSuccess(long numBytes)
+      {
+        stmt.reporter().succeeded(numBytes);
+      }
+
+      @Override
+      public void recordFailure(Exception e)
+      {
+        if (QueryLifecycle.shouldLogStackTrace(e, sqlQuery.queryContext())) {
+          log.warn(e, "Exception while processing sqlQueryId[%s]", sqlQueryId);
+        } else {
+          log.noStackTrace().warn(e, "Exception while processing 
sqlQueryId[%s]", sqlQueryId);
+        }
+        stmt.reporter().failed(e);
+      }
+
+      @Override
+      public void close()
+      {
+        stmt.close();
+      }
+    };
+  }
+
+  @Override
+  public void writeException(Exception ex, OutputStream out) throws IOException
+  {
+    if (ex instanceof SanitizableException) {
+      ex = 
serverConfig.getErrorResponseTransformStrategy().transformIfNeeded((SanitizableException)
 ex);
+    }
+    out.write(jsonMapper.writeValueAsBytes(ex));
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java
new file mode 100644
index 00000000000..70b5953fd7a
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.ResponseContextConfig;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.sql.HttpStatement;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SqlResourceQueryResultPusherFactory
+{
+  private final ObjectMapper jsonMapper;
+  private final ServerConfig serverConfig;
+  private final ResponseContextConfig responseContextConfig;
+  private final DruidNode selfNode;
+
+  @Inject
+  public SqlResourceQueryResultPusherFactory(
+      ObjectMapper jsonMapper,
+      ServerConfig serverConfig,
+      ResponseContextConfig responseContextConfig,
+      @Self DruidNode selfNode
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.serverConfig = serverConfig;
+    this.responseContextConfig = responseContextConfig;
+    this.selfNode = selfNode;
+  }
+
+  public SqlResourceQueryResultPusher factorize(HttpServletRequest req, 
HttpStatement stmt, SqlQuery sqlQuery)
+  {
+    Map<String, String> headers = new LinkedHashMap<>();
+
+    final String sqlQueryId = stmt.sqlQueryId();
+    headers.put(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
+
+    if (sqlQuery.includeHeader()) {
+      headers.put(SqlResource.SQL_HEADER_RESPONSE_HEADER, 
SqlResource.SQL_HEADER_VALUE);
+    }
+
+    return new SqlResourceQueryResultPusher(
+        jsonMapper,
+        responseContextConfig,
+        selfNode,
+        serverConfig,
+        req,
+        stmt,
+        sqlQuery,
+        headers
+    );
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 595939377b7..87e9285a30a 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -324,14 +324,16 @@ public class SqlResourceTest extends CalciteTestBase
     };
     engine = CalciteTests.createMockSqlEngine(walker, conglomerate, 
sqlStatementFactory);
     resource = new SqlResource(
-        JSON_MAPPER,
         CalciteTests.TEST_AUTHORIZER_MAPPER,
-        new ServerConfig(),
         lifecycleManager,
         new SqlEngineRegistry(Set.of(engine)),
-        TEST_RESPONSE_CONTEXT_CONFIG,
-        DefaultQueryConfig.NIL,
-        DUMMY_DRUID_NODE
+        new SqlResourceQueryResultPusherFactory(
+            JSON_MAPPER,
+            new ServerConfig(),
+            TEST_RESPONSE_CONTEXT_CONFIG,
+            DUMMY_DRUID_NODE
+        ),
+        DefaultQueryConfig.NIL
     );
   }
 
@@ -449,15 +451,20 @@ public class SqlResourceTest extends CalciteTestBase
         )
     );
     final Object observedMissingHeaders = 
response.headers.get("X-Druid-Response-Context").stream()
-                                                           .map(s -> {
-                                                             try {
-                                                               return 
JSON_MAPPER.readValue(s, new TypeReference<Map<String, String>>() {});
-                                                             }
-                                                             catch 
(JsonProcessingException e) {
-                                                               throw new 
RuntimeException(e);
-                                                             }
-                                                           })
-                                                           
.collect(Collectors.toList());
+                                                          .map(s -> {
+                                                            try {
+                                                              return 
JSON_MAPPER.readValue(
+                                                                  s,
+                                                                  new 
TypeReference<Map<String, String>>()
+                                                                  {
+                                                                  }
+                                                              );
+                                                            }
+                                                            catch 
(JsonProcessingException e) {
+                                                              throw new 
RuntimeException(e);
+                                                            }
+                                                          })
+                                                          
.collect(Collectors.toList());
 
     Assert.assertEquals(expectedMissingHeaders, observedMissingHeaders);
 
@@ -593,14 +600,16 @@ public class SqlResourceTest extends CalciteTestBase
 
     // We need to create a new SqlResource instance with our custom 
DefaultQueryConfig
     resource = new SqlResource(
-        JSON_MAPPER,
         CalciteTests.TEST_AUTHORIZER_MAPPER,
-        new ServerConfig(),
         lifecycleManager,
         new SqlEngineRegistry(Set.of(engine)),
-        TEST_RESPONSE_CONTEXT_CONFIG,
-        queryConfigWithTimezone,
-        DUMMY_DRUID_NODE
+        new SqlResourceQueryResultPusherFactory(
+            JSON_MAPPER,
+            new ServerConfig(),
+            TEST_RESPONSE_CONTEXT_CONFIG,
+            DUMMY_DRUID_NODE
+        ),
+        queryConfigWithTimezone
     );
 
     final List<Map<String, Object>> rows = doPost(
@@ -704,50 +713,50 @@ public class SqlResourceTest extends CalciteTestBase
   public void testPivotRowTypePreservedInDecoupledPlanner() throws Exception
   {
     final List<Map<String, Object>> rows = doPost(
-            new SqlQuery(
-                    "SET plannerStrategy='DECOUPLED';" +
-                            " WITH t1 AS (\n" +
-                            "  SELECT *\n" +
-                            "  FROM (\n" +
-                            "    VALUES\n" +
-                            "    ('18-19', 'female', 84),\n" +
-                            "    ('18-19', 'male', 217),\n" +
-                            "    ('20-29', 'female', 321),\n" +
-                            "    ('20-29', 'male', 820),\n" +
-                            "    ('30-39', 'female', 63),\n" +
-                            "    ('30-39', 'male', 449),\n" +
-                            "    ('40-49', 'female', 10),\n" +
-                            "    ('40-49', 'male', 83),\n" +
-                            "    ('50-59', 'female', 2),\n" +
-                            "    ('50-59', 'male', 13)\n" +
-                            "  ) AS data(Age, Gender, Visitors)\n" +
-                            "),\n" +
-                            "t2 AS (\n" +
-                            "  SELECT Age, Gender, CAST(SUM(Visitors) AS 
double) / (SELECT SUM(Visitors) FROM t1) AS Share\n" +
-                            "  FROM t1\n" +
-                            "  GROUP BY 1, 2\n" +
-                            ")\n" +
-                            "SELECT *\n" +
-                            "FROM t2\n" +
-                            "PIVOT (MAX(Share) FOR Gender IN ('female' AS 
Women, 'male' AS Men));",
-                    ResultFormat.OBJECT,
-                    false,
-                    false,
-                    false,
-                    null,
-                    null
-            )
+        new SqlQuery(
+            "SET plannerStrategy='DECOUPLED';" +
+            " WITH t1 AS (\n" +
+            "  SELECT *\n" +
+            "  FROM (\n" +
+            "    VALUES\n" +
+            "    ('18-19', 'female', 84),\n" +
+            "    ('18-19', 'male', 217),\n" +
+            "    ('20-29', 'female', 321),\n" +
+            "    ('20-29', 'male', 820),\n" +
+            "    ('30-39', 'female', 63),\n" +
+            "    ('30-39', 'male', 449),\n" +
+            "    ('40-49', 'female', 10),\n" +
+            "    ('40-49', 'male', 83),\n" +
+            "    ('50-59', 'female', 2),\n" +
+            "    ('50-59', 'male', 13)\n" +
+            "  ) AS data(Age, Gender, Visitors)\n" +
+            "),\n" +
+            "t2 AS (\n" +
+            "  SELECT Age, Gender, CAST(SUM(Visitors) AS double) / (SELECT 
SUM(Visitors) FROM t1) AS Share\n" +
+            "  FROM t1\n" +
+            "  GROUP BY 1, 2\n" +
+            ")\n" +
+            "SELECT *\n" +
+            "FROM t2\n" +
+            "PIVOT (MAX(Share) FOR Gender IN ('female' AS Women, 'male' AS 
Men));",
+            ResultFormat.OBJECT,
+            false,
+            false,
+            false,
+            null,
+            null
+        )
     ).rhs;
 
     Assert.assertEquals(
-            ImmutableList.of(
-                    ImmutableMap.of("Age", "18-19", "Women", 
0.040737148399612025, "Men", 0.1052376333656644),
-                    ImmutableMap.of("Age", "20-29", "Women", 
0.1556741028128031, "Men", 0.3976721629485936),
-                    ImmutableMap.of("Age", "30-39", "Women", 
0.030552861299709022, "Men", 0.2177497575169738),
-                    ImmutableMap.of("Age", "40-49", "Women", 
0.004849660523763337, "Men", 0.040252182347235696),
-                    ImmutableMap.of("Age", "50-59", "Women", 
0.0009699321047526673, "Men", 0.006304558680892337)
-            ),
-            rows
+        ImmutableList.of(
+            ImmutableMap.of("Age", "18-19", "Women", 0.040737148399612025, 
"Men", 0.1052376333656644),
+            ImmutableMap.of("Age", "20-29", "Women", 0.1556741028128031, 
"Men", 0.3976721629485936),
+            ImmutableMap.of("Age", "30-39", "Women", 0.030552861299709022, 
"Men", 0.2177497575169738),
+            ImmutableMap.of("Age", "40-49", "Women", 0.004849660523763337, 
"Men", 0.040252182347235696),
+            ImmutableMap.of("Age", "50-59", "Women", 0.0009699321047526673, 
"Men", 0.006304558680892337)
+        ),
+        rows
     );
   }
 
@@ -755,50 +764,50 @@ public class SqlResourceTest extends CalciteTestBase
   public void testPivotRowTypePreservedInCoupledPlanner() throws Exception
   {
     final List<Map<String, Object>> rows = doPost(
-            new SqlQuery(
-                    "SET plannerStrategy='COUPLED';" +
-                            " WITH t1 AS (\n" +
-                            "  SELECT *\n" +
-                            "  FROM (\n" +
-                            "    VALUES\n" +
-                            "    ('18-19', 'female', 84),\n" +
-                            "    ('18-19', 'male', 217),\n" +
-                            "    ('20-29', 'female', 321),\n" +
-                            "    ('20-29', 'male', 820),\n" +
-                            "    ('30-39', 'female', 63),\n" +
-                            "    ('30-39', 'male', 449),\n" +
-                            "    ('40-49', 'female', 10),\n" +
-                            "    ('40-49', 'male', 83),\n" +
-                            "    ('50-59', 'female', 2),\n" +
-                            "    ('50-59', 'male', 13)\n" +
-                            "  ) AS data(Age, Gender, Visitors)\n" +
-                            "),\n" +
-                            "t2 AS (\n" +
-                            "  SELECT Age, Gender, CAST(SUM(Visitors) AS double) / (SELECT SUM(Visitors) FROM t1) AS Share\n" +
-                            "  FROM t1\n" +
-                            "  GROUP BY 1, 2\n" +
-                            ")\n" +
-                            "SELECT *\n" +
-                            "FROM t2\n" +
-                            "PIVOT (MAX(Share) FOR Gender IN ('female' AS Women, 'male' AS Men));",
-                    ResultFormat.OBJECT,
-                    false,
-                    false,
-                    false,
-                    null,
-                    null
-            )
+        new SqlQuery(
+            "SET plannerStrategy='COUPLED';" +
+            " WITH t1 AS (\n" +
+            "  SELECT *\n" +
+            "  FROM (\n" +
+            "    VALUES\n" +
+            "    ('18-19', 'female', 84),\n" +
+            "    ('18-19', 'male', 217),\n" +
+            "    ('20-29', 'female', 321),\n" +
+            "    ('20-29', 'male', 820),\n" +
+            "    ('30-39', 'female', 63),\n" +
+            "    ('30-39', 'male', 449),\n" +
+            "    ('40-49', 'female', 10),\n" +
+            "    ('40-49', 'male', 83),\n" +
+            "    ('50-59', 'female', 2),\n" +
+            "    ('50-59', 'male', 13)\n" +
+            "  ) AS data(Age, Gender, Visitors)\n" +
+            "),\n" +
+            "t2 AS (\n" +
+            "  SELECT Age, Gender, CAST(SUM(Visitors) AS double) / (SELECT SUM(Visitors) FROM t1) AS Share\n" +
+            "  FROM t1\n" +
+            "  GROUP BY 1, 2\n" +
+            ")\n" +
+            "SELECT *\n" +
+            "FROM t2\n" +
+            "PIVOT (MAX(Share) FOR Gender IN ('female' AS Women, 'male' AS Men));",
+            ResultFormat.OBJECT,
+            false,
+            false,
+            false,
+            null,
+            null
+        )
     ).rhs;
 
     Assert.assertEquals(
-            ImmutableList.of(
-                    ImmutableMap.of("Age", "18-19", "Women", 0.040737148399612025, "Men", 0.1052376333656644),
-                    ImmutableMap.of("Age", "20-29", "Women", 0.1556741028128031, "Men", 0.3976721629485936),
-                    ImmutableMap.of("Age", "30-39", "Women", 0.030552861299709022, "Men", 0.2177497575169738),
-                    ImmutableMap.of("Age", "40-49", "Women", 0.004849660523763337, "Men", 0.040252182347235696),
-                    ImmutableMap.of("Age", "50-59", "Women", 0.0009699321047526673, "Men", 0.006304558680892337)
-            ),
-            rows
+        ImmutableList.of(
+            ImmutableMap.of("Age", "18-19", "Women", 0.040737148399612025, "Men", 0.1052376333656644),
+            ImmutableMap.of("Age", "20-29", "Women", 0.1556741028128031, "Men", 0.3976721629485936),
+            ImmutableMap.of("Age", "30-39", "Women", 0.030552861299709022, "Men", 0.2177497575169738),
+            ImmutableMap.of("Age", "40-49", "Women", 0.004849660523763337, "Men", 0.040252182347235696),
+            ImmutableMap.of("Age", "50-59", "Women", 0.0009699321047526673, "Men", 0.006304558680892337)
+        ),
+        rows
     );
   }
 
@@ -1689,27 +1698,29 @@ public class SqlResourceTest extends CalciteTestBase
   public void testUnsupportedQueryThrowsExceptionWithFilterResponse() throws Exception
   {
     resource = new SqlResource(
-        JSON_MAPPER,
         CalciteTests.TEST_AUTHORIZER_MAPPER,
-        new ServerConfig()
-        {
-          @Override
-          public boolean isShowDetailedJettyErrors()
-          {
-            return true;
-          }
-
-          @Override
-          public ErrorResponseTransformStrategy getErrorResponseTransformStrategy()
-          {
-            return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
-          }
-        },
         lifecycleManager,
         new SqlEngineRegistry(Set.of(engine)),
-        TEST_RESPONSE_CONTEXT_CONFIG,
-        DefaultQueryConfig.NIL,
-        DUMMY_DRUID_NODE
+        new SqlResourceQueryResultPusherFactory(
+            JSON_MAPPER,
+            new ServerConfig()
+            {
+              @Override
+              public boolean isShowDetailedJettyErrors()
+              {
+                return true;
+              }
+
+              @Override
+              public ErrorResponseTransformStrategy getErrorResponseTransformStrategy()
+              {
+                return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
+              }
+            },
+            TEST_RESPONSE_CONTEXT_CONFIG,
+            DUMMY_DRUID_NODE
+        ),
+        DefaultQueryConfig.NIL
     );
 
     String errorMessage = "This will be supported in Druid 9999";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to