[ 
https://issues.apache.org/jira/browse/DRILL-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672353#comment-16672353
 ] 

ASF GitHub Bot commented on DRILL-6798:
---------------------------------------

Ben-Zvi closed pull request #1514: DRILL-6798: Planner changes to support 
semi-join.
URL: https://github.com/apache/drill/pull/1514
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index ae55c9f21b5..17f8da52321 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -358,15 +358,18 @@ static RuleSet getDrillBasicRules(OptimizerRulesContext 
optimizerRulesContext) {
      * We have to create another copy of the ruleset with the context 
dependent elements;
      * this cannot be reused across queries.
      */
-    final ImmutableSet<RelOptRule> basicRules = 
ImmutableSet.<RelOptRule>builder()
+    ImmutableSet.Builder<RelOptRule> basicRules = 
ImmutableSet.<RelOptRule>builder()
         .addAll(staticRuleSet)
         .add(
             DrillMergeProjectRule.getInstance(true, 
RelFactories.DEFAULT_PROJECT_FACTORY,
                 optimizerRulesContext.getFunctionRegistry())
-            )
-        .build();
+            );
+    if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
+        optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
+      basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
+    }
 
-    return RuleSets.ofList(basicRules);
+    return RuleSets.ofList(basicRules.build());
   }
 
   /**
@@ -474,7 +477,6 @@ static RuleSet getJoinPermRules(OptimizerRulesContext 
optimizerRulesContext) {
   static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) 
{
     final List<RelOptRule> ruleList = new ArrayList<>();
     final PlannerSettings ps = optimizerRulesContext.getPlannerSettings();
-
     ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
     ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
     ruleList.add(SortConvertPrule.INSTANCE);
@@ -509,9 +511,14 @@ static RuleSet getPhysicalRules(OptimizerRulesContext 
optimizerRulesContext) {
 
     if (ps.isHashJoinEnabled()) {
       ruleList.add(HashJoinPrule.DIST_INSTANCE);
-
+      if (ps.isSemiJoinEnabled()) {
+        ruleList.add(HashJoinPrule.SEMI_DIST_INSTANCE);
+      }
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+        if (ps.isSemiJoinEnabled()) {
+          ruleList.add(HashJoinPrule.SEMI_BROADCAST_INSTANCE);
+        }
       }
     }
 
@@ -521,7 +528,6 @@ static RuleSet getPhysicalRules(OptimizerRulesContext 
optimizerRulesContext) {
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
       }
-
     }
 
     // NLJ plans consist of broadcasting the right child, hence we need
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 8aec96c947f..b14488c9ca5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -39,12 +42,13 @@
 import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import org.apache.drill.exec.planner.logical.DrillConditions;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 /**
  * Contains rule instances which use custom RelBuilder.
  */
@@ -58,6 +62,15 @@
       new UnionToDistinctRule(LogicalUnion.class,
           DrillRelFactories.LOGICAL_BUILDER);
 
+  SemiJoinRule SEMI_JOIN_PROJECT_RULE = new 
SemiJoinRule.ProjectToSemiJoinRule(Project.class, Join.class, Aggregate.class,
+          DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:project") {
+    public boolean matches(RelOptRuleCall call) {
+      Preconditions.checkArgument(call.rel(1) instanceof Join);
+      Join join = call.rel(1);
+      return !(join.getCondition().isAlwaysTrue() || 
join.getCondition().isAlwaysFalse());
+    }
+  };
+
   JoinPushExpressionsRule JOIN_PUSH_EXPRESSIONS_RULE =
       new JoinPushExpressionsRule(Join.class,
           DrillRelFactories.LOGICAL_BUILDER);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 434016ff830..cde49e4b96e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -45,7 +46,7 @@
 /**
  * Base class for logical and physical Joins implemented in Drill.
  */
-public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
+public abstract class DrillJoinRelBase extends Join implements DrillJoin {
   protected List<Integer> leftKeys = Lists.newArrayList();
   protected List<Integer> rightKeys = Lists.newArrayList();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
new file mode 100644
index 00000000000..30067dab9c6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import java.util.List;
+
+/**
+ * Interface which needs to be implemented by all the join relation 
expressions.
+ */
+public interface DrillJoin extends DrillRelNode {
+
+  /* Columns of left table that are part of join condition */
+  List<Integer> getLeftKeys();
+
+  /* Columns of right table that are part of join condition */
+  List<Integer> getRightKeys();
+
+  /* JoinType of the join operation*/
+  JoinRelType getJoinType();
+
+  /* Join condition of the join relation */
+  RexNode getCondition();
+
+  /* Left RelNode of the Join Relation */
+  RelNode getLeft();
+
+  /* Right RelNode of the Join Relation */
+  RelNode getRight();
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 42f7e72bc51..0126e745cd9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -104,7 +104,7 @@ public LogicalOperator implement(DrillImplementor 
implementor) {
    * @return
    */
   private LogicalOperator implementInput(DrillImplementor implementor, int i, 
int offset, RelNode input) {
-    return implementInput(implementor, i, offset, input, this);
+    return implementInput(implementor, i, offset, input, this, 
this.getRowType().getFieldNames());
   }
 
   /**
@@ -118,12 +118,12 @@ private LogicalOperator implementInput(DrillImplementor 
implementor, int i, int
    * @return
    */
   public static LogicalOperator implementInput(DrillImplementor implementor, 
int i, int offset,
-                                                RelNode input, DrillRel 
currentNode) {
+                                               RelNode input, DrillRel 
currentNode,
+                                               List<String> parentFields) {
     final LogicalOperator inputOp = implementor.visitChild(currentNode, i, 
input);
     assert uniqueFieldNames(input.getRowType());
-    final List<String> fields = currentNode.getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + 
inputFields.size());
+    final List<String> outputFields = parentFields.subList(offset, offset + 
inputFields.size());
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 4356d491046..ca03de14fd6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -28,6 +28,7 @@
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -48,12 +49,14 @@ public Correlate copy(RelTraitSet traitSet,
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {
-    final List<String> fields = getRowType().getFieldNames();
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
     assert DrillJoinRel.isUnique(fields);
     final int leftCount = getInputSize(0);
 
-    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 
0, left, this);
-    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 
1, leftCount, right, this);
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 
0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 
1, leftCount, right, this, fields);
 
     return new LateralJoin(leftOp, rightOp);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index d5ff56bc1b5..a0b727d3f29 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -22,6 +22,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.type.RelDataType;
@@ -39,7 +40,6 @@
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_MATCH_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_PROJECT_FACTORY;
-import static 
org.apache.calcite.rel.core.RelFactories.DEFAULT_SEMI_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SET_OP_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SORT_FACTORY;
 import static 
org.apache.calcite.rel.core.RelFactories.DEFAULT_TABLE_SCAN_FACTORY;
@@ -60,6 +60,17 @@
   public static final RelFactories.JoinFactory DRILL_LOGICAL_JOIN_FACTORY = 
new DrillJoinFactoryImpl();
 
   public static final RelFactories.AggregateFactory 
DRILL_LOGICAL_AGGREGATE_FACTORY = new DrillAggregateFactoryImpl();
+
+  public static final RelFactories.SemiJoinFactory DRILL_SEMI_JOIN_FACTORY = 
new SemiJoinFactoryImpl();
+
+  private static class SemiJoinFactoryImpl implements 
RelFactories.SemiJoinFactory {
+    public RelNode createSemiJoin(RelNode left, RelNode right,
+                                  RexNode condition) {
+      final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+      return DrillSemiJoinRel.create(left, right,
+              condition, joinInfo.leftKeys, joinInfo.rightKeys);
+    }
+  }
   /**
    * A {@link RelBuilderFactory} that creates a {@link DrillRelBuilder} that 
will
    * create logical relational expressions for everything.
@@ -69,7 +80,7 @@
           Contexts.of(DEFAULT_PROJECT_FACTORY,
               DEFAULT_FILTER_FACTORY,
               DEFAULT_JOIN_FACTORY,
-              DEFAULT_SEMI_JOIN_FACTORY,
+              DRILL_SEMI_JOIN_FACTORY,
               DEFAULT_SORT_FACTORY,
               DEFAULT_AGGREGATE_FACTORY,
               DEFAULT_MATCH_FACTORY,
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
new file mode 100644
index 00000000000..09e4be9de25
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalSemiJoin;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
+
+  public DrillSemiJoinRel(
+          RelOptCluster cluster,
+          RelTraitSet traitSet,
+          RelNode left,
+          RelNode right,
+          RexNode condition,
+          ImmutableIntList leftKeys,
+          ImmutableIntList rightKeys) {
+    super(cluster,
+          traitSet,
+          left,
+          right,
+          condition,
+          leftKeys,
+          rightKeys);
+  }
+
+  public static SemiJoin create(RelNode left, RelNode right, RexNode condition,
+                                ImmutableIntList leftKeys, ImmutableIntList 
rightKeys) {
+    final RelOptCluster cluster = left.getCluster();
+    return new DrillSemiJoinRel(cluster, 
cluster.traitSetOf(DrillRel.DRILL_LOGICAL), left,
+            right, condition, leftKeys, rightKeys);
+  }
+
+  @Override
+  public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+                                 RelNode left, RelNode right, JoinRelType 
joinType, boolean semiJoinDone) {
+    Preconditions.checkArgument(joinType == JoinRelType.INNER);
+    final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+    Preconditions.checkArgument(joinInfo.isEqui());
+    return new DrillSemiJoinRel(getCluster(), traitSet, left, right, condition,
+            joinInfo.leftKeys, joinInfo.rightKeys);
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
+    Preconditions.checkArgument(DrillJoinRel.isUnique(fields));
+    final int leftCount = left.getRowType().getFieldCount();
+    final List<String> leftFields = fields.subList(0, leftCount);
+    final List<String> rightFields = fields.subList(leftCount, leftCount + 
right.getRowType().getFieldCount());
+
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 
0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 
1, leftCount, right, this, fields);
+
+    Join.Builder builder = Join.builder();
+    builder.type(joinType);
+    builder.left(leftOp);
+    builder.right(rightOp);
+    List<JoinCondition> conditions = Lists.newArrayList();
+    for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+      conditions.add(new JoinCondition(DrillJoinRel.EQUALITY_CONDITION,
+              new FieldReference(leftFields.get(pair.left)), new 
FieldReference(rightFields.get(pair.right))));
+    }
+
+    return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 0e1fc4e85ce..6480f3d3581 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,8 +20,13 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.logical.data.JoinCondition;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -38,7 +43,7 @@
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 
@@ -50,14 +55,25 @@
   private int joinControl;
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
-                      JoinRelType joinType) throws InvalidRelException {
-    this(cluster, traits, left, right, condition, joinType, false, null, 
false, JoinControl.DEFAULT);
+                      JoinRelType joinType, boolean semiJoin) throws 
InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, false, null, 
false, JoinControl.DEFAULT, semiJoin);
+  }
+
+  public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
+                      JoinRelType joinType, boolean swapped, RuntimeFilterDef 
runtimeFilterDef,
+                      boolean isRowKeyJoin, int joinControl) throws 
InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, swapped, 
runtimeFilterDef, isRowKeyJoin, joinControl, false);
   }
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
       JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
-      boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
-    super(cluster, traits, left, right, condition, joinType);
+      boolean isRowKeyJoin, int joinControl, boolean semiJoin) throws 
InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType, semiJoin);
+    Preconditions.checkArgument(isSemiJoin && !swapped || swapped && 
!isSemiJoin || (!swapped && !isSemiJoin));
+    if (isSemiJoin) {
+      Preconditions.checkArgument(!swapped, "swapping of inputs is not allowed 
for semi-joins");
+      Preconditions.checkArgument(validateTraits(traitSet, left, right));
+    }
     this.swapped = swapped;
     this.isRowKeyJoin = isRowKeyJoin;
     joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, 
rightKeys, filterNulls);
@@ -65,11 +81,34 @@ public HashJoinPrel(RelOptCluster cluster, RelTraitSet 
traits, RelNode left, Rel
     this.joinControl = joinControl;
   }
 
+  private static boolean validateTraits(RelTraitSet traitSet, RelNode left, 
RelNode right) {
+    ImmutableBitSet bitSet = 
ImmutableBitSet.range(left.getRowType().getFieldCount(),
+            left.getRowType().getFieldCount() + 
right.getRowType().getFieldCount());
+    for (RelTrait trait: traitSet) {
+      if (trait.getTraitDef().getTraitClass().equals(RelCollation.class)) {
+        RelCollation collationTrait = (RelCollation)trait;
+        for (RelFieldCollation field : collationTrait.getFieldCollations()) {
+          if (bitSet.indexOf(field.getFieldIndex()) > 0) {
+            return false;
+          }
+        }
+      } else if 
(trait.getTraitDef().getTraitClass().equals(DrillDistributionTrait.class)) {
+        DrillDistributionTrait distributionTrait = (DrillDistributionTrait) 
trait;
+        for (DrillDistributionTrait.DistributionField field : 
distributionTrait.getFields()) {
+          if (bitSet.indexOf(field.getFieldId()) > 0) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
   @Override
   public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, 
RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     try {
       return new HashJoinPrel(this.getCluster(), traitSet, left, right, 
conditionExpr, joinType, this.swapped, this.runtimeFilterDef,
-          this.isRowKeyJoin, this.joinControl);
+          this.isRowKeyJoin, this.joinControl, this.isSemiJoin);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -87,7 +126,7 @@ public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
   }
 
   @Override
-  public org.apache.drill.exec.physical.base.PhysicalOperator 
getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
     // Depending on whether the left/right is swapped for hash inner join, 
pass in different
     // combinations of parameters.
     if (! swapped) {
@@ -150,4 +189,8 @@ public boolean isRowKeyJoin() {
     return this.isRowKeyJoin;
   }
 
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("semi-join: ", isSemiJoin);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index d07cf51d3c1..0d7f5caa483 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillSemiJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
@@ -30,10 +32,14 @@
 public class HashJoinPrule extends JoinPruleBase {
   public static final RelOptRule DIST_INSTANCE = new 
HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), 
true);
   public static final RelOptRule BROADCAST_INSTANCE = new 
HashJoinPrule("Prel.HashJoinBroadcastPrule", 
RelOptHelper.any(DrillJoinRel.class), false);
+  public static final RelOptRule SEMI_DIST_INSTANCE = new 
HashJoinPrule("Prel.HashSemiJoinDistPrule", 
RelOptHelper.any(DrillSemiJoinRel.class), true);
+  public static final RelOptRule SEMI_BROADCAST_INSTANCE = new 
HashJoinPrule("Prel.HashSemiJoinBroadcastPrule", 
RelOptHelper.any(DrillSemiJoinRel.class), false);
+
 
   protected static final Logger tracer = CalciteTrace.getPlannerTracer();
 
   private final boolean isDist;
+  private boolean isSemi = false;
   private HashJoinPrule(String name, RelOptRuleOperand operand, boolean 
isDist) {
     super(operand, name);
     this.isDist = isDist;
@@ -42,17 +48,18 @@ private HashJoinPrule(String name, RelOptRuleOperand 
operand, boolean isDist) {
   @Override
   public boolean matches(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    isSemi = call.rel(0) instanceof DrillSemiJoinRel;
     return settings.isMemoryEstimationEnabled() || 
settings.isHashJoinEnabled();
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    if (!settings.isHashJoinEnabled()) {
+    if (!settings.isHashJoinEnabled() || isSemi && 
!settings.isSemiJoinEnabled()) {
       return;
     }
 
-    final DrillJoinRel join = call.rel(0);
+    final DrillJoin join = call.rel(0);
     final RelNode left = join.getLeft();
     final RelNode right = join.getRight();
 
@@ -66,11 +73,11 @@ public void onMatch(RelOptRuleCall call) {
 
       if(isDist){
         createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
-            left, right, null /* left collation */, null /* right collation 
*/, hashSingleKey);
+            left, right, null /* left collation */, null /* right collation 
*/, hashSingleKey, isSemi);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.HASH_JOIN,
-              left, right, null /* left collation */, null /* right collation 
*/);
+              left, right, null /* left collation */, null /* right collation 
*/, isSemi);
         }
       }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index c40eeaa6a09..2581fa66738 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rex.RexChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Litmus;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
@@ -37,7 +42,6 @@
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -48,11 +52,18 @@
 public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JoinPrel.class);
 
+  protected final boolean isSemiJoin;
   protected JoinUtils.JoinCategory joincategory;
 
   public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
-      JoinRelType joinType) {
+                  JoinRelType joinType) {
+    this(cluster, traits, left, right, condition, joinType, false);
+  }
+
+  public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
+      JoinRelType joinType, boolean isSemiJoin) {
     super(cluster, traits, left, right, condition, joinType);
+    this.isSemiJoin = isSemiJoin;
   }
 
   @Override
@@ -73,7 +84,12 @@ public RelNode getJoinInput(int offset, RelNode input) {
     assert uniqueFieldNames(input.getRowType());
     final List<String> fields = getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + 
inputFields.size());
+    final List<String> outputFields;
+    if (fields.size() > offset) {
+      outputFields = fields.subList(offset, offset + inputFields.size());
+    } else {
+      outputFields = new ArrayList<>();
+    }
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
@@ -86,6 +102,9 @@ public RelNode getJoinInput(int offset, RelNode input) {
   }
 
   private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, 
List<String> outputFieldNames) {
+    if (outputFieldNames.size() == 0) {
+      return input;
+    }
     List<RexNode> exprs = Lists.newArrayList();
 
     for (RelDataTypeField field : inputFields) {
@@ -139,4 +158,62 @@ protected void buildJoinConditions(List<JoinCondition> 
conditions,
     }
   }
 
+  public boolean isSemiJoin() {
+    return isSemiJoin;
+  }
+
+  /* A Drill physical rel that is a semi join has an output row type with 
fields from only
+     the left side of the join, whereas Calcite's join rel expects the output row 
type to contain
+     fields from both sides. This method is overridden so that validation of a semi join 
skips
+     super.isValid() and does not count the right-side fields in the output row type.
+   */
+  @Override public boolean isValid(Litmus litmus, Context context) {
+    if (!this.isSemiJoin && !super.isValid(litmus, context)) {
+      return false;
+    }
+    if (getRowType().getFieldCount()
+            != getSystemFieldList().size()
+            + left.getRowType().getFieldCount()
+            + (this.isSemiJoin ? 0 : right.getRowType().getFieldCount())) {
+      return litmus.fail("field count mismatch");
+    }
+    if (condition != null) {
+      if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+        return litmus.fail("condition must be boolean: {}",
+                condition.getType());
+      }
+      // The input to the condition is a row type consisting of system
+      // fields, left fields, and right fields. Very similar to the
+      // output row type, except that fields have not yet been made nullable
+      // due to outer joins; right fields are included even for a semi join.
+      RexChecker checker =
+              new RexChecker(
+                      getCluster().getTypeFactory().builder()
+                              .addAll(getSystemFieldList())
+                              .addAll(getLeft().getRowType().getFieldList())
+                              .addAll(getRight().getRowType().getFieldList())
+                              .build(),
+                      context, litmus);
+      condition.accept(checker);
+      if (checker.getFailureCount() > 0) {
+        return litmus.fail(checker.getFailureCount()
+                + " failures in condition " + condition);
+      }
+    }
+    return litmus.succeed();
+  }
+
+  @Override public RelDataType deriveRowType() {
+    if (isSemiJoin) {
+      return SqlValidatorUtil.deriveJoinRowType(
+              left.getRowType(),
+              null,
+              this.joinType,
+              getCluster().getTypeFactory(),
+              null,
+              new ArrayList<>());
+    } else {
+      return super.deriveRowType();
+    }
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 7588e2c1393..36654016044 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -22,7 +22,7 @@
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -47,8 +47,8 @@ protected JoinPruleBase(RelOptRuleOperand operand, String 
description) {
     super(operand, description);
   }
 
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, 
RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode 
right,
+                                       PlannerSettings settings) {
     List<Integer> leftKeys = Lists.newArrayList();
     List<Integer> rightKeys = Lists.newArrayList();
     List<Boolean> filterNulls = Lists.newArrayList();
@@ -66,7 +66,7 @@ protected boolean checkPreconditions(DrillJoinRel join, 
RelNode left, RelNode ri
     return distFields;
   }
 
-  protected boolean checkBroadcastConditions(RelOptPlanner planner, 
DrillJoinRel join, RelNode left, RelNode right) {
+  protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoin 
join, RelNode left, RelNode right) {
 
     double estimatedRightRowCount = 
RelMetadataQuery.instance().getRowCount(right);
     if (estimatedRightRowCount < 
PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
@@ -78,10 +78,11 @@ protected boolean checkBroadcastConditions(RelOptPlanner 
planner, DrillJoinRel j
     return false;
   }
 
-  protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
-      RelCollation collationLeft, RelCollation collationRight, boolean 
hashSingleKey)throws InvalidRelException {
+      RelCollation collationLeft, RelCollation collationRight,
+      boolean hashSingleKey, boolean semiJoin)throws InvalidRelException {
 
     /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider 
the following options of plan:
      *   1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, 
r2, ..., r_k) for right side.
@@ -93,10 +94,12 @@ protected void createDistBothPlan(RelOptRuleCall call, 
DrillJoinRel join,
      *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : 
hashSingleKey.
      */
 
-    DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
-    DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+    DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
+    DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
 
-    createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+    createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
 
     assert (join.getLeftKeys().size() == join.getRightKeys().size());
 
@@ -110,7 +113,7 @@ protected void createDistBothPlan(RelOptRuleCall call, 
DrillJoinRel join,
         hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, 
i+1))));
         hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, 
i+1))));
 
-        createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+        createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
       }
     }
   }
@@ -118,11 +121,11 @@ protected void createDistBothPlan(RelOptRuleCall call, 
DrillJoinRel join,
   // Create join plan with both left and right children hash distributed. If 
the physical join type
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain
   // sort converter if necessary to provide the collation.
-  private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  private void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight,
-      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait 
hashRightPartition) throws InvalidRelException {
+      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait 
hashRightPartition, boolean isSemiJoin) throws InvalidRelException {
 
     RelTraitSet traitsLeft = null;
     RelTraitSet traitsRight = null;
@@ -145,7 +148,7 @@ private void createDistBothPlan(RelOptRuleCall call, 
DrillJoinRel join,
       final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
       newJoin = new HashJoinPrel(join.getCluster(), traitSet,
                                  convertedLeft, convertedRight, 
join.getCondition(),
-                                 join.getJoinType());
+                                 join.getJoinType(), isSemiJoin);
 
     } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
       newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
@@ -158,11 +161,11 @@ private void createDistBothPlan(RelOptRuleCall call, 
DrillJoinRel join,
   // Create join plan with left child ANY distributed and right child 
BROADCAST distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain sort converter
   // if necessary to provide the collation.
-  protected void createBroadcastPlan(final RelOptRuleCall call, final 
DrillJoinRel join,
+  protected void createBroadcastPlan(final RelOptRuleCall call, final 
DrillJoin join,
       final RexNode joinCondition,
       final PhysicalJoinType physicalJoinType,
       final RelNode left, final RelNode right,
-      final RelCollation collationLeft, final RelCollation collationRight) 
throws InvalidRelException {
+      final RelCollation collationLeft, final RelCollation collationRight, 
boolean semiJoin) throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
@@ -184,10 +187,10 @@ protected void createBroadcastPlan(final RelOptRuleCall 
call, final DrillJoinRel
 
     if(traitProp){
       if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join, final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join, final RelNode rel) 
throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
collationLeft, toDist);
 
@@ -200,24 +203,24 @@ public RelNode convertChild(final DrillJoinRel join, 
final RelNode rel) throws I
 
 
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode 
rel) throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
             return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, 
convertedRight, joinCondition,
-                                         join.getJoinType());
+                                         join.getJoinType(), semiJoin);
 
           }
 
         }.go(join, convertedLeft);
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode 
rel) throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
@@ -235,7 +238,7 @@ public RelNode convertChild(final DrillJoinRel join,  final 
RelNode rel) throws
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
         final RelTraitSet traitSet = 
PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
         call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, 
convertedLeft,
-            convertedRight, joinCondition, join.getJoinType()));
+            convertedRight, joinCondition, join.getJoinType(), semiJoin));
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
         call.transformTo(new NestedLoopJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft,
             convertedRight, joinCondition, join.getJoinType()));
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index f06b66d2bc5..0bd25685dd1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -70,11 +70,11 @@ public void onMatch(RelOptRuleCall call) {
       RelCollation collationRight = getCollation(join.getRightKeys());
 
       if(isDist){
-        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, 
right, collationLeft, collationRight, hashSingleKey);
+        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, 
right, collationLeft, collationRight, hashSingleKey, false);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.MERGE_JOIN,
-              left, right, collationLeft, collationRight);
+              left, right, collationLeft, collationRight, false);
         }
       }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index 848c8a16f50..e7fc032af33 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -21,6 +21,7 @@
 
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
@@ -45,8 +46,8 @@ private NestedLoopJoinPrule(String name, RelOptRuleOperand 
operand) {
   }
 
   @Override
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, 
RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode 
right,
+                                       PlannerSettings settings) {
     JoinRelType type = join.getJoinType();
 
     if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
@@ -93,7 +94,7 @@ public void onMatch(RelOptRuleCall call) {
 
       if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
         createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.NESTEDLOOP_JOIN,
-            left, right, null /* left collation */, null /* right collation 
*/);
+            left, right, null /* left collation */, null /* right collation 
*/, false);
       }
 
     } catch (InvalidRelException e) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 63f884cfd65..7577cf91829 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -64,6 +64,8 @@
       new OptionDescription("Generates the topN plan for queries with the 
ORDER BY and LIMIT clauses."));
   public static final OptionValidator HASHJOIN = new 
BooleanValidator("planner.enable_hashjoin",
       new OptionDescription("Enable the memory hungry hash join. Drill assumes 
that a query will have adequate memory to complete and tries to use the fastest 
operations possible to complete the planned inner, left, right, or full outer 
joins using a hash table. Does not write to disk. Disabling hash join allows 
Drill to manage arbitrarily large data in a small memory footprint."));
+  public static final OptionValidator SEMIJOIN = new 
BooleanValidator("planner.enable_semijoin",
+          new OptionDescription("Enable the semi join optimization. Planner 
removes the distinct processing below the hash join and sets the semi join flag 
in hash join."));
   public static final OptionValidator MERGEJOIN = new 
BooleanValidator("planner.enable_mergejoin",
       new OptionDescription("Sort-based operation. A merge join is used for 
inner join, left and right outer joins. Inputs to the merge join must be 
sorted. It reads the sorted input streams from both sides and finds matching 
rows. Writes to disk."));
   public static final OptionValidator NESTEDLOOPJOIN = new 
BooleanValidator("planner.enable_nestedloopjoin",
@@ -273,6 +275,10 @@ public boolean isHashJoinEnabled() {
     return options.getOption(HASHJOIN.getOptionName()).bool_val;
   }
 
+  public boolean isSemiJoinEnabled() {
+    return options.getOption(SEMIJOIN.getOptionName()).bool_val;
+  }
+
   public boolean isMergeJoinEnabled() {
     return options.getOption(MERGEJOIN.getOptionName()).bool_val;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
index b7bc4bb77bc..0fe0f92b018 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -68,7 +68,7 @@ public Prel visitJoin(JoinPrel prel, Double value) throws 
RuntimeException {
       // Mark left/right is swapped, when INNER hash join's left row count < ( 
1+ margin factor) right row count.
       RelMetadataQuery mq = newJoin.getCluster().getMetadataQuery();
       if (newJoin.getLeft().estimateRowCount(mq) < (1 + value) * 
newJoin.getRight().estimateRowCount(mq) &&
-          newJoin.getJoinType() == JoinRelType.INNER) {
+          newJoin.getJoinType() == JoinRelType.INNER && !newJoin.isSemiJoin()) 
{
         ((HashJoinPrel) newJoin).setSwapped(true);
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 23f35b5e17b..a33d8326489 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -81,6 +81,7 @@
       new OptionDefinition(PlannerSettings.STREAMAGG),
       new OptionDefinition(PlannerSettings.TOPN, new 
OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
       new OptionDefinition(PlannerSettings.HASHJOIN),
+      new OptionDefinition(PlannerSettings.SEMIJOIN),
       new OptionDefinition(PlannerSettings.MERGEJOIN),
       new OptionDefinition(PlannerSettings.NESTEDLOOPJOIN),
       new OptionDefinition(PlannerSettings.MULTIPHASE),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 76be050ba01..f083c6603c8 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,6 +514,7 @@ drill.exec.options: {
     planner.enable_hash_single_key: true,
     planner.enable_hashagg: true,
     planner.enable_hashjoin: true,
+    planner.enable_semijoin: false,
     planner.enable_hashjoin_swap: true,
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
new file mode 100644
index 00000000000..a660fffee0f
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestSemiJoin extends BaseTestQuery {
+  @Test
+  public void testInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` where 
employee_id in (select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testInClauseWithSemiJoinDisabled() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` where 
employee_id in (select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), false);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(!queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testSmallInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+            "where employee_id in (351, 352, 353, 451, 452, 453)";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(!queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testLargeInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+            "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 
553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testStarWithInClauseToSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where employee_id in 
(select employee_id from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+
+  @Test
+  public void testMultiColumnInClauseWithSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where (employee_id, 
full_name) in (select employee_id, full_name from cp.`employee.json` )";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String queryPlan = client.queryBuilder().sql(sql).explainText();
+      assertTrue(queryPlan.contains("semi-join: =[true]"));
+    }
+  }
+}
diff --git 
a/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
 
b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
new file mode 100644
index 00000000000..a44ec9fc92f
--- /dev/null
+++ 
b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
@@ -0,0 +1,52 @@
+package org.apache.drill.common.logical.data;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("semi-join")
+public class LogicalSemiJoin extends Join {
+
+  @JsonCreator
+  public LogicalSemiJoin(@JsonProperty("left") LogicalOperator left,
+                         @JsonProperty("right") LogicalOperator right,
+                         @JsonProperty("conditions") List<JoinCondition> 
conditions,
+                         @JsonProperty("type") JoinRelType type) {
+    super(left, right, conditions, type);
+  }
+
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.forArray(getLeft(), getRight());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitJoin(this, value);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Planner changes to support semi-join
> ------------------------------------
>
>                 Key: DRILL-6798
>                 URL: https://issues.apache.org/jira/browse/DRILL-6798
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Query Planning & Optimization
>    Affects Versions: 1.14.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Hanumath Rao Maduri
>            Priority: Major
>             Fix For: 1.15.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to