This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 630f94da4c [multistage][bugfix] allow having clause in leaf-stage
return extra columns (#9947)
630f94da4c is described below
commit 630f94da4c333403ce9449f9783f550b39982787
Author: Rong Rong <[email protected]>
AuthorDate: Thu Dec 8 15:47:03 2022 -0800
[multistage][bugfix] allow having clause in leaf-stage return extra columns
(#9947)
* allow having clause in leaf-stage return extra columns
* fix error message
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/common/datablock/DataBlockUtils.java | 2 +-
.../pinot/common/exception/QueryException.java | 1 +
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../LeafStageTransferableBlockOperator.java | 32 +++++++---------------
.../LeafStageTransferableBlockOperatorTest.java | 22 ++++++++++++++-
.../src/test/resources/queries/Aggregates.json | 4 ++-
6 files changed, 37 insertions(+), 26 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 963d49ab74..8b41969e8e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -50,7 +50,7 @@ public final class DataBlockUtils {
while (t.getMessage() == null) {
t = t.getCause();
}
- return QueryException.getTruncatedStackTrace(t);
+ return t.getMessage() + "\n" + QueryException.getTruncatedStackTrace(t);
}
public static MetadataBlock getErrorDataBlock(Map<Integer, String>
exceptions) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index a4176f65fe..5b2910a300 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -174,6 +174,7 @@ public class QueryException {
return copiedProcessingException;
}
+  // TODO: getTruncatedStackTrace(Throwable) should always be preceded by t.getMessage()
public static String getTruncatedStackTrace(Throwable t) {
StringWriter stringWriter = new StringWriter();
t.printStackTrace(new PrintWriter(stringWriter));
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 968fbb7f7b..54e733e27b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -202,7 +202,7 @@ public class QueryRunner {
} catch (Exception e) {
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
- QueryException.getTruncatedStackTrace(e));
+          e.getMessage() + "\n" + QueryException.getTruncatedStackTrace(e));
return errorResponse;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 4b24b62f3a..41f8b4535e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -40,7 +39,6 @@ import
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
-import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -148,12 +146,10 @@ public class LeafStageTransferableBlockOperator extends
BaseOperator<Transferabl
@SuppressWarnings("ConstantConditions")
private static TransferableBlock
composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
DataSchema desiredDataSchema) {
- List<String> selectionColumns = Arrays.asList(
- ((DistinctAggregationFunction)
responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
- int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
- Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct
table schema for leaf stage."
- + " Expected: " + desiredDataSchema + ". Actual Columns: " +
selectionColumns
- + " Column Ordering: " + Arrays.toString(columnIndices));
+ DataSchema resultSchema = responseBlock.getDataSchema();
+
+    Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+        resultSchema.getColumnDataTypes()), "Incompatible distinct result data schema."
+        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}
@@ -168,13 +164,9 @@ public class LeafStageTransferableBlockOperator extends
BaseOperator<Transferabl
private static TransferableBlock
composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
DataSchema desiredDataSchema) {
DataSchema resultSchema = responseBlock.getDataSchema();
- // GROUP-BY column names conforms with selection expression
- List<String> selectionColumns =
responseBlock.getQueryContext().getSelectExpressions().stream()
- .map(e -> e.toString()).collect(Collectors.toList());
- int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
- Preconditions.checkState(inOrder(columnIndices), "Incompatible group by
result schema for leaf stage."
- + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema
- + " Column Ordering: " + Arrays.toString(columnIndices));
+
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+ resultSchema.getColumnDataTypes()), "Incompatible selection result
data schema: "
+ + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}
@@ -190,13 +182,9 @@ public class LeafStageTransferableBlockOperator extends
BaseOperator<Transferabl
private static TransferableBlock
composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
DataSchema desiredDataSchema) {
DataSchema resultSchema = responseBlock.getDataSchema();
- // AGG-ONLY column names are derived from AggFunction.getColumnName()
- List<String> selectionColumns =
Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
- a -> a.getColumnName()).collect(Collectors.toList());
- int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
- Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate
result schema for leaf stage."
- + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema
- + " Column Ordering: " + Arrays.toString(columnIndices));
+
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+ resultSchema.getColumnDataTypes()), "Incompatible selection result
data schema: "
+ + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 60ea7c722b..587b8c5247 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -208,7 +208,7 @@ public class LeafStageTransferableBlockOperatorTest {
// Given:
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
"SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY
strCol, intCol");
- // result schema doesn't match with DISTINCT columns using GROUP BY.
+    // result schema column ordering doesn't match the GROUP BY output; this should no longer cause an error.
DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)",
"sum(doubleCol)", "strCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING});
@@ -223,6 +223,26 @@ public class LeafStageTransferableBlockOperatorTest {
Assert.assertFalse(resultBlock.isErrorBlock());
}
+ @Test
+ public void
shouldNotErrorOutWhenQueryContextAskForGroupByOutOfOrderWithHaving() {
+ // Given:
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol, count(*), "
+ + "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING
sum(doubleCol) < 10 AND count(*) > 0");
+    // result schema contains duplicate references from the aggregate and HAVING clauses, so the column repeats.
+ DataSchema schema = new DataSchema(new String[]{"strCol", "intCol",
"count(*)", "sum(doubleCol)", "sum(doubleCol)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
DataSchema.ColumnDataType.LONG});
+ List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
+ new InstanceResponseBlock(new GroupByResultsBlock(schema,
Collections.emptyList()), queryContext));
+ LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+
+ // When:
+ TransferableBlock resultBlock = operator.nextBlock();
+
+ // Then:
+ Assert.assertFalse(resultBlock.isErrorBlock());
+ }
+
@Test
public void shouldNotErrorOutWhenDealingWithAggregationResults() {
// Given:
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 53b0fbf4f6..7e2f7e4e9c 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -255,7 +255,9 @@
{ "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY
upper(string_col) ORDER BY upper(string_col)" },
{ "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY
upper(string_col) ORDER BY count(int_col)" },
{ "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m,
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY
string_col) WHERE c < m" },
- { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m,
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY
bool_col, string_col) WHERE c < m" }
+ { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m,
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY
bool_col, string_col) WHERE c < m" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY
upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
+ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY
upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY
count(int_col)" }
]
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]