DRILL-467: support multiple "and" join conditions in merge join operator.

DRILL-467: Support multiple "and" join conditions in merge join operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/356c2dba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/356c2dba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/356c2dba

Branch: refs/heads/master
Commit: 356c2dba979c12c1768f8aa4f1b9f58a2327b598
Parents: 3651182
Author: Jinfeng Ni <[email protected]>
Authored: Thu Mar 27 14:34:48 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sat Mar 29 14:38:34 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/MergeJoinBatch.java | 278 ++++++++++---------
 .../impl/join/TestMergeJoinMulCondition.java    |  71 +++++
 .../test/resources/join/mj_multi_condition.json | 129 +++++++++
 3 files changed, 347 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/356c2dba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e30a649..0a287b6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -20,13 +20,11 @@ package org.apache.drill.exec.physical.impl.join;
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Join;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -34,12 +32,11 @@ import 
org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.HoldingContainerExpression;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -51,8 +48,8 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.JoinRelType;
 
-import com.google.common.collect.ImmutableList;
 import com.sun.codemodel.JClass;
+import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JMod;
 import com.sun.codemodel.JType;
@@ -64,21 +61,7 @@ import com.sun.codemodel.JVar;
 public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
-  
-//  private static GeneratorMapping setup = GM("doSetup", "doSetup");
-//  private static GeneratorMapping copyLeft = GM("doSetup", "doCopyLeft");
-//  private static GeneratorMapping copyRight = GM("doSetup", "doCopyRight");
-//  private static GeneratorMapping compare = GM("doSetup", "doCompare");
-//  private static GeneratorMapping compareLeft= GM("doSetup", 
"doCompareNextLeftKey");
-//  
-//  private static final MappingSet SETUP_MAPPING = new MappingSet((String) 
null, null, setup, setup);
-//  private static final MappingSet COPY_LEFT_MAPPING = new 
MappingSet("leftIndex", "outIndex", copyLeft, copyLeft);
-//  private static final MappingSet COPY_RIGHT_MAPPING = new 
MappingSet("rightIndex", "outIndex", copyRight, copyRight);
-//  private static final MappingSet COMPARE_MAPPING = new 
MappingSet("leftIndex", "rightIndex", compare, compare);
-//  private static final MappingSet COMPARE_RIGHT_MAPPING = new 
MappingSet("rightIndex", null, compare, compare);
-//  private static final MappingSet COMPARE_LEFT_MAPPING = new 
MappingSet("leftIndex", "null", compareLeft, compareLeft);
-//  private static final MappingSet COMPARE_NEXT_LEFT_MAPPING = new 
MappingSet("nextLeftIndex", "null", compareLeft, compareLeft);
-//  
+    
   public final MappingSet setupMapping =
       new MappingSet("null", "null", 
                      GM("doSetup", "doSetup", null, null),
@@ -112,7 +95,7 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private final RecordBatch left;
   private final RecordBatch right;
   private final JoinStatus status;
-  private final JoinCondition condition;
+  private final List<JoinCondition> conditions;
   private final JoinRelType joinType;
   private JoinWorker worker;
   public MergeJoinBatchBuilder batchBuilder;
@@ -120,14 +103,13 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, 
RecordBatch left, RecordBatch right) {
     super(popConfig, context);
     // currently only one join condition is supported
-    assert popConfig.getConditions().size() == 1 : String.format("Merge Join 
currently only supports joins with a single condition.  This join operator was 
configured with %d condition(s).", popConfig.getConditions().size());
+    assert popConfig.getConditions().size() >= 1 : String.format("Merge Join 
currently does not support cartisian join.  This join operator was configured 
with %d condition(s).", popConfig.getConditions().size());
     this.left = left;
     this.right = right;
     this.status = new JoinStatus(left, right, this);
     this.batchBuilder = new MergeJoinBatchBuilder(context, status);
     this.joinType = popConfig.getJoinType();
-    this.condition = popConfig.getConditions().get(0);
-    
+    this.conditions = popConfig.getConditions();   
   }
 
   @Override
@@ -218,12 +200,74 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     right.kill();
   }
 
+  
+  private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar 
incomingRecordBatch, 
+      JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) 
throws ClassTransformationException {
+    boolean nextLeftIndexDeclared = false;
+    for (JoinCondition condition : conditions) {
+      final LogicalExpression leftFieldExpr = condition.getLeft();
+
+      // materialize value vector readers from join expression
+      final LogicalExpression materializedLeftExpr = 
ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, 
context.getFunctionRegistry());
+      if (collector.hasErrors())
+        throw new ClassTransformationException(String.format(
+            "Failure while trying to materialize incoming left field.  
Errors:\n %s.", collector.toErrorString()));
+
+      // generate compareNextLeftKey()
+      ////////////////////////////////
+      cg.setMappingSet(compareLeftMapping);
+      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingLeftRecordBatch));
+  
+      if (!nextLeftIndexDeclared) {
+        // int nextLeftIndex = leftIndex + 1;
+        cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), 
"nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
+        nextLeftIndexDeclared = true;
+      } 
+      // check if the next key is in this batch
+      
cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
+                       ._then()
+                         ._return(JExpr.lit(-1));
+  
+      // generate VV read expressions
+      ClassGenerator.HoldingContainer compareThisLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
+      cg.setMappingSet(compareNextLeftMapping); // change mapping from 
'leftIndex' to 'nextLeftIndex'
+      ClassGenerator.HoldingContainer compareNextLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
+  
+      if (compareThisLeftExprHolder.isOptional()) {
+        // handle null == null
+        
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                              
.cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                         ._then()
+                           ._return(JExpr.lit(0));
+    
+        // handle null == !null
+        
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                              
.cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                         ._then()
+                           ._return(JExpr.lit(1));
+      }
+  
+      // check value equality
+  
+      LogicalExpression gh = 
FunctionGenerationHelper.getComparator(compareThisLeftExprHolder,
+        compareNextLeftExprHolder,
+        context.getFunctionRegistry());
+      HoldingContainer out = cg.addExpr(gh, false);
+      
+      //If not 0, it means not equal. We return this out value. 
+      JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      jc._then()._return(out.getValue());
+    }
+    
+    //Pass the equality check for all the join conditions. Finally, return 0.
+    cg.getEvalBlock()._return(JExpr.lit(0));
+    
+  }  
+  
   private JoinWorker generateNewWorker() throws ClassTransformationException, 
IOException, SchemaChangeException{
 
     final ClassGenerator<JoinWorker> cg = 
CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
     final ErrorCollector collector = new ErrorCollectorImpl();
-    final LogicalExpression leftFieldExpr = condition.getLeft();
-    final LogicalExpression rightFieldExpr = condition.getRight();
 
     // Generate members and initialization code
     /////////////////////////////////////////
@@ -251,110 +295,14 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     // declare 'incoming' member so VVReadExpr generated code can point to the 
left or right batch
     JVar incomingRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, 
"incoming");
 
-    // materialize value vector readers from join expression
-    final LogicalExpression materializedLeftExpr = 
ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, 
context.getFunctionRegistry());
-    if (collector.hasErrors())
-      throw new ClassTransformationException(String.format(
-          "Failure while trying to materialize incoming left field.  Errors:\n 
%s.", collector.toErrorString()));
-
-    final LogicalExpression materializedRightExpr = 
ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, 
context.getFunctionRegistry());
-    if (collector.hasErrors())
-      throw new ClassTransformationException(String.format(
-          "Failure while trying to materialize incoming right field.  
Errors:\n %s.", collector.toErrorString()));
-
-
-    // generate compare()
-    ////////////////////////
-    cg.setMappingSet(compareMapping);
-    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingLeftRecordBatch));
-    ClassGenerator.HoldingContainer compareLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
-
-    cg.setMappingSet(compareRightMapping);
-    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingRightRecordBatch));
-    ClassGenerator.HoldingContainer compareRightExprHolder = 
cg.addExpr(materializedRightExpr, false);
-
-    if (compareLeftExprHolder.isOptional() && 
compareRightExprHolder.isOptional()) {
-      // handle null == null
-      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
-          .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
-          ._then()
-          ._return(JExpr.lit(0));
-  
-      // handle null == !null
-      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
-          .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
-          ._then()
-          ._return(JExpr.lit(1));
-
-    } else if (compareLeftExprHolder.isOptional()) {
-      // handle null == required (null is less than any value)
-      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
-          ._then()
-          ._return(JExpr.lit(-1));
-
-    } else if (compareRightExprHolder.isOptional()) {
-      // handle required == null (null is less than any value)
-      cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
-          ._then()
-          ._return(JExpr.lit(1));
-    }
-
-    LogicalExpression fh = 
FunctionGenerationHelper.getComparator(compareLeftExprHolder,
-      compareRightExprHolder,
-      context.getFunctionRegistry());
-    cg.addExpr(new ReturnValueExpression(fh, false), false);
-//    
-//    // equality
-//    
cg.getEvalBlock()._if(compareLeftExprHolder.getValue().eq(compareRightExprHolder.getValue()))
-//                     ._then()
-//                       ._return(JExpr.lit(0));
-//    // less than
-//    
cg.getEvalBlock()._if(compareLeftExprHolder.getValue().lt(compareRightExprHolder.getValue()))
-//                     ._then()
-//                       ._return(JExpr.lit(-1));
-//    // greater than
-//    cg.getEvalBlock()._return(JExpr.lit(1));
-
-
-    // generate compareNextLeftKey()
-    ////////////////////////////////
-    cg.setMappingSet(compareLeftMapping);
-    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingLeftRecordBatch));
-
-    // int nextLeftIndex = leftIndex + 1;
-    cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", 
JExpr.direct("leftIndex").plus(JExpr.lit(1)));
-
-    // check if the next key is in this batch
-    
cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
-                     ._then()
-                       ._return(JExpr.lit(-1));
-
-    // generate VV read expressions
-    ClassGenerator.HoldingContainer compareThisLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
-    cg.setMappingSet(compareNextLeftMapping); // change mapping from 
'leftIndex' to 'nextLeftIndex'
-    ClassGenerator.HoldingContainer compareNextLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
-
-    if (compareThisLeftExprHolder.isOptional()) {
-      // handle null == null
-      
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
-                            
.cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
-                       ._then()
-                         ._return(JExpr.lit(0));
-  
-      // handle null == !null
-      
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
-                            
.cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
-                       ._then()
-                         ._return(JExpr.lit(1));
-    }
-
-    // check value equality
-
-    LogicalExpression gh = 
FunctionGenerationHelper.getComparator(compareThisLeftExprHolder,
-      compareNextLeftExprHolder,
-      context.getFunctionRegistry());
-    cg.addExpr(new ReturnValueExpression(gh, false), false);
-
+    //generate doCompare() method
+    /////////////////////////////////////////
+    generateDoCompare(cg, incomingRecordBatch, incomingLeftRecordBatch, 
incomingRightRecordBatch, collector);
+    
+    //generate doCompareNextLeftKey() method
+    /////////////////////////////////////////
+    generateDoCompareNextLeft(cg, incomingRecordBatch, 
incomingLeftRecordBatch, joinStatus, collector);
+    
     // generate copyLeft()
     //////////////////////
     cg.setMappingSet(copyLeftMapping);
@@ -425,4 +373,72 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     logger.debug("Built joined schema: {}", container.getSchema());
   }
 
+  private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar 
incomingRecordBatch, 
+      JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch, 
ErrorCollector collector) throws ClassTransformationException {
+    
+    for (JoinCondition condition : conditions) {
+      final LogicalExpression leftFieldExpr = condition.getLeft();
+      final LogicalExpression rightFieldExpr = condition.getRight();
+
+      // materialize value vector readers from join expression
+      final LogicalExpression materializedLeftExpr = 
ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, 
context.getFunctionRegistry());
+      if (collector.hasErrors())
+        throw new ClassTransformationException(String.format(
+            "Failure while trying to materialize incoming left field.  
Errors:\n %s.", collector.toErrorString()));
+
+      final LogicalExpression materializedRightExpr = 
ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, 
context.getFunctionRegistry());
+      if (collector.hasErrors())
+        throw new ClassTransformationException(String.format(
+            "Failure while trying to materialize incoming right field.  
Errors:\n %s.", collector.toErrorString()));
+
+      // generate compare()
+      ////////////////////////
+      cg.setMappingSet(compareMapping);
+      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingLeftRecordBatch));
+      ClassGenerator.HoldingContainer compareLeftExprHolder = 
cg.addExpr(materializedLeftExpr, false);
+
+      cg.setMappingSet(compareRightMapping);
+      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), 
JExpr._this().ref(incomingRightRecordBatch));
+      ClassGenerator.HoldingContainer compareRightExprHolder = 
cg.addExpr(materializedRightExpr, false);
+
+      if (compareLeftExprHolder.isOptional() && 
compareRightExprHolder.isOptional()) {
+        // handle null == null
+        cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+            .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+            ._then()
+            ._return(JExpr.lit(0));
+    
+        // handle null == !null
+        cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+            .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+            ._then()
+            ._return(JExpr.lit(1));
+  
+      } else if (compareLeftExprHolder.isOptional()) {
+        // handle null == required (null is less than any value)
+        
cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
+            ._then()
+            ._return(JExpr.lit(-1));
+  
+      } else if (compareRightExprHolder.isOptional()) {
+        // handle required == null (null is less than any value)
+        
cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
+            ._then()
+            ._return(JExpr.lit(1));
+      }
+  
+      LogicalExpression fh = 
FunctionGenerationHelper.getComparator(compareLeftExprHolder,
+        compareRightExprHolder,
+        context.getFunctionRegistry()); 
+      HoldingContainer out = cg.addExpr(fh, false);
+      
+      //If not 0, it means not equal. We return this out value.       
+      JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      jc._then()._return(out.getValue());
+    }
+    
+    //Pass the equality check for all the join conditions. Finally, return 0.  
  
+    cg.getEvalBlock()._return(JExpr.lit(0));  
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/356c2dba/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java
new file mode 100644
index 0000000..24dbe71
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestMergeJoinMulCondition extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestMergeJoinMulCondition.class);
+  
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200000);
+
+  @Test
+  //the physical plan is obtained for the following SQL query:
+  //  "select l.l_partkey, l.l_suppkey, ps.ps_partkey, ps.ps_suppkey "
+  //      + " from cp.`tpch/lineitem.parquet` l join "
+  //      + "      cp.`tpch/partsupp.parquet` ps"
+  //      + " on l.l_partkey = ps.ps_partkey and "
+  //      + "    l.l_suppkey = ps.ps_suppkey";    
+  public void testMergeJoinMultiKeys() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator());) {
+
+      bit1.run();
+      client.connect();
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL,
+          
Files.toString(FileUtils.getResourceAsFile("/join/mj_multi_condition.json"),
+              Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(60175, count);
+    }
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/356c2dba/exec/java-exec/src/test/resources/join/mj_multi_condition.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/join/mj_multi_condition.json 
b/exec/java-exec/src/test/resources/join/mj_multi_condition.json
new file mode 100644
index 0000000..f27f279
--- /dev/null
+++ b/exec/java-exec/src/test/resources/join/mj_multi_condition.json
@@ -0,0 +1,129 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  "graph" : [ {
+    "pop" : "parquet-scan",
+    "@id" : 1,
+    "entries" : [ {
+      "path" : "/tpch/lineitem.parquet"
+    } ],
+    "storage" : {
+      "type" : "file",
+      "connection" : "classpath:///",
+      "workspaces" : null,
+      "formats" : null
+    },
+    "format" : {
+      "type" : "parquet"
+    }
+  }, {
+    "pop" : "sort",
+    "@id" : 2,
+    "child" : 1,
+    "orderings" : [ {
+      "order" : "ASC",
+      "expr" : "l_partkey",
+      "nullDirection" : "FIRST"
+    }, {
+      "order" : "ASC",
+      "expr" : "l_suppkey",
+      "nullDirection" : "FIRST"
+    } ],
+    "reverse" : false
+  }, {
+    "pop" : "selection-vector-remover",
+    "@id" : 3,
+    "child" : 2
+  }, {
+    "pop" : "parquet-scan",
+    "@id" : 4,
+    "entries" : [ {
+      "path" : "/tpch/partsupp.parquet"
+    } ],
+    "storage" : {
+      "type" : "file",
+      "connection" : "classpath:///",
+      "workspaces" : null,
+      "formats" : null
+    },
+    "format" : {
+      "type" : "parquet"
+    }
+  }, {
+    "pop" : "project",
+    "@id" : 5,
+    "exprs" : [ {
+      "ref" : "output.ps_partkey",
+      "expr" : "ps_partkey"
+    }, {
+      "ref" : "output.ps_suppkey",
+      "expr" : "ps_suppkey"
+    } ],
+    "child" : 4
+  }, {
+    "pop" : "sort",
+    "@id" : 6,
+    "child" : 5,
+    "orderings" : [ {
+      "order" : "ASC",
+      "expr" : "ps_partkey",
+      "nullDirection" : "FIRST"
+    }, {
+      "order" : "ASC",
+      "expr" : "ps_suppkey",
+      "nullDirection" : "FIRST"
+    } ],
+    "reverse" : false
+  }, {
+    "pop" : "selection-vector-remover",
+    "@id" : 7,
+    "child" : 6
+  }, {
+    "pop" : "merge-join",
+    "@id" : 8,
+    "left" : 3,
+    "right" : 7,
+    "conditions" : [ {
+      "relationship" : "==",
+      "left" : "l_partkey",
+      "right" : "ps_partkey"
+    }, {
+      "relationship" : "==",
+      "left" : "l_suppkey",
+      "right" : "ps_suppkey"
+    } ],
+    "joinType" : "INNER"
+  }, {
+    "pop" : "selection-vector-remover",
+    "@id" : 9,
+    "child" : 8
+  }, {
+    "pop" : "project",
+    "@id" : 10,
+    "exprs" : [ {
+      "ref" : "output.l_partkey",
+      "expr" : "l_partkey"
+    }, {
+      "ref" : "output.l_suppkey",
+      "expr" : "l_suppkey"
+    }, {
+      "ref" : "output.ps_partkey",
+      "expr" : "ps_partkey"
+    }, {
+      "ref" : "output.ps_suppkey",
+      "expr" : "ps_suppkey"
+    } ],
+    "child" : 9
+  }, {
+    "pop" : "screen",
+    "@id" : 11,
+    "child" : 10
+  } ]
+}

Reply via email to