This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c5a2ec5b4a support SEMI join (#9367)
c5a2ec5b4a is described below
commit c5a2ec5b4a434c3bd986928938c235cb3edd2f2c
Author: Rong Rong <[email protected]>
AuthorDate: Tue Sep 13 01:21:51 2022 -0700
support SEMI join (#9367)
Co-authored-by: Rong Rong <[email protected]>
---
.../calcite/rel/rules/PinotQueryRuleSets.java | 3 +++
.../runtime/executor/WorkerQueryExecutor.java | 3 ++-
.../query/runtime/operator/HashJoinOperator.java | 28 +++++++---------------
.../pinot/query/runtime/QueryRunnerTest.java | 4 ++++
4 files changed, 18 insertions(+), 20 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index fcba253d20..84865be084 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -81,6 +81,9 @@ public class PinotQueryRuleSets {
// remove unnecessary sort rule
CoreRules.SORT_REMOVE,
+ // projection to SEMI JOIN.
+ CoreRules.PROJECT_TO_SEMI_JOIN,
+
// prune empty results rules
PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE, PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 018b3dc82c..58d5371691 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -123,7 +123,8 @@ public class WorkerQueryExecutor {
BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
- joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria());
+ joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria(),
+ joinNode.getJoinRelType());
} else if (stageNode instanceof AggregateNode) {
AggregateNode aggregateNode = (AggregateNode) stageNode;
BaseOperator<TransferableBlock> inputOperator =
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ed9743e88a..a3b3b215b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
@@ -48,6 +49,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
private final BaseOperator<TransferableBlock> _leftTableOperator;
private final BaseOperator<TransferableBlock> _rightTableOperator;
+ private final JoinRelType _joinType;
private final DataSchema _resultSchema;
private final DataSchema _leftTableSchema;
private final DataSchema _rightTableSchema;
@@ -59,7 +61,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
- List<JoinNode.JoinClause> criteria) {
+ List<JoinNode.JoinClause> criteria, JoinRelType joinType) {
_leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
_rightKeySelector = criteria.get(0).getRightJoinKeySelector();
_leftTableOperator = leftTableOperator;
@@ -67,6 +69,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
_resultSchema = outputSchema;
_leftTableSchema = leftSchema;
_rightTableSchema = rightSchema;
+ _joinType = joinType;
_resultRowSize = _resultSchema.size();
_isHashTableBuilt = false;
_broadcastHashTable = new HashMap<>();
@@ -132,7 +135,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
rows.add(joinRow(leftRow, rightRow));
}
}
- return new TransferableBlock(rows, computeSchema(), BaseDataBlock.Type.ROW);
+ return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
} else if (leftBlock.isErrorBlock()) {
_upstreamErrorBlock = leftBlock;
return _upstreamErrorBlock;
@@ -147,24 +150,11 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
for (Object obj : leftRow) {
resultRow[idx++] = obj;
}
- for (Object obj : rightRow) {
- resultRow[idx++] = obj;
+ if (_joinType != JoinRelType.SEMI) {
+ for (Object obj : rightRow) {
+ resultRow[idx++] = obj;
+ }
}
return resultRow;
}
-
- private DataSchema computeSchema() {
- String[] columnNames = new String[_resultRowSize];
- DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultRowSize];
- int idx = 0;
- for (int index = 0; index < _leftTableSchema.size(); index++) {
- columnNames[idx] = _leftTableSchema.getColumnName(index);
- columnDataTypes[idx++] = _leftTableSchema.getColumnDataType(index);
- }
- for (int index = 0; index < _rightTableSchema.size(); index++) {
- columnNames[idx] = _rightTableSchema.getColumnName(index);
- columnDataTypes[idx++] = _rightTableSchema.getColumnDataType(index);
- }
- return new DataSchema(columnNames, columnDataTypes);
- }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index d2ac8c95c8..13418c4ecd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -155,6 +155,10 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
+ " (SELECT a.col2 AS joinKey, MAX(a.col3) AS maxVal FROM a GROUP BY a.col2) AS i "
+ " ON b.col1 = i.joinKey", 3},
+ // Sub-query with IN clause to SEMI JOIN.
+ new Object[]{"SELECT b.col1, b.col3 FROM b WHERE b.col1 IN "
+ + "(SELECT DISTINCT a.col2 FROM a WHERE a.col2 != 'foo')", 9},
+
// Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2 times each and "alice" occurred 3/1 times
// numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo, bar)
// - COUNT(*) < 5 matches "alice" (3 times)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]