This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c5a2ec5b4a support SEMI join (#9367)
c5a2ec5b4a is described below

commit c5a2ec5b4a434c3bd986928938c235cb3edd2f2c
Author: Rong Rong <[email protected]>
AuthorDate: Tue Sep 13 01:21:51 2022 -0700

    support SEMI join (#9367)
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  3 +++
 .../runtime/executor/WorkerQueryExecutor.java      |  3 ++-
 .../query/runtime/operator/HashJoinOperator.java   | 28 +++++++---------------
 .../pinot/query/runtime/QueryRunnerTest.java       |  4 ++++
 4 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index fcba253d20..84865be084 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -81,6 +81,9 @@ public class PinotQueryRuleSets {
           // remove unnecessary sort rule
           CoreRules.SORT_REMOVE,
 
+          // projection to SEMI JOIN.
+          CoreRules.PROJECT_TO_SEMI_JOIN,
+
           // prune empty results rules
           PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE,
           PruneEmptyRules.JOIN_RIGHT_INSTANCE, PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 018b3dc82c..58d5371691 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -123,7 +123,8 @@ public class WorkerQueryExecutor {
       BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
       BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
       return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
-          joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria());
+          joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria(),
+          joinNode.getJoinRelType());
     } else if (stageNode instanceof AggregateNode) {
       AggregateNode aggregateNode = (AggregateNode) stageNode;
       BaseOperator<TransferableBlock> inputOperator =
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ed9743e88a..a3b3b215b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
@@ -48,6 +49,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
   private final BaseOperator<TransferableBlock> _leftTableOperator;
   private final BaseOperator<TransferableBlock> _rightTableOperator;
+  private final JoinRelType _joinType;
   private final DataSchema _resultSchema;
   private final DataSchema _leftTableSchema;
   private final DataSchema _rightTableSchema;
@@ -59,7 +61,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
 
   public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
       BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
-      List<JoinNode.JoinClause> criteria) {
+      List<JoinNode.JoinClause> criteria, JoinRelType joinType) {
     _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
     _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
     _leftTableOperator = leftTableOperator;
@@ -67,6 +69,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     _resultSchema = outputSchema;
     _leftTableSchema = leftSchema;
     _rightTableSchema = rightSchema;
+    _joinType = joinType;
     _resultRowSize = _resultSchema.size();
     _isHashTableBuilt = false;
     _broadcastHashTable = new HashMap<>();
@@ -132,7 +135,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
           rows.add(joinRow(leftRow, rightRow));
         }
       }
-      return new TransferableBlock(rows, computeSchema(), BaseDataBlock.Type.ROW);
+      return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
     } else if (leftBlock.isErrorBlock()) {
       _upstreamErrorBlock = leftBlock;
       return _upstreamErrorBlock;
@@ -147,24 +150,11 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     for (Object obj : leftRow) {
       resultRow[idx++] = obj;
     }
-    for (Object obj : rightRow) {
-      resultRow[idx++] = obj;
+    if (_joinType != JoinRelType.SEMI) {
+      for (Object obj : rightRow) {
+        resultRow[idx++] = obj;
+      }
     }
     return resultRow;
   }
-
-  private DataSchema computeSchema() {
-    String[] columnNames = new String[_resultRowSize];
-    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultRowSize];
-    int idx = 0;
-    for (int index = 0; index < _leftTableSchema.size(); index++) {
-      columnNames[idx] = _leftTableSchema.getColumnName(index);
-      columnDataTypes[idx++] = _leftTableSchema.getColumnDataType(index);
-    }
-    for (int index = 0; index < _rightTableSchema.size(); index++) {
-      columnNames[idx] = _rightTableSchema.getColumnName(index);
-      columnDataTypes[idx++] = _rightTableSchema.getColumnDataType(index);
-    }
-    return new DataSchema(columnNames, columnDataTypes);
-  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index d2ac8c95c8..13418c4ecd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -155,6 +155,10 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
             + "  (SELECT a.col2 AS joinKey, MAX(a.col3) AS maxVal FROM a GROUP BY a.col2) AS i "
             + "  ON b.col1 = i.joinKey", 3},
 
+        // Sub-query with IN clause to SEMI JOIN.
+        new Object[]{"SELECT b.col1, b.col3 FROM b WHERE b.col1 IN "
+            + "(SELECT DISTINCT a.col2 FROM a WHERE a.col2 != 'foo')", 9},
+
         // Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2 times each and "alice" occurred 3/1 times
         // numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo, bar)
         // - COUNT(*) < 5 matches "alice" (3 times)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to