Ben-Zvi closed pull request #1480: DRILL-6755: Avoid building Hash Table for 
inner/left join when probe side is empty
URL: https://github.com/apache/drill/pull/1480
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5dc91b..21dcdcf83e3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@
  *   processed individually (that Build partition should be smaller than the 
original, hence likely fit whole into
  *   memory to allow probing; if not -- see below).
  *      Processing of each spilled pair is EXACTLY like processing the 
original Build/Probe incomings. (As a fact,
- *   the {@Link #innerNext() innerNext} method calls itself recursively !!). 
Thus the spilled build partition is
+ *   the {@link #innerNext()} method calls itself recursively !!). Thus the 
spilled build partition is
  *   read and divided into new partitions, which in turn may spill again (and 
again...).
  *   The code tracks these spilling "cycles". Normally any such "again" (i.e. 
cycle of 2 or greater) is a waste,
  *   indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean joinIsLeftOrFull;
+  private boolean joinIsRightOrFull;
+  private boolean skipHashTableBuild; // when outer side is empty, and the 
join is inner or left (see DRILL-6755)
 
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@
   private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
-
-  private long maxIncomingBatchSize;
   /**
    * The number of {@link HashPartition}s. This is configured via a system 
option and set in {@link #partitionNumTuning(int, 
HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
@@ -264,6 +264,8 @@ protected void buildSchema() throws SchemaChangeException {
         buildSchema = right.getSchema();
         // position of the new "column" for keeping the hash values (after the 
real columns)
         rightHVColPosition = right.getContainer().getNumberOfColumns();
+        // In special cases, when the probe side is empty, and inner/left join 
- no need for Hash Table
+        skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! 
joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
       }
@@ -447,12 +449,12 @@ public IterOutcome innerNext() {
 
       // Try to probe and project, or recursively handle a spilled partition
       if (!buildSideIsEmpty.booleanValue() ||  // If there are build-side rows
-        joinType != JoinRelType.INNER) {  // or if this is a left/full outer 
join
+          joinIsLeftOrFull) {  // or if this is a left/full outer join
 
         prefetchFirstProbeBatch();
 
         if (leftUpstream.isError() ||
-            ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType 
!= JoinRelType.RIGHT )) {
+            ( leftUpstream == NONE && ! joinIsRightOrFull )) {
           // A termination condition was reached while prefetching the first 
probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
@@ -568,19 +570,7 @@ public IterOutcome innerNext() {
 
       } else {
         // Our build side is empty, we won't have any matches, clear the probe 
side
-        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == 
IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : probeBatch) {
-            wrapper.getValueVector().clear();
-          }
-          probeBatch.kill(true);
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == 
IterOutcome.OK) {
-            for (final VectorWrapper<?> wrapper : probeBatch) {
-              wrapper.getValueVector().clear();
-            }
-            leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          }
-        }
+        killAndDrainLeftUpstream();
       }
 
       // No more output records, clean up and return
@@ -596,10 +586,31 @@ public IterOutcome innerNext() {
     }
   }
 
+  /**
+   *  In case an upstream data is no longer needed, send a kill and flush any 
remaining batch
+   *
+   * @param batch probe or build batch
+   * @param upstream which upstream
+   * @param isLeft is it the left or right
+   */
+  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, 
boolean isLeft) {
+      batch.kill(true);
+      while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == 
IterOutcome.OK) {
+        for (final VectorWrapper<?> wrapper : batch) {
+          wrapper.getValueVector().clear();
+        }
+        upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : 
HashJoinHelper.RIGHT_INPUT, batch);
+      }
+  }
+  private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, 
leftUpstream, true); }
+  private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, 
rightUpstream, false); }
+
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = 
Lists.newArrayListWithExpectedSize(conditions.size());
     
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
+    if ( skipHashTableBuild ) { return; }
+
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
@@ -819,6 +830,11 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
       return null;
     }
 
+    if ( skipHashTableBuild ) { // No hash table needed - then consume all the 
right upstream
+      killAndDrainRightUpstream();
+      return null;
+    }
+
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
     boolean firstCycle = cycleNum == 0;
 
@@ -1013,7 +1029,7 @@ private void setupOutputContainerSchema() {
         final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. 
However, map types are
         // not nullable so we must exclude them from the check below (see 
DRILL-2197).
-        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && 
inputType.getMode() == DataMode.REQUIRED
+        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1034,7 +1050,7 @@ private void setupOutputContainerSchema() {
 
         // If right or full outer join then the output type should be 
optional. However, map types are
         // not nullable so we must exclude them from the check below (see 
DRILL-2771, DRILL-2197).
-        if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && 
inputType.getMode() == DataMode.REQUIRED
+        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1074,6 +1090,8 @@ public HashJoinBatch(HashJoinPOP popConfig, 
FragmentContext context,
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == 
JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     rightExpr = new ArrayList<>(conditions.size());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6b57b..486fb1e1a99 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ protected boolean prefetchFirstBatchFromBothSides() {
     return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
   }
 
-  /*
+  /**
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e705d..67789465a59 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -103,7 +103,7 @@ public synchronized void enqueue(final RawFragmentBatch 
batch) throws IOExceptio
   @Override
   public void close() {
     if (!isTerminated() && context.getExecutorState().shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. %d out of %d 
strams have finished", completedStreams(), fragmentCount);
+      final String msg = String.format("Cleanup before finished. %d out of %d 
streams have finished", completedStreams(), fragmentCount);
       throw  new IllegalStateException(msg);
     }
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a295114f..5beb7cbdd98 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public void testHashJoinNoneOutcomeUninitRightSide() {
   public void testHashJoinNoneOutcomeUninitLeftSide() {
     testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, 
RecordBatch.IterOutcome.NONE);
   }
+
+  /**
+   * Testing for DRILL-6755: No Hash Table is built when the first probe batch 
is NONE
+   */
+  @Test
+  public void testHashJoinWhenProbeIsNONE() {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE);
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+    // for the probe side input - use multiple batches (to check that they are 
all cleared/drained)
+    final List<VectorContainer> buildSideinputContainer = new ArrayList<>(5);
+    buildSideinputContainer.add(emptyInputRowSetRight.container());
+    buildSideinputContainer.add(nonEmptyInputRowSetRight.container());
+    RowSet.SingleRowSet secondInputRowSetRight = 
operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+    RowSet.SingleRowSet thirdInputRowSetRight = 
operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+    buildSideinputContainer.add(secondInputRowSetRight.container());
+    buildSideinputContainer.add(thirdInputRowSetRight.container());
+
+    final MockRecordBatch mockInputBatchRight = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext, 
buildSideinputContainer, inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext, 
inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), 
FieldReference.getWithQuotedRef("leftcol"), 
FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, 
JoinRelType.INNER);
+
+    HashJoinBatch hjBatch = new HashJoinBatch(hjConf, 
operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight);
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE);
+
+    secondInputRowSetRight.clear();
+    thirdInputRowSetRight.clear();
+    buildSideinputContainer.clear();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to