This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b91fa406bc [Refactor] Return InstanceResponseBlock in QueryExecutor
(#9561)
b91fa406bc is described below
commit b91fa406bcf7db3a88012614e549cd3f2ddaef3d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Oct 10 16:55:23 2022 -0700
[Refactor] Return InstanceResponseBlock in QueryExecutor (#9561)
---
.../pinot/common/exception/QueryException.java | 5 +
.../common/datatable/DataTableBuilderFactory.java | 20 ++
.../common/datatable/DataTableBuilderUtils.java | 195 ----------------
.../core/operator/InstanceResponseOperator.java | 19 +-
.../operator/blocks/InstanceResponseBlock.java | 118 +++++++++-
.../blocks/results/AggregationResultsBlock.java | 31 ++-
.../operator/blocks/results/BaseResultsBlock.java | 35 ++-
.../blocks/results/DistinctResultsBlock.java | 25 +-
.../blocks/results/ExceptionResultsBlock.java | 24 +-
.../blocks/results/ExplainResultsBlock.java | 93 ++++++++
.../blocks/results/GroupByResultsBlock.java | 51 ++++-
.../blocks/results/MetadataResultsBlock.java | 24 +-
.../operator/blocks/results/ResultsBlockUtils.java | 119 ++++++++++
.../blocks/results/SelectionResultsBlock.java | 18 +-
.../StreamingInstanceResponseOperator.java | 21 +-
.../apache/pinot/core/plan/GlobalPlanImplV0.java | 5 +-
.../main/java/org/apache/pinot/core/plan/Plan.java | 4 +-
.../plan/StreamingInstanceResponsePlanNode.java | 2 +-
.../pinot/core/query/executor/QueryExecutor.java | 47 +++-
.../query/executor/ServerQueryExecutorV1Impl.java | 251 +++++++++------------
.../pinot/core/query/scheduler/QueryScheduler.java | 71 +++---
.../query/selection/SelectionOperatorUtils.java | 5 +-
.../core/transport/InstanceRequestHandler.java | 20 +-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 17 +-
.../core/common/datatable/DataTableSerDeTest.java | 2 +-
.../blocks/results/ResultsBlockUtilsTest.java} | 31 +--
.../executor/QueryExecutorExceptionsTest.java | 4 +-
.../core/query/executor/QueryExecutorTest.java | 27 ++-
.../query/scheduler/PrioritySchedulerTest.java | 10 +-
.../pinot/core/transport/QueryRoutingTest.java | 8 +-
.../org/apache/pinot/queries/BaseQueriesTest.java | 23 +-
.../pinot/queries/ExplainPlanQueriesTest.java | 9 +-
.../queries/SegmentWithNullValueVectorTest.java | 27 ++-
.../apache/pinot/query/runtime/QueryRunner.java | 12 +-
34 files changed, 812 insertions(+), 561 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index e09a2ff238..0009cee884 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.response.ProcessingException;
+// TODO: Clean up ProcessingException (thrift) because we don't send it
through the wire
public class QueryException {
private QueryException() {
}
@@ -61,6 +62,7 @@ public class QueryException {
public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235;
public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
+ public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260;
public static final int BROKER_GATHER_ERROR_CODE = 300;
public static final int BROKER_SEGMENT_UNAVAILABLE_ERROR_CODE = 305;
public static final int DATA_TABLE_DESERIALIZATION_ERROR_CODE = 310;
@@ -105,6 +107,8 @@ public class QueryException {
new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE);
+ public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR =
+ new ProcessingException(DATA_TABLE_SERIALIZATION_ERROR_CODE);
public static final ProcessingException BROKER_GATHER_ERROR = new
ProcessingException(BROKER_GATHER_ERROR_CODE);
public static final ProcessingException DATA_TABLE_DESERIALIZATION_ERROR =
new ProcessingException(DATA_TABLE_DESERIALIZATION_ERROR_CODE);
@@ -142,6 +146,7 @@ public class QueryException {
SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing");
QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
+ DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableSerializationError");
BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableDeserializationError");
FUTURE_CALL_ERROR.setMessage("FutureCallError");
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
index 5d9e616974..2bf0426a78 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
@@ -18,7 +18,11 @@
*/
package org.apache.pinot.core.common.datatable;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV2;
+import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
import org.apache.pinot.common.utils.DataSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,4 +62,20 @@ public class DataTableBuilderFactory {
throw new IllegalStateException("Unsupported data table version: " +
_version);
}
}
+
+ /**
+ * Returns an empty data table without data.
+ */
+ public static DataTable getEmptyDataTable() {
+ switch (_version) {
+ case DataTableFactory.VERSION_2:
+ return new DataTableImplV2();
+ case DataTableFactory.VERSION_3:
+ return new DataTableImplV3();
+ case DataTableFactory.VERSION_4:
+ return new DataTableImplV4();
+ default:
+ throw new IllegalStateException("Unsupported data table version: " +
_version);
+ }
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
deleted file mode 100644
index bc631af2fe..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.datatable;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
-import org.apache.pinot.common.datatable.DataTableImplV4;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
-import org.apache.pinot.core.query.distinct.DistinctTable;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-
-
-/**
- * The <code>DataTableUtils</code> class provides utility methods for data
table.
- */
-@SuppressWarnings("rawtypes")
-public class DataTableBuilderUtils {
- private DataTableBuilderUtils() {
- }
-
- /**
- * Returns an empty data table without data.
- */
- public static DataTable getEmptyDataTable() {
- int version = DataTableBuilderFactory.getDataTableVersion();
- switch (version) {
- case DataTableFactory.VERSION_2:
- return new DataTableImplV2();
- case DataTableFactory.VERSION_3:
- return new DataTableImplV3();
- case DataTableFactory.VERSION_4:
- return new DataTableImplV4();
- default:
- throw new IllegalStateException("Unsupported data table version: " +
version);
- }
- }
-
- /**
- * Builds an empty data table based on the broker request.
- */
- public static DataTable buildEmptyDataTable(QueryContext queryContext)
- throws IOException {
- if (QueryContextUtils.isSelectionQuery(queryContext)) {
- return buildEmptyDataTableForSelectionQuery(queryContext);
- } else if (QueryContextUtils.isAggregationQuery(queryContext)) {
- return buildEmptyDataTableForAggregationQuery(queryContext);
- } else {
- assert QueryContextUtils.isDistinctQuery(queryContext);
- return buildEmptyDataTableForDistinctQuery(queryContext);
- }
- }
-
- /**
- * Helper method to build an empty data table for selection query.
- */
- private static DataTable buildEmptyDataTableForSelectionQuery(QueryContext
queryContext) {
- List<ExpressionContext> selectExpressions =
queryContext.getSelectExpressions();
- int numSelectExpressions = selectExpressions.size();
- String[] columnNames = new String[numSelectExpressions];
- for (int i = 0; i < numSelectExpressions; i++) {
- columnNames[i] = selectExpressions.get(i).toString();
- }
- ColumnDataType[] columnDataTypes = new
ColumnDataType[numSelectExpressions];
- // NOTE: Use STRING column data type as default for selection query
- Arrays.fill(columnDataTypes, ColumnDataType.STRING);
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
- }
-
- /**
- * Helper method to build an empty data table for aggregation query.
- */
- private static DataTable buildEmptyDataTableForAggregationQuery(QueryContext
queryContext)
- throws IOException {
- AggregationFunction[] aggregationFunctions =
queryContext.getAggregationFunctions();
- assert aggregationFunctions != null;
- int numAggregations = aggregationFunctions.length;
- List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
- if (groupByExpressions != null) {
- // Aggregation group-by query
-
- int numColumns = groupByExpressions.size() + numAggregations;
- String[] columnNames = new String[numColumns];
- ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
- int index = 0;
- for (ExpressionContext groupByExpression : groupByExpressions) {
- columnNames[index] = groupByExpression.toString();
- // Use STRING column data type as default for group-by expressions
- columnDataTypes[index] = ColumnDataType.STRING;
- index++;
- }
- for (AggregationFunction aggregationFunction : aggregationFunctions) {
- // NOTE: Use AggregationFunction.getResultColumnName() for SQL format
response
- columnNames[index] = aggregationFunction.getResultColumnName();
- columnDataTypes[index] =
aggregationFunction.getIntermediateResultColumnType();
- index++;
- }
- return DataTableBuilderFactory.getDataTableBuilder(new
DataSchema(columnNames, columnDataTypes)).build();
- } else {
- // Aggregation only query
-
- String[] aggregationColumnNames = new String[numAggregations];
- ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregations];
- Object[] aggregationResults = new Object[numAggregations];
- for (int i = 0; i < numAggregations; i++) {
- AggregationFunction aggregationFunction = aggregationFunctions[i];
- // NOTE: For backward-compatibility, use
AggregationFunction.getColumnName() for aggregation only query
- aggregationColumnNames[i] = aggregationFunction.getColumnName();
- columnDataTypes[i] =
aggregationFunction.getIntermediateResultColumnType();
- aggregationResults[i] =
-
aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder());
- }
-
- // Build the data table
- DataTableBuilder dataTableBuilder =
- DataTableBuilderFactory.getDataTableBuilder(new
DataSchema(aggregationColumnNames, columnDataTypes));
- dataTableBuilder.startRow();
- for (int i = 0; i < numAggregations; i++) {
- switch (columnDataTypes[i]) {
- case LONG:
- dataTableBuilder.setColumn(i, ((Number)
aggregationResults[i]).longValue());
- break;
- case DOUBLE:
- dataTableBuilder.setColumn(i, ((Double)
aggregationResults[i]).doubleValue());
- break;
- case OBJECT:
- dataTableBuilder.setColumn(i, aggregationResults[i]);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported aggregation column data type: " +
columnDataTypes[i] + " for column: "
- + aggregationColumnNames[i]);
- }
- }
- dataTableBuilder.finishRow();
- return dataTableBuilder.build();
- }
- }
-
- /**
- * Helper method to build an empty data table for distinct query.
- */
- private static DataTable buildEmptyDataTableForDistinctQuery(QueryContext
queryContext)
- throws IOException {
- AggregationFunction[] aggregationFunctions =
queryContext.getAggregationFunctions();
- assert aggregationFunctions != null && aggregationFunctions.length == 1
- && aggregationFunctions[0] instanceof DistinctAggregationFunction;
- DistinctAggregationFunction distinctAggregationFunction =
(DistinctAggregationFunction) aggregationFunctions[0];
-
- // Create the distinct table
- String[] columnNames = distinctAggregationFunction.getColumns();
- ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
- // NOTE: Use STRING column data type as default for distinct query
- Arrays.fill(columnDataTypes, ColumnDataType.STRING);
- DistinctTable distinctTable =
- new DistinctTable(new DataSchema(columnNames, columnDataTypes),
Collections.emptySet(),
- queryContext.isNullHandlingEnabled());
-
- // Build the data table
- DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(
- new DataSchema(new
String[]{distinctAggregationFunction.getColumnName()},
- new ColumnDataType[]{ColumnDataType.OBJECT}));
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0, distinctTable);
- dataTableBuilder.finishRow();
- return dataTableBuilder.build();
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 6bfdb3717e..a1b317d66c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -20,8 +20,6 @@ package org.apache.pinot.core.operator;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.request.context.ThreadTimer;
import org.apache.pinot.core.common.Operator;
@@ -83,7 +81,7 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
ThreadTimer mainThreadTimer = new ThreadTimer();
BaseResultsBlock resultsBlock = getCombinedResults();
- InstanceResponseBlock instanceResponseBlock = new
InstanceResponseBlock(getDataTable(resultsBlock));
+ InstanceResponseBlock instanceResponseBlock = new
InstanceResponseBlock(resultsBlock, _queryContext);
long mainThreadCpuTimeNs = mainThreadTimer.getThreadTimeNs();
long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
@@ -99,14 +97,13 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
numServerThreads);
long threadCpuTimeNs = mainThreadCpuTimeNs + multipleThreadCpuTimeNs;
- Map<String, String> responseMetaData =
instanceResponseBlock.getInstanceResponseDataTable().getMetadata();
- responseMetaData.put(MetadataKey.THREAD_CPU_TIME_NS.getName(),
String.valueOf(threadCpuTimeNs));
- responseMetaData.put(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
+
instanceResponseBlock.addMetadata(MetadataKey.THREAD_CPU_TIME_NS.getName(),
String.valueOf(threadCpuTimeNs));
+
instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
String.valueOf(systemActivitiesCpuTimeNs));
return instanceResponseBlock;
} else {
- return new InstanceResponseBlock(getDataTable(getCombinedResults()));
+ return new InstanceResponseBlock(getCombinedResults(), _queryContext);
}
}
@@ -119,14 +116,6 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
}
}
- private DataTable getDataTable(BaseResultsBlock resultsBlock) {
- try {
- return resultsBlock.getDataTable(_queryContext);
- } catch (Exception e) {
- throw new RuntimeException("Caught exception while building data table",
e);
- }
- }
-
private void prefetchAll() {
for (int i = 0; i < _fetchContextSize; i++) {
_indexSegments.get(i).prefetch(_fetchContexts.get(i));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
index 45170ff0d2..4708d5cf83 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
@@ -18,26 +18,132 @@
*/
package org.apache.pinot.core.operator.blocks;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
/**
- * InstanceResponseBlock is just a holder to get InstanceResponse from
InstanceResponseBlock.
+ * The {@code InstanceResponseBlock} is the holder of the server side results.
*/
public class InstanceResponseBlock implements Block {
- private final DataTable _instanceResponseDataTable;
+ private final BaseResultsBlock _resultsBlock;
+ private final QueryContext _queryContext;
+ private final Map<Integer, String> _exceptions;
+ private final Map<String, String> _metadata;
- public InstanceResponseBlock(DataTable dataTable) {
- _instanceResponseDataTable = dataTable;
+ public InstanceResponseBlock(BaseResultsBlock resultsBlock, QueryContext
queryContext) {
+ _resultsBlock = resultsBlock;
+ _queryContext = queryContext;
+ _exceptions = new HashMap<>();
+ List<ProcessingException> processingExceptions =
resultsBlock.getProcessingExceptions();
+ if (processingExceptions != null) {
+ for (ProcessingException processingException : processingExceptions) {
+ _exceptions.put(processingException.getErrorCode(),
processingException.getMessage());
+ }
+ }
+ _metadata = resultsBlock.getResultsMetadata();
}
- public DataTable getInstanceResponseDataTable() {
- return _instanceResponseDataTable;
+ /**
+ * Metadata only instance response.
+ */
+ public InstanceResponseBlock() {
+ _resultsBlock = null;
+ _queryContext = null;
+ _exceptions = new HashMap<>();
+ _metadata = new HashMap<>();
+ }
+
+ private InstanceResponseBlock(Map<Integer, String> exceptions, Map<String,
String> metadata) {
+ _resultsBlock = null;
+ _queryContext = null;
+ _exceptions = exceptions;
+ _metadata = metadata;
+ }
+
+ public InstanceResponseBlock toMetadataOnlyResponseBlock() {
+ return new InstanceResponseBlock(_exceptions, _metadata);
+ }
+
+ public void addException(ProcessingException processingException) {
+ _exceptions.put(processingException.getErrorCode(),
processingException.getMessage());
+ }
+
+ public void addException(int errorCode, String exceptionMessage) {
+ _exceptions.put(errorCode, exceptionMessage);
+ }
+
+ public void addMetadata(String key, String value) {
+ _metadata.put(key, value);
+ }
+
+ @Nullable
+ public BaseResultsBlock getResultsBlock() {
+ return _resultsBlock;
+ }
+
+ @Nullable
+ public QueryContext getQueryContext() {
+ return _queryContext;
+ }
+
+ public Map<Integer, String> getExceptions() {
+ return _exceptions;
+ }
+
+ public Map<String, String> getResponseMetadata() {
+ return _metadata;
+ }
+
+ @Nullable
+ public DataSchema getDataSchema() {
+ return _resultsBlock != null ? _resultsBlock.getDataSchema(_queryContext)
: null;
+ }
+
+ @Nullable
+ public Collection<Object[]> getRows() {
+ return _resultsBlock != null ? _resultsBlock.getRows(_queryContext) : null;
+ }
+
+ public DataTable toDataTable()
+ throws IOException {
+ DataTable dataTable = toDataOnlyDataTable();
+ attachMetadata(dataTable);
+ return dataTable;
+ }
+
+ public DataTable toDataOnlyDataTable()
+ throws IOException {
+ return _resultsBlock != null ? _resultsBlock.getDataTable(_queryContext)
+ : DataTableBuilderFactory.getEmptyDataTable();
+ }
+
+ public DataTable toMetadataOnlyDataTable() {
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+ attachMetadata(dataTable);
+ return dataTable;
+ }
+
+ private void attachMetadata(DataTable dataTable) {
+ for (Map.Entry<Integer, String> entry : _exceptions.entrySet()) {
+ dataTable.addException(entry.getKey(), entry.getValue());
+ }
+ dataTable.getMetadata().putAll(_metadata);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 10c641713e..3724b645e3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.operator.blocks.results;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -55,11 +57,8 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
}
@Override
- public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
+ public DataSchema getDataSchema(QueryContext queryContext) {
boolean returnFinalResult = queryContext.isServerReturnFinalResult();
-
- // Extract result column name and type from each aggregation function
int numColumns = _aggregationFunctions.length;
String[] columnNames = new String[numColumns];
ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
@@ -69,10 +68,23 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
columnDataTypes[i] = returnFinalResult ?
aggregationFunction.getFinalResultColumnType()
: aggregationFunction.getIntermediateResultColumnType();
}
+ return new DataSchema(columnNames, columnDataTypes);
+ }
+
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ return Collections.singletonList(_results.toArray());
+ }
- // Build the data table.
- DataTableBuilder dataTableBuilder =
- DataTableBuilderFactory.getDataTableBuilder(new
DataSchema(columnNames, columnDataTypes));
+ @Override
+ public DataTable getDataTable(QueryContext queryContext)
+ throws IOException {
+ boolean returnFinalResult = queryContext.isServerReturnFinalResult();
+ DataSchema dataSchema = getDataSchema(queryContext);
+ assert dataSchema != null;
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ int numColumns = columnDataTypes.length;
+ DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
if (queryContext.isNullHandlingEnabled()) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
for (int i = 0; i < numColumns; i++) {
@@ -108,10 +120,7 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
}
dataTableBuilder.finishRow();
}
-
- DataTable dataTable = dataTableBuilder.build();
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ return dataTableBuilder.build();
}
private void setIntermediateResult(DataTableBuilder dataTableBuilder,
ColumnDataType[] columnDataTypes, int index,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index 24573e04c1..e48e50b89d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -19,14 +19,17 @@
package org.apache.pinot.core.operator.blocks.results;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -155,16 +158,29 @@ public abstract class BaseResultsBlock implements Block {
_numServerThreads = numServerThreads;
}
+ /**
+ * Returns the data schema for the results. Return {@code null} when the
block only contains metadata.
+ */
+ @Nullable
+ public abstract DataSchema getDataSchema(QueryContext queryContext);
+
+ /**
+ * Returns the rows for the results. Return {@code null} when the block only
contains metadata.
+ */
+ @Nullable
+ public abstract Collection<Object[]> getRows(QueryContext queryContext);
+
+ /**
+ * Returns a data table without metadata or exception attached.
+ */
public abstract DataTable getDataTable(QueryContext queryContext)
- throws Exception;
+ throws IOException;
- protected void attachMetadataToDataTable(DataTable dataTable) {
- if (CollectionUtils.isNotEmpty(_processingExceptions)) {
- for (ProcessingException exception : _processingExceptions) {
- dataTable.addException(exception);
- }
- }
- Map<String, String> metadata = dataTable.getMetadata();
+ /**
+ * Returns the metadata for the results.
+ */
+ public Map<String, String> getResultsMetadata() {
+ Map<String, String> metadata = new HashMap<>();
metadata.put(MetadataKey.TOTAL_DOCS.getName(),
Long.toString(_numTotalDocs));
metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(),
Long.toString(_numDocsScanned));
metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
Long.toString(_numEntriesScannedInFilter));
@@ -174,6 +190,7 @@ public abstract class BaseResultsBlock implements Block {
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
Integer.toString(_numConsumingSegmentsProcessed));
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
Integer.toString(_numConsumingSegmentsMatched));
+ return metadata;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index b721671726..8db1dfc474 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -18,11 +18,16 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.data.table.Record;
import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -48,9 +53,23 @@ public class DistinctResultsBlock extends BaseResultsBlock {
_distinctTable = distinctTable;
}
+ @Override
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return _distinctTable.getDataSchema();
+ }
+
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ List<Object[]> rows = new ArrayList<>(_distinctTable.size());
+ for (Record record : _distinctTable.getRecords()) {
+ rows.add(record.getValues());
+ }
+ return rows;
+ }
+
@Override
public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
+ throws IOException {
String[] columnNames = new String[]{_distinctFunction.getColumnName()};
ColumnDataType[] columnDataTypes = new
ColumnDataType[]{ColumnDataType.OBJECT};
DataTableBuilder dataTableBuilder =
@@ -58,8 +77,6 @@ public class DistinctResultsBlock extends BaseResultsBlock {
dataTableBuilder.startRow();
dataTableBuilder.setColumn(0, _distinctTable);
dataTableBuilder.finishRow();
- DataTable dataTable = dataTableBuilder.build();
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ return dataTableBuilder.build();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 23a6a2120d..d8df97ee7c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import java.util.Collection;
+import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -35,11 +38,20 @@ public class ExceptionResultsBlock extends BaseResultsBlock
{
this(QueryException.QUERY_EXECUTION_ERROR, e);
}
+ @Nullable
@Override
- public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ return null;
+ }
+
+ @Override
+ public DataTable getDataTable(QueryContext queryContext) {
+ return DataTableBuilderFactory.getEmptyDataTable();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
new file mode 100644
index 0000000000..e229848591
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Results block for EXPLAIN queries.
+ */
+public class ExplainResultsBlock extends BaseResultsBlock {
+ private final List<ExplainEntry> _entries = new ArrayList<>();
+
+ public void addOperator(String operatorName, int operatorId, int parentId) {
+ _entries.add(new ExplainEntry(operatorName, operatorId, parentId));
+ }
+
+ @Override
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return DataSchema.EXPLAIN_RESULT_SCHEMA;
+ }
+
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ List<Object[]> rows = new ArrayList<>(_entries.size());
+ for (ExplainEntry entry : _entries) {
+ rows.add(entry.toRow());
+ }
+ return rows;
+ }
+
+ @Override
+ public DataTable getDataTable(QueryContext queryContext)
+ throws IOException {
+ DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+ for (ExplainEntry entry : _entries) {
+ dataTableBuilder.startRow();
+ dataTableBuilder.setColumn(0, entry._operatorName);
+ dataTableBuilder.setColumn(1, entry._operatorId);
+ dataTableBuilder.setColumn(2, entry._parentId);
+ dataTableBuilder.finishRow();
+ }
+ return dataTableBuilder.build();
+ }
+
+ @Override
+ public Map<String, String> getResultsMetadata() {
+ // Do not add metadata for EXPLAIN results
+ return new HashMap<>();
+ }
+
+ private static class ExplainEntry {
+ final String _operatorName;
+ final int _operatorId;
+ final int _parentId;
+
+ ExplainEntry(String operatorName, int operatorId, int parentId) {
+ _operatorName = operatorName;
+ _operatorId = operatorId;
+ _parentId = parentId;
+ }
+
+ Object[] toRow() {
+ return new Object[]{_operatorName, _parentId, _parentId};
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 3cd88b95cc..ffc5109f14 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -18,13 +18,16 @@
*/
package org.apache.pinot.core.operator.blocks.results;
-import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.utils.DataSchema;
@@ -83,6 +86,16 @@ public class GroupByResultsBlock extends BaseResultsBlock {
_table = table;
}
+ /**
+ * For instance level empty group-by results.
+ */
+ public GroupByResultsBlock(DataSchema dataSchema) {
+ _dataSchema = dataSchema;
+ _aggregationGroupByResult = null;
+ _intermediateRecords = null;
+ _table = null;
+ }
+
public DataSchema getDataSchema() {
return _dataSchema;
}
@@ -123,11 +136,33 @@ public class GroupByResultsBlock extends BaseResultsBlock
{
_resizeTimeMs = resizeTimeMs;
}
+ @Nullable
+ @Override
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return _dataSchema;
+ }
+
+ @Nullable
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ if (_table == null) {
+ return Collections.emptyList();
+ }
+ List<Object[]> rows = new ArrayList<>(_table.size());
+ Iterator<Record> iterator = _table.iterator();
+ while (iterator.hasNext()) {
+ rows.add(iterator.next().getValues());
+ }
+ return rows;
+ }
+
@Override
public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
- Preconditions.checkState(_table != null, "Cannot get DataTable from
segment level results");
+ throws IOException {
DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(_dataSchema);
+ if (_table == null) {
+ return dataTableBuilder.build();
+ }
ColumnDataType[] storedColumnDataTypes =
_dataSchema.getStoredColumnDataTypes();
int numColumns = _dataSchema.size();
Iterator<Record> iterator = _table.iterator();
@@ -166,9 +201,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
dataTableBuilder.finishRow();
}
}
- DataTable dataTable = dataTableBuilder.build();
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ return dataTableBuilder.build();
}
private void setDataTableColumn(ColumnDataType storedColumnDataType,
DataTableBuilder dataTableBuilder,
@@ -224,13 +257,13 @@ public class GroupByResultsBlock extends BaseResultsBlock
{
}
@Override
- protected void attachMetadataToDataTable(DataTable dataTable) {
- super.attachMetadataToDataTable(dataTable);
- Map<String, String> metadata = dataTable.getMetadata();
+ public Map<String, String> getResultsMetadata() {
+ Map<String, String> metadata = super.getResultsMetadata();
if (_numGroupsLimitReached) {
metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
}
metadata.put(MetadataKey.NUM_RESIZES.getName(),
Integer.toString(_numResizes));
metadata.put(MetadataKey.RESIZE_TIME_MS.getName(),
Long.toString(_resizeTimeMs));
+ return metadata;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
index 7ffeb0643d..dcfddf17cf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
@@ -18,18 +18,30 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import java.util.Collection;
+import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
public class MetadataResultsBlock extends BaseResultsBlock {
+ @Nullable
@Override
- public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ return null;
+ }
+
+ @Override
+ public DataTable getDataTable(QueryContext queryContext) {
+ return DataTableBuilderFactory.getEmptyDataTable();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
new file mode 100644
index 0000000000..5f5e7d0769
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+
+
+@SuppressWarnings("rawtypes")
+public class ResultsBlockUtils {
+ private ResultsBlockUtils() {
+ }
+
+ public static BaseResultsBlock buildEmptyQueryResults(QueryContext
queryContext) {
+ if (QueryContextUtils.isSelectionQuery(queryContext)) {
+ return buildEmptySelectionQueryResults(queryContext);
+ } else if (QueryContextUtils.isAggregationQuery(queryContext)) {
+ if (queryContext.getGroupByExpressions() == null) {
+ return buildEmptyAggregationQueryResults(queryContext);
+ } else {
+ return buildEmptyGroupByQueryResults(queryContext);
+ }
+ } else {
+ assert QueryContextUtils.isDistinctQuery(queryContext);
+ return buildEmptyDistinctQueryResults(queryContext);
+ }
+ }
+
+ private static SelectionResultsBlock
buildEmptySelectionQueryResults(QueryContext queryContext) {
+ List<ExpressionContext> selectExpressions =
queryContext.getSelectExpressions();
+ int numSelectExpressions = selectExpressions.size();
+ String[] columnNames = new String[numSelectExpressions];
+ for (int i = 0; i < numSelectExpressions; i++) {
+ columnNames[i] = selectExpressions.get(i).toString();
+ }
+ ColumnDataType[] columnDataTypes = new
ColumnDataType[numSelectExpressions];
+ // NOTE: Use STRING column data type as default for selection query
+ Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ return new SelectionResultsBlock(dataSchema, Collections.emptyList());
+ }
+
+ private static AggregationResultsBlock
buildEmptyAggregationQueryResults(QueryContext queryContext) {
+ AggregationFunction[] aggregationFunctions =
queryContext.getAggregationFunctions();
+ assert aggregationFunctions != null;
+ int numAggregations = aggregationFunctions.length;
+ List<Object> results = new ArrayList<>(numAggregations);
+ for (AggregationFunction aggregationFunction : aggregationFunctions) {
+
results.add(aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder()));
+ }
+ return new AggregationResultsBlock(aggregationFunctions, results);
+ }
+
+ private static GroupByResultsBlock
buildEmptyGroupByQueryResults(QueryContext queryContext) {
+ AggregationFunction[] aggregationFunctions =
queryContext.getAggregationFunctions();
+ assert aggregationFunctions != null;
+ int numAggregations = aggregationFunctions.length;
+ List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
+ assert groupByExpressions != null;
+ int numColumns = groupByExpressions.size() + numAggregations;
+ String[] columnNames = new String[numColumns];
+ ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+ int index = 0;
+ for (ExpressionContext groupByExpression : groupByExpressions) {
+ columnNames[index] = groupByExpression.toString();
+ // Use STRING column data type as default for group-by expressions
+ columnDataTypes[index] = ColumnDataType.STRING;
+ index++;
+ }
+ for (AggregationFunction aggregationFunction : aggregationFunctions) {
+ // NOTE: Use AggregationFunction.getResultColumnName() for SQL format
response
+ columnNames[index] = aggregationFunction.getResultColumnName();
+ columnDataTypes[index] =
aggregationFunction.getIntermediateResultColumnType();
+ index++;
+ }
+ return new GroupByResultsBlock(new DataSchema(columnNames,
columnDataTypes));
+ }
+
+ private static DistinctResultsBlock
buildEmptyDistinctQueryResults(QueryContext queryContext) {
+ AggregationFunction[] aggregationFunctions =
queryContext.getAggregationFunctions();
+ assert aggregationFunctions != null && aggregationFunctions.length == 1
+ && aggregationFunctions[0] instanceof DistinctAggregationFunction;
+ DistinctAggregationFunction distinctAggregationFunction =
(DistinctAggregationFunction) aggregationFunctions[0];
+ String[] columnNames = distinctAggregationFunction.getColumns();
+ ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
+ // NOTE: Use STRING column data type as default for distinct query
+ Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+ DistinctTable distinctTable =
+ new DistinctTable(new DataSchema(columnNames, columnDataTypes),
Collections.emptySet(),
+ queryContext.isNullHandlingEnabled());
+ return new DistinctResultsBlock(distinctAggregationFunction,
distinctTable);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 4f714ae7be..2a7fd5847c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import java.io.IOException;
import java.util.Collection;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -45,12 +46,19 @@ public class SelectionResultsBlock extends BaseResultsBlock
{
return _rows;
}
+ @Override
+ public DataSchema getDataSchema(QueryContext queryContext) {
+ return _dataSchema;
+ }
+
+ @Override
+ public Collection<Object[]> getRows(QueryContext queryContext) {
+ return _rows;
+ }
+
@Override
public DataTable getDataTable(QueryContext queryContext)
- throws Exception {
- DataTable dataTable =
- SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema,
queryContext.isNullHandlingEnabled());
- attachMetadataToDataTable(dataTable);
- return dataTable;
+ throws IOException {
+ return SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema,
queryContext.isNullHandlingEnabled());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
similarity index 69%
rename from
pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
index 3243dcd406..4bbd21ba3c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.streaming;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.operator.InstanceResponseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.combine.BaseCombineOperator;
-import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -46,18 +44,15 @@ public class StreamingInstanceResponseOperator extends
InstanceResponseOperator
@Override
protected InstanceResponseBlock getNextBlock() {
- InstanceResponseBlock nextBlock = super.getNextBlock();
- DataTable instanceResponseDataTable =
nextBlock.getInstanceResponseDataTable();
- DataTable metadataOnlyDataTable;
+ InstanceResponseBlock responseBlock = super.getNextBlock();
+ InstanceResponseBlock metadataOnlyResponseBlock =
responseBlock.toMetadataOnlyResponseBlock();
try {
- metadataOnlyDataTable =
instanceResponseDataTable.toMetadataOnlyDataTable();
-
_streamObserver.onNext(StreamingResponseUtils.getDataResponse(instanceResponseDataTable.toDataOnlyDataTable()));
+
_streamObserver.onNext(StreamingResponseUtils.getDataResponse(responseBlock.toDataOnlyDataTable()));
} catch (IOException e) {
- // when exception occurs in streaming, we return an error-only metadata
block.
- metadataOnlyDataTable = DataTableBuilderUtils.getEmptyDataTable();
-
metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
+ metadataOnlyResponseBlock.addException(
+
QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR, e));
}
// return a metadata-only block.
- return new InstanceResponseBlock(metadataOnlyDataTable);
+ return metadataOnlyResponseBlock;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
index d2bff6ab65..94cccc45ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
@@ -19,7 +19,6 @@
package org.apache.pinot.core.plan;
import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.core.operator.InstanceResponseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.slf4j.Logger;
@@ -46,7 +45,7 @@ public class GlobalPlanImplV0 implements Plan {
}
@Override
- public DataTable execute()
+ public InstanceResponseBlock execute()
throws TimeoutException {
long startTime = System.currentTimeMillis();
InstanceResponseOperator instanceResponseOperator =
_instanceResponsePlanNode.run();
@@ -58,6 +57,6 @@ public class GlobalPlanImplV0 implements Plan {
InstanceResponseBlock instanceResponseBlock =
instanceResponseOperator.nextBlock();
long endTime2 = System.currentTimeMillis();
LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 -
endTime1);
- return instanceResponseBlock.getInstanceResponseDataTable();
+ return instanceResponseBlock;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
index 567dce0f6a..06432da734 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.plan;
import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.spi.annotations.InterfaceAudience;
@@ -33,6 +33,6 @@ public interface Plan {
PlanNode getPlanNode();
/** Execute the query plan and get the instance response. */
- DataTable execute()
+ InstanceResponseBlock execute()
throws TimeoutException;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
index 6cd2b09846..25b20ec5b9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
@@ -22,7 +22,7 @@ import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.operator.InstanceResponseOperator;
-import org.apache.pinot.core.operator.StreamingInstanceResponseOperator;
+import
org.apache.pinot.core.operator.streaming.StreamingInstanceResponseOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
index 4712d7f911..b7ac472743 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
@@ -24,9 +24,11 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -55,7 +57,10 @@ public interface QueryExecutor {
/**
* Processes the non-streaming query with the given executor service.
+ *
+ * Deprecated: use execute() instead.
*/
+ @Deprecated
default DataTable processQuery(ServerQueryRequest queryRequest,
ExecutorService executorService) {
return processQuery(queryRequest, executorService, null);
}
@@ -64,14 +69,48 @@ public interface QueryExecutor {
* Processes the query (streaming or non-streaming) with the given executor
service.
* <ul>
* <li>
- * For streaming request, the returned DataTable contains only the
metadata. The response is streamed back via the
- * observer.
+ * For streaming request, the returned {@link DataTable} contains only
the metadata. The response is streamed back
+ * via the observer.
* </li>
* <li>
- * For non-streaming request, the returned DataTable contains both data
and metadata.
+ * For non-streaming request, the returned {@link DataTable} contains
both data and metadata.
* </li>
* </ul>
+ *
+ * Deprecated: use execute() instead.
*/
- DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService
executorService,
+ @Deprecated
+ default DataTable processQuery(ServerQueryRequest queryRequest,
ExecutorService executorService,
+ @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
+ InstanceResponseBlock instanceResponse = execute(queryRequest,
executorService, responseObserver);
+ try {
+ return instanceResponse.toDataTable();
+ } catch (Exception e) {
+ DataTable metadataOnlyDataTable =
instanceResponse.toMetadataOnlyDataTable();
+
metadataOnlyDataTable.addException(QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR,
e));
+ return metadataOnlyDataTable;
+ }
+ }
+
+ /**
+ * Executes the non-streaming query with the given executor service.
+ */
+ default InstanceResponseBlock execute(ServerQueryRequest queryRequest,
ExecutorService executorService) {
+ return execute(queryRequest, executorService, null);
+ }
+
+ /**
+ * Executes the query (streaming or non-streaming) with the given executor
service.
+ * <ul>
+ * <li>
+ * For streaming request, the returned {@link InstanceResponseBlock}
contains only the metadata. The response is
+ * streamed back via the observer.
+ * </li>
+ * <li>
+ * For non-streaming request, the returned {@link InstanceResponseBlock}
contains both data and metadata.
+ * </li>
+ * </ul>
+ */
+ InstanceResponseBlock execute(ServerQueryRequest queryRequest,
ExecutorService executorService,
@Nullable StreamObserver<Server.ServerResponse> responseObserver);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 075882448e..bf0838d4d1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -20,10 +20,7 @@ package org.apache.pinot.core.query.executor;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -34,7 +31,6 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.TransformFunctionType;
@@ -45,16 +41,16 @@ import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
-import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ExplainPlanRowData;
import org.apache.pinot.core.common.ExplainPlanRows;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExplainResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.plan.Plan;
@@ -68,6 +64,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -132,20 +129,20 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
@Override
- public DataTable processQuery(ServerQueryRequest queryRequest,
ExecutorService executorService,
+ public InstanceResponseBlock execute(ServerQueryRequest queryRequest,
ExecutorService executorService,
@Nullable StreamObserver<Server.ServerResponse> responseObserver) {
if (!queryRequest.isEnableTrace()) {
- return processQueryInternal(queryRequest, executorService,
responseObserver);
+ return executeInternal(queryRequest, executorService, responseObserver);
}
try {
Tracing.getTracer().register(queryRequest.getRequestId());
- return processQueryInternal(queryRequest, executorService,
responseObserver);
+ return executeInternal(queryRequest, executorService, responseObserver);
} finally {
Tracing.getTracer().unregister();
}
}
- private DataTable processQueryInternal(ServerQueryRequest queryRequest,
ExecutorService executorService,
+ private InstanceResponseBlock executeInternal(ServerQueryRequest
queryRequest, ExecutorService executorService,
@Nullable StreamObserver<Server.ServerResponse> responseObserver) {
TimerContext timerContext = queryRequest.getTimerContext();
TimerContext.Timer schedulerWaitTimer =
timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
@@ -178,20 +175,22 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
String errorMessage =
String.format("Query scheduling took %dms (longer than query timeout
of %dms) on server: %s",
querySchedulingTimeMs, queryTimeoutMs,
_instanceDataManager.getInstanceId());
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
+ InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+ instanceResponse.addException(
+
QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
- return dataTable;
+ return instanceResponse;
}
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
String errorMessage = String.format("Failed to find table: %s on server:
%s", tableNameWithType,
_instanceDataManager.getInstanceId());
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
+ InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+ instanceResponse.addException(
+
QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
- return dataTable;
+ return instanceResponse;
}
List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
@@ -245,17 +244,17 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
}
- DataTable dataTable = null;
+ InstanceResponseBlock instanceResponse = null;
try {
- dataTable = processQuery(indexSegments, queryContext, timerContext,
executorService, responseObserver,
+ instanceResponse = executeInternal(indexSegments, queryContext,
timerContext, executorService, responseObserver,
queryRequest.isEnableStreaming());
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
- dataTable = DataTableBuilderUtils.getEmptyDataTable();
+ instanceResponse = new InstanceResponseBlock();
// Do not log verbose error for BadQueryRequestException and
QueryCancelledException.
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught BadQueryRequestException while processing
requestId: {}, {}", requestId, e.getMessage());
-
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
+
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
} else if (e instanceof QueryCancelledException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId,
e);
@@ -265,28 +264,27 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
// NOTE most likely the onFailure() callback registered on query
future in InstanceRequestHandler would
// return the error table to broker sooner than here. But in case of
race condition, we construct the error
// table here too.
-
dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
+
instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
"Query cancelled on: " + _instanceDataManager.getInstanceId()));
} else {
LOGGER.error("Exception processing requestId {}", requestId, e);
-
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
+
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
tableDataManager.releaseSegment(segmentDataManager);
}
if (queryRequest.isEnableTrace()) {
- if (TraceContext.traceEnabled() && dataTable != null) {
- dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(),
TraceContext.getTraceInfo());
+ if (TraceContext.traceEnabled() && instanceResponse != null) {
+ instanceResponse.addMetadata(MetadataKey.TRACE_INFO.getName(),
TraceContext.getTraceInfo());
}
}
}
queryProcessingTimer.stopAndRecord();
long queryProcessingTime = queryProcessingTimer.getDurationMs();
- Map<String, String> metadata = dataTable.getMetadata();
- metadata.put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(),
Integer.toString(numSegmentsAcquired));
- metadata.put(MetadataKey.TIME_USED_MS.getName(),
Long.toString(queryProcessingTime));
+ instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_QUERIED.getName(),
Integer.toString(numSegmentsAcquired));
+ instanceResponse.addMetadata(MetadataKey.TIME_USED_MS.getName(),
Long.toString(queryProcessingTime));
// When segment is removed from the IdealState:
// 1. Controller schedules a state transition to server to turn segment
OFFLINE
@@ -302,7 +300,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
.collect(Collectors.toList());
int numMissingSegments = missingSegments.size();
if (numMissingSegments > 0) {
-
dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
+
instanceResponse.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
String.format("%d segments %s missing on server: %s",
numMissingSegments, missingSegments,
_instanceDataManager.getInstanceId())));
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
@@ -310,31 +308,32 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
if (tableDataManager instanceof RealtimeTableDataManager) {
- long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ long minConsumingFreshnessTimeMs;
if (numConsumingSegmentsQueried > 0) {
minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ?
minIngestionTimeMs : minIndexTimeMs;
- metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
+
instanceResponse.addMetadata(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
Integer.toString(numConsumingSegmentsQueried));
- metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(minConsumingFreshnessTimeMs));
+
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
+ Long.toString(minConsumingFreshnessTimeMs));
LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
} else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs !=
Long.MIN_VALUE) {
minConsumingFreshnessTimeMs = maxEndTimeMs;
- metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(maxEndTimeMs));
+
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
+ Long.toString(maxEndTimeMs));
LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
}
}
LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
- LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
- return dataTable;
+ return instanceResponse;
}
// NOTE: This method might change indexSegments. Do not use it after calling
this method.
- private DataTable processQuery(List<IndexSegment> indexSegments,
QueryContext queryContext, TimerContext timerContext,
- ExecutorService executorService, @Nullable
StreamObserver<Server.ServerResponse> responseObserver,
- boolean enableStreaming)
+ private InstanceResponseBlock executeInternal(List<IndexSegment>
indexSegments, QueryContext queryContext,
+ TimerContext timerContext, ExecutorService executorService,
+ @Nullable StreamObserver<Server.ServerResponse> responseObserver,
boolean enableStreaming)
throws Exception {
handleSubquery(queryContext, indexSegments, timerContext, executorService);
@@ -345,33 +344,20 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
- int totalSegments = indexSegments.size();
+ int numTotalSegments = indexSegments.size();
SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
List<IndexSegment> selectedSegments =
_segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
segmentPruneTimer.stopAndRecord();
int numSelectedSegments = selectedSegments.size();
LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+ InstanceResponseBlock instanceResponse;
if (numSelectedSegments == 0) {
- // Only return metadata for streaming query
- DataTable dataTable;
if (queryContext.isExplain()) {
- dataTable = getExplainPlanResultsForNoMatchingSegment(totalSegments);
+ instanceResponse =
getExplainResponseForNoMatchingSegment(numTotalSegments, queryContext);
} else {
- dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+ instanceResponse =
+ new
InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext),
queryContext);
}
-
- Map<String, String> metadata = dataTable.getMetadata();
- metadata.put(MetadataKey.TOTAL_DOCS.getName(),
String.valueOf(numTotalDocs));
- metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), "0");
- metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0");
- metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0");
- metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
- metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
- metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
"0");
- metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0");
- metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
String.valueOf(totalSegments));
- addPrunerStats(metadata, prunerStats);
- return dataTable;
} else {
TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
Plan queryPlan =
@@ -381,40 +367,28 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
- DataTable dataTable = queryContext.isExplain() ?
processExplainPlanQueries(queryPlan) : queryPlan.execute();
+ instanceResponse = queryContext.isExplain() ?
executeExplainQuery(queryPlan, queryContext) : queryPlan.execute();
planExecTimer.stopAndRecord();
+ }
- Map<String, String> metadata = dataTable.getMetadata();
- // Update the total docs in the metadata based on the un-pruned segments
- metadata.put(MetadataKey.TOTAL_DOCS.getName(),
Long.toString(numTotalDocs));
+ // Update the total docs in the metadata based on the un-pruned segments
+ instanceResponse.addMetadata(MetadataKey.TOTAL_DOCS.getName(),
Long.toString(numTotalDocs));
- // Set the number of pruned segments. This count does not include the
segments which returned empty filters
- int prunedSegments = totalSegments - numSelectedSegments;
- metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
String.valueOf(prunedSegments));
- addPrunerStats(metadata, prunerStats);
+ // Set the number of pruned segments. This count does not include the
segments which returned empty filters
+ int prunedSegments = numTotalSegments - numSelectedSegments;
+
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
String.valueOf(prunedSegments));
+ addPrunerStats(instanceResponse, prunerStats);
- return dataTable;
- }
+ return instanceResponse;
}
- /** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get
selected for query execution.*/
- private static DataTable getExplainPlanResultsForNoMatchingSegment(int
totalNumSegments) {
- DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
- try {
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0,
String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments));
- dataTableBuilder.setColumn(1, ExplainPlanRows.PLAN_START_IDS);
- dataTableBuilder.setColumn(2, ExplainPlanRows.PLAN_START_IDS);
- dataTableBuilder.finishRow();
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0,
ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER);
- dataTableBuilder.setColumn(1, 2);
- dataTableBuilder.setColumn(2, 1);
- dataTableBuilder.finishRow();
- } catch (IOException ioe) {
- LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe);
- }
- return dataTableBuilder.build();
+ private static InstanceResponseBlock
getExplainResponseForNoMatchingSegment(int numTotalSegments,
+ QueryContext queryContext) {
+ ExplainResultsBlock explainResults = new ExplainResultsBlock();
+
explainResults.addOperator(String.format(ExplainPlanRows.PLAN_START_FORMAT,
numTotalSegments),
+ ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
+ explainResults.addOperator(ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER,
2, 1);
+ return new InstanceResponseBlock(explainResults, queryContext);
}
/**
@@ -502,9 +476,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
}
- /** @return EXPLAIN PLAN query result {@link DataTable}. */
- public static DataTable processExplainPlanQueries(Plan queryPlan) {
- DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+ public static InstanceResponseBlock executeExplainQuery(Plan queryPlan,
QueryContext queryContext) {
+ ExplainResultsBlock explainResults = new ExplainResultsBlock();
List<Operator> childOperators =
queryPlan.getPlanNode().run().getChildOperators();
assert childOperators.size() == 1;
Operator root = childOperators.get(0);
@@ -512,55 +485,35 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
int numEmptyFilterSegments = 0;
int numMatchAllFilterSegments = 0;
- try {
- // Get the list of unique explain plans
- operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root);
- List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>();
- operatorDepthToRowDataMap.forEach((key, value) ->
listOfExplainPlans.addAll(value));
-
- // Setup the combine root's explain string
- setValueInDataTableBuilder(dataTableBuilder, root.toExplainString(), 2,
1);
-
- // Walk through all the explain plans and create the entries in the
explain plan output for each plan
- for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
- numEmptyFilterSegments +=
- explainPlanRows.isHasEmptyFilter() ?
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
- numMatchAllFilterSegments +=
- explainPlanRows.isHasMatchAllFilter() ?
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
- setValueInDataTableBuilder(dataTableBuilder,
- String.format(ExplainPlanRows.PLAN_START_FORMAT,
explainPlanRows.getNumSegmentsMatchingThisPlan()),
- ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
- for (ExplainPlanRowData explainPlanRowData :
explainPlanRows.getExplainPlanRowData()) {
- setValueInDataTableBuilder(dataTableBuilder,
explainPlanRowData.getExplainPlanString(),
- explainPlanRowData.getOperatorId(),
explainPlanRowData.getParentId());
- }
+ // Get the list of unique explain plans
+ operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root);
+ List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>();
+ operatorDepthToRowDataMap.forEach((key, value) ->
listOfExplainPlans.addAll(value));
+
+ // Setup the combine root's explain string
+ explainResults.addOperator(root.toExplainString(), 2, 1);
+
+ // Walk through all the explain plans and create the entries in the
explain plan output for each plan
+ for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
+ numEmptyFilterSegments +=
+ explainPlanRows.isHasEmptyFilter() ?
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+ numMatchAllFilterSegments +=
+ explainPlanRows.isHasMatchAllFilter() ?
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+ explainResults.addOperator(
+ String.format(ExplainPlanRows.PLAN_START_FORMAT,
explainPlanRows.getNumSegmentsMatchingThisPlan()),
+ ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
+ for (ExplainPlanRowData explainPlanRowData :
explainPlanRows.getExplainPlanRowData()) {
+ explainResults.addOperator(explainPlanRowData.getExplainPlanString(),
explainPlanRowData.getOperatorId(),
+ explainPlanRowData.getParentId());
}
- } catch (IOException ioe) {
- LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe);
}
- DataTable dataTable = dataTableBuilder.build();
- dataTable.getMetadata()
- .put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
String.valueOf(numEmptyFilterSegments));
-
dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
+ InstanceResponseBlock instanceResponse = new
InstanceResponseBlock(explainResults, queryContext);
+
instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
+ String.valueOf(numEmptyFilterSegments));
+
instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
String.valueOf(numMatchAllFilterSegments));
- return dataTable;
- }
-
- /**
- * Set the value for the explain plan fields in the DataTableBuilder
- */
- private static void setValueInDataTableBuilder(DataTableBuilder
dataTableBuilder, String explainPlanString,
- int operatorId, int parentId)
- throws IOException {
- if (explainPlanString != null) {
- // Only those operators that return a non-null description will be added
to the EXPLAIN PLAN output.
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0, explainPlanString);
- dataTableBuilder.setColumn(1, operatorId);
- dataTableBuilder.setColumn(2, parentId);
- dataTableBuilder.finishRow();
- }
+ return instanceResponse;
}
/**
@@ -626,16 +579,19 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
// Execute the subquery
subquery.setEndTimeMs(endTimeMs);
// Make a clone of indexSegments because the method might modify the list
- DataTable dataTable =
- processQuery(new ArrayList<>(indexSegments), subquery, timerContext,
executorService, null, false);
- DataTable.CustomObject idSet = dataTable.getCustomObject(0, 0);
- Preconditions.checkState(idSet != null && idSet.getType() ==
ObjectSerDeUtils.ObjectType.IdSet.getValue(),
- "Result is not an IdSet");
- String serializedIdSet =
- new String(Base64.getEncoder().encode(idSet.getBuffer()).array(),
StandardCharsets.ISO_8859_1);
+ InstanceResponseBlock instanceResponse =
+ executeInternal(new ArrayList<>(indexSegments), subquery,
timerContext, executorService, null, false);
+ BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock();
+ Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock,
+ "Got unexpected results block type: %s, expecting aggregation
results",
+ resultsBlock != null ? resultsBlock.getClass().getSimpleName() :
null);
+ Object result = ((AggregationResultsBlock)
resultsBlock).getResults().get(0);
+ Preconditions.checkState(result instanceof IdSet, "Got unexpected result
type: %s, expecting IdSet",
+ result != null ? result.getClass().getSimpleName() : null);
// Rewrite the expression
function.setFunctionName(TransformFunctionType.INIDSET.name());
- arguments.set(1,
ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING,
serializedIdSet));
+ arguments.set(1,
+ ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING,
((IdSet) result).toBase64String()));
} else {
for (ExpressionContext argument : arguments) {
handleSubquery(argument, indexSegments, timerContext, executorService,
endTimeMs);
@@ -643,9 +599,12 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
}
- private void addPrunerStats(Map<String, String> metadata,
SegmentPrunerStatistics prunerStats) {
- metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
String.valueOf(prunerStats.getInvalidSegments()));
- metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
String.valueOf(prunerStats.getLimitPruned()));
- metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
String.valueOf(prunerStats.getValuePruned()));
+ private void addPrunerStats(InstanceResponseBlock instanceResponse,
SegmentPrunerStatistics prunerStats) {
+
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
+ String.valueOf(prunerStats.getInvalidSegments()));
+
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
+ String.valueOf(prunerStats.getLimitPruned()));
+
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
+ String.valueOf(prunerStats.getValuePruned()));
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 0ce0046a99..d1d2b4ec23 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
@@ -145,59 +144,58 @@ public abstract class QueryScheduler {
@Nullable
protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest,
ExecutorService executorService) {
_latestQueryTime.accumulate(System.currentTimeMillis());
- DataTable dataTable;
+ InstanceResponseBlock instanceResponse;
try {
- dataTable = _queryExecutor.processQuery(queryRequest, executorService);
+ instanceResponse = _queryExecutor.execute(queryRequest, executorService);
} catch (Exception e) {
LOGGER.error("Encountered exception while processing requestId {} from
broker {}", queryRequest.getRequestId(),
queryRequest.getBrokerId(), e);
// For not handled exceptions
_serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
- dataTable = DataTableBuilderUtils.getEmptyDataTable();
-
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
+ instanceResponse = new InstanceResponseBlock();
+
instanceResponse.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
}
long requestId = queryRequest.getRequestId();
- Map<String, String> dataTableMetadata = dataTable.getMetadata();
- dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ Map<String, String> responseMetadata =
instanceResponse.getResponseMetadata();
+ responseMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
- byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
+ byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
// Log the statistics
String tableNameWithType = queryRequest.getTableNameWithType();
long numDocsScanned =
-
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
INVALID_NUM_SCANNED));
+
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
INVALID_NUM_SCANNED));
long numEntriesScannedInFilter = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
INVALID_NUM_SCANNED));
+
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
INVALID_NUM_SCANNED));
long numEntriesScannedPostFilter = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
INVALID_NUM_SCANNED));
+
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
INVALID_NUM_SCANNED));
long numSegmentsProcessed = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsMatched = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedInvalid = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedByLimit = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedByValue = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsConsuming = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsProcessed = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsMatched = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
+
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs = Long.parseLong(
-
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
INVALID_FRESHNESS_MS));
+
responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
INVALID_FRESHNESS_MS));
int numResizes =
-
Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
INVALID_NUM_RESIZES));
+
Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
INVALID_NUM_RESIZES));
long resizeTimeMs =
-
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
INVALID_RESIZE_TIME_MS));
- long threadCpuTimeNs =
-
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
"0"));
+
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
INVALID_RESIZE_TIME_MS));
+ long threadCpuTimeNs =
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
"0"));
long systemActivitiesCpuTimeNs =
-
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
"0"));
+
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
"0"));
long responseSerializationCpuTimeNs =
-
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
"0"));
+
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
"0"));
long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs +
responseSerializationCpuTimeNs;
if (numDocsScanned > 0) {
@@ -311,20 +309,20 @@ public abstract class QueryScheduler {
}
/**
- * Serialize the DataTable response for query request
+ * Serialize the instance response for query request
* @param queryRequest Server query request for which response is serialized
- * @param dataTable DataTable to serialize
+ * @param instanceResponse instance response to serialize
* @return serialized response bytes
*/
@Nullable
- private byte[] serializeDataTable(ServerQueryRequest queryRequest, DataTable
dataTable) {
+ private byte[] serializeResponse(ServerQueryRequest queryRequest,
InstanceResponseBlock instanceResponse) {
TimerContext timerContext = queryRequest.getTimerContext();
TimerContext.Timer responseSerializationTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.RESPONSE_SERIALIZATION);
byte[] responseByte = null;
try {
- responseByte = dataTable.toBytes();
+ responseByte = instanceResponse.toDataTable().toBytes();
} catch (Exception e) {
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS,
1);
LOGGER.error("Caught exception while serializing response for requestId:
{}, brokerId: {}",
@@ -344,12 +342,9 @@ public abstract class QueryScheduler {
*/
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest
queryRequest,
ProcessingException error) {
- DataTable result = DataTableBuilderUtils.getEmptyDataTable();
-
- Map<String, String> dataTableMetadata = result.getMetadata();
- dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(queryRequest.getRequestId()));
-
- result.addException(error);
- return Futures.immediateFuture(serializeDataTable(queryRequest, result));
+ InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+ instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(),
Long.toString(queryRequest.getRequestId()));
+ instanceResponse.addException(error);
+ return Futures.immediateFuture(serializeResponse(queryRequest,
instanceResponse));
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 71dbe1127f..649e377942 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.selection;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
@@ -231,11 +232,11 @@ public class SelectionOperatorUtils {
* @param dataSchema data schema.
* @param nullHandlingEnabled whether null handling is enabled.
* @return data table.
- * @throws Exception
+ * @throws IOException
*/
public static DataTable getDataTableFromRows(Collection<Object[]> rows,
DataSchema dataSchema,
boolean nullHandlingEnabled)
- throws Exception {
+ throws IOException {
ColumnDataType[] storedColumnDataTypes =
dataSchema.getStoredColumnDataTypes();
int numColumns = storedColumnDataTypes.length;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 886dd68446..9b5031c46b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -46,7 +46,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.server.access.AccessControl;
@@ -155,7 +155,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
long requestId = instanceRequest != null ?
instanceRequest.getRequestId() : 0;
LOGGER.error("Exception while processing instance request: {}",
hexString, e);
sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs,
- DataTableBuilderUtils.getEmptyDataTable(), e);
+ DataTableBuilderFactory.getEmptyDataTable(), e);
}
}
@@ -175,9 +175,9 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
}
_queryFuturesById.put(queryId, future);
}
- Futures
- .addCallback(future, createCallback(ctx, tableNameWithType,
queryArrivalTimeMs, instanceRequest, queryRequest),
- MoreExecutors.directExecutor());
+ Futures.addCallback(future,
+ createCallback(ctx, tableNameWithType, queryArrivalTimeMs,
instanceRequest, queryRequest),
+ MoreExecutors.directExecutor());
}
private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx,
String tableNameWithType,
@@ -198,7 +198,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
} else {
// Send exception response.
sendErrorResponse(ctx, queryRequest.getRequestId(),
tableNameWithType, queryArrivalTimeMs,
- DataTableBuilderUtils.getEmptyDataTable(), new Exception("Null
query response."));
+ DataTableBuilderFactory.getEmptyDataTable(), new Exception("Null
query response."));
}
}
@@ -225,7 +225,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
e = new Exception(t);
}
sendErrorResponse(ctx, instanceRequest.getRequestId(),
tableNameWithType, queryArrivalTimeMs,
- DataTableBuilderUtils.getEmptyDataTable(), e);
+ DataTableBuilderFactory.getEmptyDataTable(), e);
}
};
}
@@ -236,7 +236,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
// will only be called if for some remote reason we are unable to handle
exceptions in channelRead0.
String message = "Unhandled Exception in " + getClass().getCanonicalName();
LOGGER.error(message, cause);
- sendErrorResponse(ctx, 0, null, System.currentTimeMillis(),
DataTableBuilderUtils.getEmptyDataTable(),
+ sendErrorResponse(ctx, 0, null, System.currentTimeMillis(),
DataTableBuilderFactory.getEmptyDataTable(),
new Exception(message, cause));
}
@@ -282,8 +282,8 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
if (cancelled) {
- dataTable.addException(QueryException
- .getException(QueryException.QUERY_CANCELLATION_ERROR, "Query
cancelled on: " + _instanceName));
+
dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
+ "Query cancelled on: " + _instanceName));
} else {
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 579f0691a5..f92a5e23f0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server.ServerRequest;
import org.apache.pinot.common.proto.Server.ServerResponse;
import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -68,8 +69,7 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
if (tlsConfig != null) {
try {
_server =
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
- .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
- .addService(this).build();
+
.maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this).build();
} catch (Exception e) {
throw new RuntimeException("Failed to start secure grpcQueryServer",
e);
}
@@ -144,9 +144,9 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
}
// Process the query
- DataTable dataTable;
+ InstanceResponseBlock instanceResponse;
try {
- dataTable = _queryExecutor.processQuery(queryRequest, _executorService,
responseObserver);
+ instanceResponse = _queryExecutor.execute(queryRequest,
_executorService, responseObserver);
} catch (Exception e) {
LOGGER.error("Caught exception while processing request {}: {} from
broker: {}", queryRequest.getRequestId(),
queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
@@ -155,18 +155,19 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
return;
}
- ServerResponse response;
+ ServerResponse serverResponse;
try {
- response = queryRequest.isEnableStreaming() ?
StreamingResponseUtils.getMetadataResponse(dataTable)
+ DataTable dataTable = instanceResponse.toDataTable();
+ serverResponse = queryRequest.isEnableStreaming() ?
StreamingResponseUtils.getMetadataResponse(dataTable)
: StreamingResponseUtils.getNonStreamingResponse(dataTable);
} catch (Exception e) {
- LOGGER.error("Caught exception while constructing response from data
table for request {}: {} from broker: {}",
+ LOGGER.error("Caught exception while serializing response for request
{}: {} from broker: {}",
queryRequest.getRequestId(), queryRequest.getQueryContext(),
queryRequest.getBrokerId(), e);
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS,
1);
responseObserver.onError(Status.INTERNAL.withCause(e).asException());
return;
}
- responseObserver.onNext(response);
+ responseObserver.onNext(serverResponse);
responseObserver.onCompleted();
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 86f28d1f6b..0040e842a8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -108,7 +108,7 @@ public class DataTableSerDeTest {
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
exception);
String expected = processingException.getMessage();
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.addException(processingException);
DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
Assert.assertNull(newDataTable.getDataSchema());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
similarity index 70%
rename from
pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
index 7ef912bd6d..75fb625a94 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.core.operator.blocks.results;
import java.io.IOException;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -32,27 +31,28 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-public class DataTableBuilderUtilsTest {
+public class ResultsBlockUtilsTest {
@Test
- public void testBuildEmptyDataTable()
+ public void testBuildEmptyQueryResults()
throws IOException {
// Selection
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable WHERE foo =
'bar'");
- DataTable dataTable =
DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+ DataTable dataTable =
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
DataSchema dataSchema = dataTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"*"});
- assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING});
+ assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
assertEquals(dataTable.getNumberOfRows(), 0);
// Aggregation
queryContext =
QueryContextConverterUtils.getQueryContext("SELECT COUNT(*), SUM(a),
MAX(b) FROM testTable WHERE foo = 'bar'");
- dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+ dataTable =
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
dataSchema = dataTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"count_star",
"sum_a", "max_b"});
- assertEquals(dataSchema.getColumnDataTypes(),
- new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE});
+ assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
DataSchema.ColumnDataType.DOUBLE
+ });
assertEquals(dataTable.getNumberOfRows(), 1);
assertEquals(dataTable.getLong(0, 0), 0L);
assertEquals(dataTable.getDouble(0, 1), 0.0);
@@ -61,20 +61,21 @@ public class DataTableBuilderUtilsTest {
// Group-by
queryContext = QueryContextConverterUtils.getQueryContext(
"SELECT c, d, COUNT(*), SUM(a), MAX(b) FROM testTable WHERE foo =
'bar' GROUP BY c, d");
- dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+ dataTable =
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
dataSchema = dataTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"c", "d",
"count(*)", "sum(a)", "max(b)"});
- assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{
- ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.LONG,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.LONG,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
});
assertEquals(dataTable.getNumberOfRows(), 0);
// Distinct
queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT
a, b FROM testTable WHERE foo = 'bar'");
- dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+ dataTable =
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
dataSchema = dataTable.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
- assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.OBJECT});
+ assertEquals(dataSchema.getColumnDataTypes(), new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.OBJECT});
assertEquals(dataTable.getNumberOfRows(), 1);
DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
assertNotNull(customObject);
@@ -82,6 +83,6 @@ public class DataTableBuilderUtilsTest {
assertEquals(distinctTable.size(), 0);
assertEquals(distinctTable.getDataSchema().getColumnNames(), new
String[]{"a", "b"});
assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),
- new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index d9cda7ad6d..1b3f0fa7ad 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -31,12 +31,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -168,7 +168,7 @@ public class QueryExecutorExceptionsTest {
String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
Map<Integer, String> exceptions = instanceResponse.getExceptions();
assertTrue(exceptions.containsKey(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index a62ec50dbb..4ccef3a050 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -28,11 +28,12 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -61,6 +62,8 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
public class QueryExecutorTest {
@@ -84,7 +87,7 @@ public class QueryExecutorTest {
throws Exception {
// Set up the segments
FileUtils.deleteQuietly(INDEX_DIR);
- Assert.assertTrue(INDEX_DIR.mkdirs());
+ assertTrue(INDEX_DIR.mkdirs());
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
Assert.assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
@@ -160,8 +163,9 @@ public class QueryExecutorTest {
String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 400002L);
}
@Test
@@ -169,8 +173,9 @@ public class QueryExecutorTest {
String query = "SELECT SUM(met) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getDouble(0, 0), 40000200000.0);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 40000200000.0);
}
@Test
@@ -178,8 +183,9 @@ public class QueryExecutorTest {
String query = "SELECT MAX(met) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getDouble(0, 0), 200000.0);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 200000.0);
}
@Test
@@ -187,8 +193,9 @@ public class QueryExecutorTest {
String query = "SELECT MIN(met) FROM " + TABLE_NAME;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 0.0);
}
@AfterClass
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 527744b795..01e5920aa8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -44,8 +44,8 @@ import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import
org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
@@ -297,7 +297,7 @@ public class PrioritySchedulerTest {
}
@Override
- public DataTable processQuery(ServerQueryRequest queryRequest,
ExecutorService executorService,
+ public InstanceResponseBlock execute(ServerQueryRequest queryRequest,
ExecutorService executorService,
@Nullable StreamObserver<Server.ServerResponse> responseObserver) {
if (_useBarrier) {
try {
@@ -306,8 +306,8 @@ public class PrioritySchedulerTest {
throw new RuntimeException(e);
}
}
- DataTable result = DataTableBuilderUtils.getEmptyDataTable();
- result.getMetadata().put(MetadataKey.TABLE.getName(),
queryRequest.getTableNameWithType());
+ InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+ instanceResponse.addMetadata(MetadataKey.TABLE.getName(),
queryRequest.getTableNameWithType());
if (_useBarrier) {
try {
_validationBarrier.await();
@@ -316,7 +316,7 @@ public class PrioritySchedulerTest {
}
}
_numQueries.countDown();
- return result;
+ return instanceResponse;
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index a6deca642e..9fdaf7fb7d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,7 +27,7 @@ import
org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.server.access.AccessControl;
@@ -85,7 +85,7 @@ public class QueryRoutingTest {
public void testValidResponse()
throws Exception {
long requestId = 123;
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -163,7 +163,7 @@ public class QueryRoutingTest {
public void testNonMatchingRequestId()
throws Exception {
long requestId = 123;
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -196,7 +196,7 @@ public class QueryRoutingTest {
// To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it
can take up to
// 1400 msec to mark request as failed.
long timeoutMs = 2000L;
- DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index d449ec5f8b..e44d40d70f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
@@ -197,10 +198,10 @@ public abstract class BaseQueriesTest {
// Server side
serverQueryContext.setEndTimeMs(System.currentTimeMillis() +
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
Plan plan = planMaker.makeInstancePlan(getIndexSegments(),
serverQueryContext, EXECUTOR_SERVICE, null);
- DataTable instanceResponse;
+ InstanceResponseBlock instanceResponse;
try {
instanceResponse =
- queryContext.isExplain() ?
ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
+ queryContext.isExplain() ?
ServerQueryExecutorV1Impl.executeExplainQuery(plan, queryContext) :
plan.execute();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
@@ -212,7 +213,7 @@ public abstract class BaseQueriesTest {
Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
try {
// For multi-threaded BrokerReduceService, we cannot reuse the same
data-table
- byte[] serializedResponse = instanceResponse.toBytes();
+ byte[] serializedResponse = instanceResponse.toDataTable().toBytes();
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE),
DataTableFactory.getDataTable(serializedResponse));
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME),
@@ -270,17 +271,17 @@ public abstract class BaseQueriesTest {
Plan plan1 = planMaker.makeInstancePlan(instances.get(0),
serverQueryContext, EXECUTOR_SERVICE, null);
Plan plan2 = planMaker.makeInstancePlan(instances.get(1),
serverQueryContext, EXECUTOR_SERVICE, null);
- DataTable instanceResponse1;
+ InstanceResponseBlock instanceResponse1;
try {
- instanceResponse1 =
- queryContext.isExplain() ?
ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute();
+ instanceResponse1 = queryContext.isExplain() ?
ServerQueryExecutorV1Impl.executeExplainQuery(plan1, queryContext)
+ : plan1.execute();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
- DataTable instanceResponse2;
+ InstanceResponseBlock instanceResponse2;
try {
- instanceResponse2 =
- queryContext.isExplain() ?
ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute();
+ instanceResponse2 = queryContext.isExplain() ?
ServerQueryExecutorV1Impl.executeExplainQuery(plan2, queryContext)
+ : plan2.execute();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
@@ -292,8 +293,8 @@ public abstract class BaseQueriesTest {
Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
try {
// For multi-threaded BrokerReduceService, we cannot reuse the same
data-table
- byte[] serializedResponse1 = instanceResponse1.toBytes();
- byte[] serializedResponse2 = instanceResponse2.toBytes();
+ byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes();
+ byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes();
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE),
DataTableFactory.getDataTable(serializedResponse1));
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME),
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index ecd0745ec9..b4f8aafa1e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ExplainPlanRows;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
@@ -316,11 +317,11 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
InstanceRequest instanceRequest1 = new InstanceRequest(0L, brokerRequest);
instanceRequest1.setSearchSegments(indexSegmentsForServer1);
- DataTable instanceResponse1 =
_queryExecutor.processQuery(getQueryRequest(instanceRequest1), QUERY_RUNNERS);
+ InstanceResponseBlock instanceResponse1 =
_queryExecutor.execute(getQueryRequest(instanceRequest1), QUERY_RUNNERS);
InstanceRequest instanceRequest2 = new InstanceRequest(0L, brokerRequest);
instanceRequest2.setSearchSegments(indexSegmentsForServer2);
- DataTable instanceResponse2 =
_queryExecutor.processQuery(getQueryRequest(instanceRequest2), QUERY_RUNNERS);
+ InstanceResponseBlock instanceResponse2 =
_queryExecutor.execute(getQueryRequest(instanceRequest2), QUERY_RUNNERS);
// Broker side
// Use 2 Threads for 2 data-tables
@@ -329,10 +330,10 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
try {
// For multi-threaded BrokerReduceService, we cannot reuse the same
data-table
- byte[] serializedResponse1 = instanceResponse1.toBytes();
+ byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes();
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE),
DataTableFactory.getDataTable(serializedResponse1));
- byte[] serializedResponse2 = instanceResponse2.toBytes();
+ byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes();
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME),
DataTableFactory.getDataTable(serializedResponse2));
} catch (Exception e) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 682cfa5716..807d27a1ad 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -35,11 +35,12 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -74,6 +75,8 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.segment.local.segment.index.creator.RawIndexCreatorTest.getRandomValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
/**
@@ -107,8 +110,8 @@ public class SegmentWithNullValueVectorTest {
private static final String TABLE_NAME = "testTable";
private static final String QUERY_EXECUTOR_CONFIG_PATH =
"conf/query-executor.properties";
private static final ExecutorService QUERY_RUNNERS =
Executors.newFixedThreadPool(20);
- private int _nullIntKeyCount = 0;
- private int _longKeyCount = 0;
+ private long _nullIntKeyCount = 0;
+ private long _longKeyCount = 0;
/**
* Setup to build a segment with raw indexes (no-dictionary) of various data
types.
@@ -251,7 +254,7 @@ public class SegmentWithNullValueVectorTest {
for (int i = 0; i < NUM_ROWS; i++) {
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
String colName = fieldSpec.getName();
- Assert.assertEquals(_actualNullVectorMap.get(colName)[i],
nullValueVectorReaderMap.get(colName).isNull(i));
+ assertEquals(_actualNullVectorMap.get(colName)[i],
nullValueVectorReaderMap.get(colName).isNull(i));
}
}
}
@@ -261,8 +264,10 @@ public class SegmentWithNullValueVectorTest {
String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " +
INT_COLUMN + " IS NOT NULL";
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getLong(0, 0), NUM_ROWS -
_nullIntKeyCount);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0),
+ NUM_ROWS - _nullIntKeyCount);
}
@Test
@@ -270,8 +275,9 @@ public class SegmentWithNullValueVectorTest {
String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " +
INT_COLUMN + " IS NULL";
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getLong(0, 0), _nullIntKeyCount);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), _nullIntKeyCount);
}
@Test
@@ -281,8 +287,9 @@ public class SegmentWithNullValueVectorTest {
+ LONG_VALUE_THRESHOLD;
InstanceRequest instanceRequest = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
instanceRequest.setSearchSegments(_segmentNames);
- DataTable instanceResponse =
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
- Assert.assertEquals(instanceResponse.getLong(0, 0), _longKeyCount);
+ InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), _longKeyCount);
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index c26ea3bffe..a19658c586 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.transport.ServerInstance;
@@ -139,18 +140,17 @@ public class QueryRunner {
}
}
- private BaseDataBlock processServerQuery(ServerQueryRequest
serverQueryRequest,
- ExecutorService executorService) {
+ private BaseDataBlock processServerQuery(ServerQueryRequest
serverQueryRequest, ExecutorService executorService) {
BaseDataBlock dataBlock;
try {
- DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
- if (!dataTable.getExceptions().isEmpty()) {
+ InstanceResponseBlock instanceResponse =
_serverExecutor.execute(serverQueryRequest, executorService);
+ if (!instanceResponse.getExceptions().isEmpty()) {
// if contains exception, directly return a metadata block with the
exceptions.
- dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+ dataBlock =
DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions());
} else {
// this works because default DataTableImplV3 will have a version
number at beginning:
// the new DataBlock encodes lower 16 bits as version and upper 16
bits as type (ROW, COLUMNAR, METADATA)
- dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes()));
}
} catch (Exception e) {
dataBlock = DataBlockUtils.getErrorDataBlock(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]