Fix merge join to handle the case where the right batch has zero records

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ba123e69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ba123e69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ba123e69

Branch: refs/heads/master
Commit: ba123e69da6bdf4977e9db151236c3903ebcc028
Parents: dcc102a
Author: Steven Phillips <[email protected]>
Authored: Wed Apr 2 22:20:25 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sat Apr 19 18:07:11 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/config/MergeJoinPOP.java      |   1 +
 .../exec/physical/impl/join/JoinStatus.java     |  20 +++-
 .../exec/physical/impl/join/MergeJoinBatch.java | 107 ++++++++++++-------
 .../exec/physical/impl/join/TestMergeJoin.java  |  55 +++++++++-
 .../resources/join/merge_join_empty_batch.json  |   1 +
 5 files changed, 135 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index a9a0f6d..047bed7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -62,6 +62,7 @@ public class MergeJoinPOP extends AbstractBase{
     this.right = right;
     this.conditions = conditions;
     this.joinType = joinType;
+    Preconditions.checkArgument(joinType != JoinRelType.FULL, "Full outer join 
not currently supported");
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 5a83b48..baa232e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.eigenbase.rel.JoinRelType;
 
 /**
  * The status of the current join.  Maintained outside the individually 
compiled join templates so that we can carry status across multiple schemas.
@@ -48,6 +49,8 @@ public final class JoinStatus {
   public MergeJoinBatch outputBatch;
   public SelectionVector4 sv4;
 
+  private final JoinRelType joinType;
+
   public boolean ok = true;
   private boolean initialSet = false;
   private boolean leftRepeating = false;
@@ -57,6 +60,7 @@ public final class JoinStatus {
     this.left = left;
     this.right = right;
     this.outputBatch = output;
+    this.joinType = output.getJoinType();
   }
 
   public final void ensureInitial(){
@@ -204,17 +208,25 @@ public final class JoinStatus {
   public JoinOutcome getOutcome(){
     if (!ok)
       return JoinOutcome.FAILURE;
-    if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
+    if (bothMatches(IterOutcome.NONE) ||
+            (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) 
||
+            (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) ||
+            (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE))
+      return JoinOutcome.NO_MORE_DATA;
+    if (bothMatches(IterOutcome.OK) ||
+            (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK)))
       return JoinOutcome.BATCH_RETURNED;
-    if (eitherMatches(IterOutcome.NONE))
-      return JoinOutcome.NO_MORE_DATA;    
     if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
       return JoinOutcome.SCHEMA_CHANGED;
     if (eitherMatches(IterOutcome.NOT_YET))
       return JoinOutcome.WAITING;
     return JoinOutcome.FAILURE;
   }
-  
+
+  private boolean bothMatches(IterOutcome outcome){
+    return lastLeft == outcome && lastRight == outcome;
+  }
+
   private boolean eitherMatches(IterOutcome outcome){
     return lastLeft == outcome || lastRight == outcome;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0cb2f7d..bbdfbe5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -25,7 +25,10 @@ import java.util.List;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.TypedNullConstant;
 import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -108,10 +111,14 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     }
     this.left = left;
     this.right = right;
+    this.joinType = popConfig.getJoinType();
     this.status = new JoinStatus(left, right, this);
     this.batchBuilder = new MergeJoinBatchBuilder(context, status);
-    this.joinType = popConfig.getJoinType();
-    this.conditions = popConfig.getConditions();   
+    this.conditions = popConfig.getConditions();
+  }
+
+  public JoinRelType getJoinType() {
+    return joinType;
   }
 
   @Override
@@ -319,19 +326,21 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     //////////////////////
     cg.setMappingSet(copyLeftMapping);
     int vectorId = 0;
-    for (VectorWrapper<?> vw : left) {
-      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
-                                                      new 
TypedFieldId(vw.getField().getType(), vectorId));
-      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                       new 
TypedFieldId(vw.getField().getType(),vectorId));
-      // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
-                                   .arg(copyLeftMapping.getValueReadIndex())
-                                   .arg(copyLeftMapping.getValueWriteIndex())
-                                   .arg(vvIn).eq(JExpr.FALSE))
-          ._then()
-          ._return(JExpr.FALSE);
-      ++vectorId;
+    if (status.isLeftPositionAllowed()) {
+      for (VectorWrapper<?> vw : left) {
+        JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
+                                                        new 
TypedFieldId(vw.getField().getType(), vectorId));
+        JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                         new 
TypedFieldId(vw.getField().getType(),vectorId));
+        // todo: check result of copyFromSafe and grow allocation
+        cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+                                     .arg(copyLeftMapping.getValueReadIndex())
+                                     .arg(copyLeftMapping.getValueWriteIndex())
+                                     .arg(vvIn).eq(JExpr.FALSE))
+            ._then()
+            ._return(JExpr.FALSE);
+        ++vectorId;
+      }
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
 
@@ -340,19 +349,21 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     cg.setMappingSet(copyRightMappping);
 
     int rightVectorBase = vectorId;
-    for (VectorWrapper<?> vw : right) {
-      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
-                                                      new 
TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
-      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                       new 
TypedFieldId(vw.getField().getType(),vectorId));
-      // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
-                                 .arg(copyRightMappping.getValueReadIndex())
-                                 .arg(copyRightMappping.getValueWriteIndex())
-                                 .arg(vvIn).eq(JExpr.FALSE))
-          ._then()
-          ._return(JExpr.FALSE);
-      ++vectorId;
+    if (status.isRightPositionAllowed()) {
+      for (VectorWrapper<?> vw : right) {
+        JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
+                                                        new 
TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
+        JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                         new 
TypedFieldId(vw.getField().getType(),vectorId));
+        // todo: check result of copyFromSafe and grow allocation
+        cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+                                   .arg(copyRightMappping.getValueReadIndex())
+                                   .arg(copyRightMappping.getValueWriteIndex())
+                                   .arg(vvIn).eq(JExpr.FALSE))
+            ._then()
+            ._return(JExpr.FALSE);
+        ++vectorId;
+      }
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
 
@@ -366,19 +377,25 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     container.clear();
     
     //estimation of joinBatchSize : max of left/right size, expanded by a 
factor of 16, which is then bounded by MAX_BATCH_SIZE.
-    int joinBatchSize = Math.min(Math.max(left.getRecordCount() , 
right.getRecordCount() ) * 16, MAX_BATCH_SIZE);
+    int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0;
+    int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() 
: 0;
+    int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, 
MAX_BATCH_SIZE);
     
-    // add fields from both batches    
-    for (VectorWrapper<?> w : left) {
-      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
left.getRecordCount())).alloc(joinBatchSize);
-      container.add(outgoingVector);
+    // add fields from both batches
+    if (leftCount > 0) {
+      for (VectorWrapper<?> w : left) {
+        ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
+        VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
left.getRecordCount())).alloc(joinBatchSize);
+        container.add(outgoingVector);
+      }
     }
 
-    for (VectorWrapper<?> w : right) {
-      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
-      VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
right.getRecordCount())).alloc(joinBatchSize);
-      container.add(outgoingVector);
+    if (rightCount > 0) {
+      for (VectorWrapper<?> w : right) {
+        ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), 
context.getAllocator());
+        VectorAllocator.getAllocator(outgoingVector, (int) 
Math.ceil(w.getValueVector().getBufferSize() / 
right.getRecordCount())).alloc(joinBatchSize);
+        container.add(outgoingVector);
+      }
     }
 
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -395,12 +412,22 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
       final LogicalExpression rightFieldExpr = condition.getRight();
 
       // materialize value vector readers from join expression
-      final LogicalExpression materializedLeftExpr = 
ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, 
context.getFunctionRegistry());
+      LogicalExpression materializedLeftExpr;
+      if (status.isLeftPositionAllowed()) {
+        materializedLeftExpr = 
ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, 
context.getFunctionRegistry());
+      } else {
+        materializedLeftExpr = new 
TypedNullConstant(Types.optional(MinorType.INT));
+      }
       if (collector.hasErrors())
         throw new ClassTransformationException(String.format(
             "Failure while trying to materialize incoming left field.  
Errors:\n %s.", collector.toErrorString()));
 
-      final LogicalExpression materializedRightExpr = 
ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, 
context.getFunctionRegistry());
+      LogicalExpression materializedRightExpr;
+      if (status.isRightPositionAllowed()) {
+        materializedRightExpr = 
ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, 
context.getFunctionRegistry());
+      } else {
+        materializedRightExpr = new 
TypedNullConstant(Types.optional(MinorType.INT));
+      }
       if (collector.hasErrors())
         throw new ClassTransformationException(String.format(
             "Failure while trying to materialize incoming right field.  
Errors:\n %s.", collector.toErrorString()));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 8e92181..1623b86 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.AfterClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.beust.jcommander.internal.Lists;
@@ -299,7 +300,29 @@ public class TestMergeJoin extends PopUnitTestBase {
   }
 
   @Test
-  public void testMergeJoinEmptyBatch() throws Exception {
+  public void testMergeJoinInnerEmptyBatch() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator());) {
+
+      bit1.run();
+      client.connect();
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL,
+              
Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"),
+                      Charsets.UTF_8)
+                      .replace("${JOIN_TYPE}", "INNER"));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(0, count);
+    }
+  }
+
+  @Test
+  public void testMergeJoinLeftEmptyBatch() throws Exception {
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
 
     try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);      
@@ -309,7 +332,30 @@ public class TestMergeJoin extends PopUnitTestBase {
       client.connect();
       List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL,
           
Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"),
-              Charsets.UTF_8));
+              Charsets.UTF_8)
+              .replace("${JOIN_TYPE}", "LEFT"));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(50, count);
+    }
+  }
+
+  @Test
+  public void testMergeJoinRightEmptyBatch() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator());) {
+
+      bit1.run();
+      client.connect();
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL,
+              
Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"),
+                      Charsets.UTF_8)
+                      .replace("${JOIN_TYPE}", "RIGHT"));
       int count = 0;
       for(QueryResultBatch b : results) {
         if (b.getHeader().getRowCount() != 0)
@@ -317,9 +363,8 @@ public class TestMergeJoin extends PopUnitTestBase {
       }
       assertEquals(0, count);
     }
-  }  
-  
-  
+  }
+
   @AfterClass
   public static void tearDown() throws Exception{
     // pause to get logger to catch up.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json 
b/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json
index 549cb83..19254e3 100644
--- a/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json
+++ b/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json
@@ -36,6 +36,7 @@
       right: 1,
       left: 2,
       pop: "merge-join",
+      join-type: "${JOIN_TYPE}",
       join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
     },
     {

Reply via email to