This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 469b4d7a5bd Add `SqlResourceQueryResultPusherFactory` and
`SqlResourceQueryResultPusher` classes (#18381)
469b4d7a5bd is described below
commit 469b4d7a5bd58ac7b2b15845fc34e84feeb39449
Author: Cece Mei <[email protected]>
AuthorDate: Fri Aug 8 00:15:22 2025 -0700
Add `SqlResourceQueryResultPusherFactory` and
`SqlResourceQueryResultPusher` classes (#18381)
* pusher
* test
---
.../dart/controller/http/DartSqlResourceTest.java | 14 +-
.../org/apache/druid/sql/http/SqlResource.java | 204 +----------------
.../sql/http/SqlResourceQueryResultPusher.java | 189 ++++++++++++++++
.../http/SqlResourceQueryResultPusherFactory.java | 77 +++++++
.../org/apache/druid/sql/http/SqlResourceTest.java | 249 +++++++++++----------
5 files changed, 415 insertions(+), 318 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 2c898e1c227..eba37c273a7 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -89,6 +89,7 @@ import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlResource;
+import org.apache.druid.sql.http.SqlResourceQueryResultPusherFactory;
import org.apache.druid.sql.http.SupportedEnginesResponse;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
@@ -218,7 +219,6 @@ public class DartSqlResourceTest extends MSQTestBase
lifecycleManager
);
-
final DartSqlEngine engine = new DartSqlEngine(
new MSQTestControllerContext(
"did2",
@@ -265,14 +265,16 @@ public class DartSqlResourceTest extends MSQTestBase
);
sqlResource = new SqlResource(
- objectMapper,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- new ServerConfig() /* currently only used for error transform strategy
*/,
lifecycleManager,
new SqlEngineRegistry(Set.of(engine)),
- ResponseContextConfig.newConfig(false),
- DefaultQueryConfig.NIL,
- SELF_NODE
+ new SqlResourceQueryResultPusherFactory(
+ objectMapper,
+ new ServerConfig(),
+ ResponseContextConfig.newConfig(false),
+ SELF_NODE
+ ),
+ DefaultQueryConfig.NIL
);
// Setup mocks
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index e38dea70f30..3b35669ac77 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -19,27 +19,19 @@
package org.apache.druid.sql.http;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.api.core.HttpContext;
-import org.apache.druid.common.exception.SanitizableException;
import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryResource;
-import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.QueryResultPusher;
-import org.apache.druid.server.ResponseContextConfig;
-import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationResult;
@@ -47,12 +39,10 @@ import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.SqlQueryPlus;
-import org.apache.druid.sql.SqlRowTransformer;
import org.apache.druid.sql.calcite.run.SqlEngine;
import javax.annotation.Nullable;
@@ -68,12 +58,9 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -88,38 +75,30 @@ public class SqlResource
public static final String SQL_HEADER_VALUE = "yes";
private static final Logger log = new Logger(SqlResource.class);
- private static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER =
new SqlResourceQueryMetricCounter();
+ public static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = new
SqlResourceQueryMetricCounter();
- private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
- private final ServerConfig serverConfig;
- private final ResponseContextConfig responseContextConfig;
- private final DefaultQueryConfig defaultQueryConfig;
- private final DruidNode selfNode;
+ private final SqlResourceQueryResultPusherFactory resultPusherFactory;
private final SqlLifecycleManager sqlLifecycleManager;
private final SqlEngineRegistry sqlEngineRegistry;
+ private final DefaultQueryConfig defaultQueryConfig;
@VisibleForTesting
@Inject
public SqlResource(
- final ObjectMapper jsonMapper,
final AuthorizerMapper authorizerMapper,
- final ServerConfig serverConfig,
final SqlLifecycleManager sqlLifecycleManager,
final SqlEngineRegistry sqlEngineRegistry,
- final ResponseContextConfig responseContextConfig,
- final DefaultQueryConfig defaultQueryConfig,
- @Self final DruidNode selfNode
+ final SqlResourceQueryResultPusherFactory resultPusherFactory,
+ final DefaultQueryConfig defaultQueryConfig
)
{
+ this.resultPusherFactory = resultPusherFactory;
this.sqlEngineRegistry = Preconditions.checkNotNull(sqlEngineRegistry,
"sqlEngineRegistry");
- this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper,
"authorizerMapper");
- this.serverConfig = Preconditions.checkNotNull(serverConfig,
"serverConfig");
- this.responseContextConfig = responseContextConfig;
- this.defaultQueryConfig = Preconditions.checkNotNull(defaultQueryConfig,
"defaultQueryConfig");
- this.selfNode = selfNode;
this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager,
"sqlLifecycleManager");
+ this.defaultQueryConfig = Preconditions.checkNotNull(defaultQueryConfig,
"defaultQueryConfig");
+
}
@GET
@@ -207,7 +186,9 @@ public class SqlResource
final String currThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]",
stmt.sqlQueryId()));
- return makePusher(req, stmt, sqlQuery, queryContext).push();
+
+ QueryResultPusher pusher = resultPusherFactory.factorize(req, stmt,
sqlQuery);
+ return pusher.push();
}
finally {
Thread.currentThread().setName(currThreadName);
@@ -268,169 +249,6 @@ public class SqlResource
}
}
- private SqlResourceQueryResultPusher makePusher(
- HttpServletRequest req,
- HttpStatement stmt,
- SqlQuery sqlQuery,
- QueryContext queryContext
- )
- {
- final String sqlQueryId = stmt.sqlQueryId();
- Map<String, String> headers = new LinkedHashMap<>();
- headers.put(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
-
- if (sqlQuery.includeHeader()) {
- headers.put(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
- }
-
- return new SqlResourceQueryResultPusher(req, sqlQueryId, stmt, sqlQuery,
queryContext, headers);
- }
-
- private class SqlResourceQueryResultPusher extends QueryResultPusher
- {
- private final String sqlQueryId;
- private final HttpStatement stmt;
- private final SqlQuery sqlQuery;
-
- /**
- * Context to use for pushing results. May be different from the context
in SqlQuery due to SET statements.
- */
- private final QueryContext queryContext;
-
- public SqlResourceQueryResultPusher(
- HttpServletRequest req,
- String sqlQueryId,
- HttpStatement stmt,
- SqlQuery sqlQuery,
- QueryContext queryContext,
- Map<String, String> headers
- )
- {
- super(
- req,
- jsonMapper,
- responseContextConfig,
- selfNode,
- SqlResource.QUERY_METRIC_COUNTER,
- sqlQueryId,
- MediaType.APPLICATION_JSON_TYPE,
- headers
- );
- this.sqlQueryId = sqlQueryId;
- this.stmt = stmt;
- this.queryContext = queryContext;
- this.sqlQuery = sqlQuery;
- }
-
- @Override
- public ResultsWriter start()
- {
- return new ResultsWriter()
- {
- private QueryResponse<Object[]> queryResponse;
- private ResultSet thePlan;
-
- @Override
- @Nullable
- public Response.ResponseBuilder start()
- {
- thePlan = stmt.plan();
- queryResponse = thePlan.run();
- return null;
- }
-
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public QueryResponse<Object> getQueryResponse()
- {
- return (QueryResponse) queryResponse;
- }
-
- @Override
- public Writer makeWriter(OutputStream out) throws IOException
- {
- ResultFormat.Writer writer =
sqlQuery.getResultFormat().createFormatter(out, jsonMapper);
- final SqlRowTransformer rowTransformer =
thePlan.createRowTransformer();
-
- return new Writer()
- {
-
- @Override
- public void writeResponseStart() throws IOException
- {
- writer.writeResponseStart();
-
- if (sqlQuery.includeHeader()) {
- writer.writeHeader(
- rowTransformer.getRowType(),
- sqlQuery.includeTypesHeader(),
- sqlQuery.includeSqlTypesHeader()
- );
- }
- }
-
- @Override
- public void writeRow(Object obj) throws IOException
- {
- Object[] row = (Object[]) obj;
-
- writer.writeRowStart();
- for (int i = 0; i < rowTransformer.getFieldList().size(); i++) {
- final Object value = rowTransformer.transform(row, i);
- writer.writeRowField(rowTransformer.getFieldList().get(i),
value);
- }
- writer.writeRowEnd();
- }
-
- @Override
- public void writeResponseEnd() throws IOException
- {
- writer.writeResponseEnd();
- }
-
- @Override
- public void close() throws IOException
- {
- writer.close();
- }
- };
- }
-
- @Override
- public void recordSuccess(long numBytes)
- {
- stmt.reporter().succeeded(numBytes);
- }
-
- @Override
- public void recordFailure(Exception e)
- {
- if (QueryLifecycle.shouldLogStackTrace(e, queryContext)) {
- log.warn(e, "Exception while processing sqlQueryId[%s]",
sqlQueryId);
- } else {
- log.noStackTrace().warn(e, "Exception while processing
sqlQueryId[%s]", sqlQueryId);
- }
- stmt.reporter().failed(e);
- }
-
- @Override
- public void close()
- {
- stmt.close();
- }
- };
- }
-
- @Override
- public void writeException(Exception ex, OutputStream out) throws
IOException
- {
- if (ex instanceof SanitizableException) {
- ex =
serverConfig.getErrorResponseTransformStrategy().transformIfNeeded((SanitizableException)
ex);
- }
- out.write(jsonMapper.writeValueAsBytes(ex));
- }
- }
-
/**
* Authorize a query cancellation operation.
* <p>
diff --git
a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
new file mode 100644
index 00000000000..1cdbbf3a723
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusher.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.exception.SanitizableException;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.QueryLifecycle;
+import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.QueryResultPusher;
+import org.apache.druid.server.ResponseContextConfig;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.sql.DirectStatement;
+import org.apache.druid.sql.HttpStatement;
+import org.apache.druid.sql.SqlRowTransformer;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+class SqlResourceQueryResultPusher extends QueryResultPusher
+{
+
+ private static final Logger log = new
Logger(SqlResourceQueryResultPusher.class);
+
+ private final ObjectMapper jsonMapper;
+ private final String sqlQueryId;
+ private final HttpStatement stmt;
+ private final SqlQuery sqlQuery;
+ private final ServerConfig serverConfig;
+
+ public SqlResourceQueryResultPusher(
+ final ObjectMapper jsonMapper,
+ final ResponseContextConfig responseContextConfig,
+ final DruidNode selfNode,
+ final ServerConfig serverConfig,
+ final HttpServletRequest req,
+ final HttpStatement stmt,
+ final SqlQuery sqlQuery,
+ final Map<String, String> headers
+ )
+ {
+ super(
+ req,
+ jsonMapper,
+ responseContextConfig,
+ selfNode,
+ SqlResource.QUERY_METRIC_COUNTER,
+ stmt.sqlQueryId(),
+ MediaType.APPLICATION_JSON_TYPE,
+ headers
+ );
+ this.serverConfig = serverConfig;
+ this.jsonMapper = jsonMapper;
+ this.sqlQueryId = stmt.sqlQueryId();
+ this.stmt = stmt;
+ this.sqlQuery = sqlQuery;
+ }
+
+ @Override
+ public ResultsWriter start()
+ {
+ return new ResultsWriter()
+ {
+ private QueryResponse<Object[]> queryResponse;
+ private DirectStatement.ResultSet thePlan;
+
+ @Override
+ @Nullable
+ public Response.ResponseBuilder start()
+ {
+ thePlan = stmt.plan();
+ queryResponse = thePlan.run();
+ return null;
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public QueryResponse<Object> getQueryResponse()
+ {
+ return (QueryResponse) queryResponse;
+ }
+
+ @Override
+ public Writer makeWriter(OutputStream out) throws IOException
+ {
+ ResultFormat.Writer writer =
sqlQuery.getResultFormat().createFormatter(out, jsonMapper);
+ final SqlRowTransformer rowTransformer =
thePlan.createRowTransformer();
+
+ return new Writer()
+ {
+
+ @Override
+ public void writeResponseStart() throws IOException
+ {
+ writer.writeResponseStart();
+
+ if (sqlQuery.includeHeader()) {
+ writer.writeHeader(
+ rowTransformer.getRowType(),
+ sqlQuery.includeTypesHeader(),
+ sqlQuery.includeSqlTypesHeader()
+ );
+ }
+ }
+
+ @Override
+ public void writeRow(Object obj) throws IOException
+ {
+ Object[] row = (Object[]) obj;
+
+ writer.writeRowStart();
+ for (int i = 0; i < rowTransformer.getFieldList().size(); i++) {
+ final Object value = rowTransformer.transform(row, i);
+ writer.writeRowField(rowTransformer.getFieldList().get(i),
value);
+ }
+ writer.writeRowEnd();
+ }
+
+ @Override
+ public void writeResponseEnd() throws IOException
+ {
+ writer.writeResponseEnd();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ writer.close();
+ }
+ };
+ }
+
+ @Override
+ public void recordSuccess(long numBytes)
+ {
+ stmt.reporter().succeeded(numBytes);
+ }
+
+ @Override
+ public void recordFailure(Exception e)
+ {
+ if (QueryLifecycle.shouldLogStackTrace(e, sqlQuery.queryContext())) {
+ log.warn(e, "Exception while processing sqlQueryId[%s]", sqlQueryId);
+ } else {
+ log.noStackTrace().warn(e, "Exception while processing
sqlQueryId[%s]", sqlQueryId);
+ }
+ stmt.reporter().failed(e);
+ }
+
+ @Override
+ public void close()
+ {
+ stmt.close();
+ }
+ };
+ }
+
+ @Override
+ public void writeException(Exception ex, OutputStream out) throws IOException
+ {
+ if (ex instanceof SanitizableException) {
+ ex =
serverConfig.getErrorResponseTransformStrategy().transformIfNeeded((SanitizableException)
ex);
+ }
+ out.write(jsonMapper.writeValueAsBytes(ex));
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java
new file mode 100644
index 00000000000..70b5953fd7a
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/http/SqlResourceQueryResultPusherFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.ResponseContextConfig;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.sql.HttpStatement;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SqlResourceQueryResultPusherFactory
+{
+ private final ObjectMapper jsonMapper;
+ private final ServerConfig serverConfig;
+ private final ResponseContextConfig responseContextConfig;
+ private final DruidNode selfNode;
+
+ @Inject
+ public SqlResourceQueryResultPusherFactory(
+ ObjectMapper jsonMapper,
+ ServerConfig serverConfig,
+ ResponseContextConfig responseContextConfig,
+ @Self DruidNode selfNode
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.serverConfig = serverConfig;
+ this.responseContextConfig = responseContextConfig;
+ this.selfNode = selfNode;
+ }
+
+ public SqlResourceQueryResultPusher factorize(HttpServletRequest req,
HttpStatement stmt, SqlQuery sqlQuery)
+ {
+ Map<String, String> headers = new LinkedHashMap<>();
+
+ final String sqlQueryId = stmt.sqlQueryId();
+ headers.put(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
+
+ if (sqlQuery.includeHeader()) {
+ headers.put(SqlResource.SQL_HEADER_RESPONSE_HEADER,
SqlResource.SQL_HEADER_VALUE);
+ }
+
+ return new SqlResourceQueryResultPusher(
+ jsonMapper,
+ responseContextConfig,
+ selfNode,
+ serverConfig,
+ req,
+ stmt,
+ sqlQuery,
+ headers
+ );
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 595939377b7..87e9285a30a 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -324,14 +324,16 @@ public class SqlResourceTest extends CalciteTestBase
};
engine = CalciteTests.createMockSqlEngine(walker, conglomerate,
sqlStatementFactory);
resource = new SqlResource(
- JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- new ServerConfig(),
lifecycleManager,
new SqlEngineRegistry(Set.of(engine)),
- TEST_RESPONSE_CONTEXT_CONFIG,
- DefaultQueryConfig.NIL,
- DUMMY_DRUID_NODE
+ new SqlResourceQueryResultPusherFactory(
+ JSON_MAPPER,
+ new ServerConfig(),
+ TEST_RESPONSE_CONTEXT_CONFIG,
+ DUMMY_DRUID_NODE
+ ),
+ DefaultQueryConfig.NIL
);
}
@@ -449,15 +451,20 @@ public class SqlResourceTest extends CalciteTestBase
)
);
final Object observedMissingHeaders =
response.headers.get("X-Druid-Response-Context").stream()
- .map(s -> {
- try {
- return
JSON_MAPPER.readValue(s, new TypeReference<Map<String, String>>() {});
- }
- catch
(JsonProcessingException e) {
- throw new
RuntimeException(e);
- }
- })
-
.collect(Collectors.toList());
+ .map(s -> {
+ try {
+ return
JSON_MAPPER.readValue(
+ s,
+ new
TypeReference<Map<String, String>>()
+ {
+ }
+ );
+ }
+ catch
(JsonProcessingException e) {
+ throw new
RuntimeException(e);
+ }
+ })
+
.collect(Collectors.toList());
Assert.assertEquals(expectedMissingHeaders, observedMissingHeaders);
@@ -593,14 +600,16 @@ public class SqlResourceTest extends CalciteTestBase
// We need to create a new SqlResource instance with our custom
DefaultQueryConfig
resource = new SqlResource(
- JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- new ServerConfig(),
lifecycleManager,
new SqlEngineRegistry(Set.of(engine)),
- TEST_RESPONSE_CONTEXT_CONFIG,
- queryConfigWithTimezone,
- DUMMY_DRUID_NODE
+ new SqlResourceQueryResultPusherFactory(
+ JSON_MAPPER,
+ new ServerConfig(),
+ TEST_RESPONSE_CONTEXT_CONFIG,
+ DUMMY_DRUID_NODE
+ ),
+ queryConfigWithTimezone
);
final List<Map<String, Object>> rows = doPost(
@@ -704,50 +713,50 @@ public class SqlResourceTest extends CalciteTestBase
public void testPivotRowTypePreservedInDecoupledPlanner() throws Exception
{
final List<Map<String, Object>> rows = doPost(
- new SqlQuery(
- "SET plannerStrategy='DECOUPLED';" +
- " WITH t1 AS (\n" +
- " SELECT *\n" +
- " FROM (\n" +
- " VALUES\n" +
- " ('18-19', 'female', 84),\n" +
- " ('18-19', 'male', 217),\n" +
- " ('20-29', 'female', 321),\n" +
- " ('20-29', 'male', 820),\n" +
- " ('30-39', 'female', 63),\n" +
- " ('30-39', 'male', 449),\n" +
- " ('40-49', 'female', 10),\n" +
- " ('40-49', 'male', 83),\n" +
- " ('50-59', 'female', 2),\n" +
- " ('50-59', 'male', 13)\n" +
- " ) AS data(Age, Gender, Visitors)\n" +
- "),\n" +
- "t2 AS (\n" +
- " SELECT Age, Gender, CAST(SUM(Visitors) AS
double) / (SELECT SUM(Visitors) FROM t1) AS Share\n" +
- " FROM t1\n" +
- " GROUP BY 1, 2\n" +
- ")\n" +
- "SELECT *\n" +
- "FROM t2\n" +
- "PIVOT (MAX(Share) FOR Gender IN ('female' AS
Women, 'male' AS Men));",
- ResultFormat.OBJECT,
- false,
- false,
- false,
- null,
- null
- )
+ new SqlQuery(
+ "SET plannerStrategy='DECOUPLED';" +
+ " WITH t1 AS (\n" +
+ " SELECT *\n" +
+ " FROM (\n" +
+ " VALUES\n" +
+ " ('18-19', 'female', 84),\n" +
+ " ('18-19', 'male', 217),\n" +
+ " ('20-29', 'female', 321),\n" +
+ " ('20-29', 'male', 820),\n" +
+ " ('30-39', 'female', 63),\n" +
+ " ('30-39', 'male', 449),\n" +
+ " ('40-49', 'female', 10),\n" +
+ " ('40-49', 'male', 83),\n" +
+ " ('50-59', 'female', 2),\n" +
+ " ('50-59', 'male', 13)\n" +
+ " ) AS data(Age, Gender, Visitors)\n" +
+ "),\n" +
+ "t2 AS (\n" +
+ " SELECT Age, Gender, CAST(SUM(Visitors) AS double) / (SELECT
SUM(Visitors) FROM t1) AS Share\n" +
+ " FROM t1\n" +
+ " GROUP BY 1, 2\n" +
+ ")\n" +
+ "SELECT *\n" +
+ "FROM t2\n" +
+ "PIVOT (MAX(Share) FOR Gender IN ('female' AS Women, 'male' AS
Men));",
+ ResultFormat.OBJECT,
+ false,
+ false,
+ false,
+ null,
+ null
+ )
).rhs;
Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("Age", "18-19", "Women",
0.040737148399612025, "Men", 0.1052376333656644),
- ImmutableMap.of("Age", "20-29", "Women",
0.1556741028128031, "Men", 0.3976721629485936),
- ImmutableMap.of("Age", "30-39", "Women",
0.030552861299709022, "Men", 0.2177497575169738),
- ImmutableMap.of("Age", "40-49", "Women",
0.004849660523763337, "Men", 0.040252182347235696),
- ImmutableMap.of("Age", "50-59", "Women",
0.0009699321047526673, "Men", 0.006304558680892337)
- ),
- rows
+ ImmutableList.of(
+ ImmutableMap.of("Age", "18-19", "Women", 0.040737148399612025,
"Men", 0.1052376333656644),
+ ImmutableMap.of("Age", "20-29", "Women", 0.1556741028128031,
"Men", 0.3976721629485936),
+ ImmutableMap.of("Age", "30-39", "Women", 0.030552861299709022,
"Men", 0.2177497575169738),
+ ImmutableMap.of("Age", "40-49", "Women", 0.004849660523763337,
"Men", 0.040252182347235696),
+ ImmutableMap.of("Age", "50-59", "Women", 0.0009699321047526673,
"Men", 0.006304558680892337)
+ ),
+ rows
);
}
@@ -755,50 +764,50 @@ public class SqlResourceTest extends CalciteTestBase
public void testPivotRowTypePreservedInCoupledPlanner() throws Exception
{
final List<Map<String, Object>> rows = doPost(
- new SqlQuery(
- "SET plannerStrategy='COUPLED';" +
- " WITH t1 AS (\n" +
- " SELECT *\n" +
- " FROM (\n" +
- " VALUES\n" +
- " ('18-19', 'female', 84),\n" +
- " ('18-19', 'male', 217),\n" +
- " ('20-29', 'female', 321),\n" +
- " ('20-29', 'male', 820),\n" +
- " ('30-39', 'female', 63),\n" +
- " ('30-39', 'male', 449),\n" +
- " ('40-49', 'female', 10),\n" +
- " ('40-49', 'male', 83),\n" +
- " ('50-59', 'female', 2),\n" +
- " ('50-59', 'male', 13)\n" +
- " ) AS data(Age, Gender, Visitors)\n" +
- "),\n" +
- "t2 AS (\n" +
- " SELECT Age, Gender, CAST(SUM(Visitors) AS
double) / (SELECT SUM(Visitors) FROM t1) AS Share\n" +
- " FROM t1\n" +
- " GROUP BY 1, 2\n" +
- ")\n" +
- "SELECT *\n" +
- "FROM t2\n" +
- "PIVOT (MAX(Share) FOR Gender IN ('female' AS
Women, 'male' AS Men));",
- ResultFormat.OBJECT,
- false,
- false,
- false,
- null,
- null
- )
+ new SqlQuery(
+ "SET plannerStrategy='COUPLED';" +
+ " WITH t1 AS (\n" +
+ " SELECT *\n" +
+ " FROM (\n" +
+ " VALUES\n" +
+ " ('18-19', 'female', 84),\n" +
+ " ('18-19', 'male', 217),\n" +
+ " ('20-29', 'female', 321),\n" +
+ " ('20-29', 'male', 820),\n" +
+ " ('30-39', 'female', 63),\n" +
+ " ('30-39', 'male', 449),\n" +
+ " ('40-49', 'female', 10),\n" +
+ " ('40-49', 'male', 83),\n" +
+ " ('50-59', 'female', 2),\n" +
+ " ('50-59', 'male', 13)\n" +
+ " ) AS data(Age, Gender, Visitors)\n" +
+ "),\n" +
+ "t2 AS (\n" +
+ " SELECT Age, Gender, CAST(SUM(Visitors) AS double) / (SELECT
SUM(Visitors) FROM t1) AS Share\n" +
+ " FROM t1\n" +
+ " GROUP BY 1, 2\n" +
+ ")\n" +
+ "SELECT *\n" +
+ "FROM t2\n" +
+ "PIVOT (MAX(Share) FOR Gender IN ('female' AS Women, 'male' AS
Men));",
+ ResultFormat.OBJECT,
+ false,
+ false,
+ false,
+ null,
+ null
+ )
).rhs;
Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("Age", "18-19", "Women",
0.040737148399612025, "Men", 0.1052376333656644),
- ImmutableMap.of("Age", "20-29", "Women",
0.1556741028128031, "Men", 0.3976721629485936),
- ImmutableMap.of("Age", "30-39", "Women",
0.030552861299709022, "Men", 0.2177497575169738),
- ImmutableMap.of("Age", "40-49", "Women",
0.004849660523763337, "Men", 0.040252182347235696),
- ImmutableMap.of("Age", "50-59", "Women",
0.0009699321047526673, "Men", 0.006304558680892337)
- ),
- rows
+ ImmutableList.of(
+ ImmutableMap.of("Age", "18-19", "Women", 0.040737148399612025,
"Men", 0.1052376333656644),
+ ImmutableMap.of("Age", "20-29", "Women", 0.1556741028128031,
"Men", 0.3976721629485936),
+ ImmutableMap.of("Age", "30-39", "Women", 0.030552861299709022,
"Men", 0.2177497575169738),
+ ImmutableMap.of("Age", "40-49", "Women", 0.004849660523763337,
"Men", 0.040252182347235696),
+ ImmutableMap.of("Age", "50-59", "Women", 0.0009699321047526673,
"Men", 0.006304558680892337)
+ ),
+ rows
);
}
@@ -1689,27 +1698,29 @@ public class SqlResourceTest extends CalciteTestBase
public void testUnsupportedQueryThrowsExceptionWithFilterResponse() throws
Exception
{
resource = new SqlResource(
- JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- new ServerConfig()
- {
- @Override
- public boolean isShowDetailedJettyErrors()
- {
- return true;
- }
-
- @Override
- public ErrorResponseTransformStrategy
getErrorResponseTransformStrategy()
- {
- return new
AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
- }
- },
lifecycleManager,
new SqlEngineRegistry(Set.of(engine)),
- TEST_RESPONSE_CONTEXT_CONFIG,
- DefaultQueryConfig.NIL,
- DUMMY_DRUID_NODE
+ new SqlResourceQueryResultPusherFactory(
+ JSON_MAPPER,
+ new ServerConfig()
+ {
+ @Override
+ public boolean isShowDetailedJettyErrors()
+ {
+ return true;
+ }
+
+ @Override
+ public ErrorResponseTransformStrategy
getErrorResponseTransformStrategy()
+ {
+ return new
AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
+ }
+ },
+ TEST_RESPONSE_CONTEXT_CONFIG,
+ DUMMY_DRUID_NODE
+ ),
+ DefaultQueryConfig.NIL
);
String errorMessage = "This will be supported in Druid 9999";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]