Repository: phoenix
Updated Branches:
  refs/heads/calcite 8c19e1c13 -> 1a18e8f87


PHOENIX-2225 Support Correlate (Nested-loop Join) in Phoenix/Calcite Integration; PHOENIX-2226 Support Semi-Join in Phoenix/Calcite Integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1a18e8f8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1a18e8f8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1a18e8f8

Branch: refs/heads/calcite
Commit: 1a18e8f87aeb88ecca78134fb98a994f1caddf63
Parents: 8c19e1c
Author: maryannxue <wei....@intel.com>
Authored: Thu Sep 10 10:46:37 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Thu Sep 10 10:46:37 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 237 +++++++++++++++++--
 .../apache/phoenix/calcite/CalciteUtils.java    |  64 +++++
 .../calcite/jdbc/PhoenixPrepareImpl.java        |   2 +
 .../calcite/metadata/PhoenixRelMdCollation.java |  11 +
 .../calcite/rel/PhoenixAbstractJoin.java        |  22 --
 .../calcite/rel/PhoenixAbstractSemiJoin.java    |  41 ++++
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |   2 +-
 .../calcite/rel/PhoenixClientSemiJoin.java      | 119 ++++++++++
 .../phoenix/calcite/rel/PhoenixCorrelate.java   |  98 ++++++++
 .../apache/phoenix/calcite/rel/PhoenixRel.java  |   5 +
 .../calcite/rel/PhoenixRelImplementorImpl.java  |  21 +-
 .../calcite/rel/PhoenixServerAggregate.java     |   2 +-
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |   2 +-
 .../calcite/rel/PhoenixServerSemiJoin.java      | 120 ++++++++++
 .../phoenix/calcite/rel/PhoenixTableScan.java   |  31 ++-
 .../rel/PhoenixToEnumerableConverter.java       |   3 +-
 .../phoenix/calcite/rel/PhoenixUncollect.java   |   4 +
 .../phoenix/calcite/rel/PhoenixValues.java      |   3 +
 .../calcite/rules/PhoenixConverterRules.java    | 112 ++++++++-
 .../rules/PhoenixFilterScanMergeRule.java       |  23 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |   2 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   8 +-
 .../phoenix/calcite/ToExpressionTest.java       |   3 +-
 24 files changed, 867 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 2b8352d..cfec6fb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -328,18 +328,21 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         try {
             conn.createStatement().execute(
                     "CREATE TABLE " + SCORES_TABLE_NAME
-                    + "(student_id INTEGER PRIMARY KEY, scores INTEGER[])");
+                    + "(student_id INTEGER NOT NULL, subject_id INTEGER NOT 
NULL, scores INTEGER[] CONSTRAINT pk PRIMARY KEY (student_id, subject_id))");
             PreparedStatement stmt = conn.prepareStatement(
                     "UPSERT INTO " + SCORES_TABLE_NAME
-                    + " VALUES(?, ?)");
+                    + " VALUES(?, ?, ?)");
             stmt.setInt(1, 1);
-            stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {85, 
80, 82}));
+            stmt.setInt(2, 1);
+            stmt.setArray(3, conn.createArrayOf("INTEGER", new Integer[] {85, 
80, 82}));
             stmt.execute();
             stmt.setInt(1, 2);
-            stmt.setArray(2, null);
+            stmt.setInt(2, 1);
+            stmt.setArray(3, null);
             stmt.execute();
             stmt.setInt(1, 3);
-            stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {87, 
88, 80}));
+            stmt.setInt(2, 2);
+            stmt.setArray(3, conn.createArrayOf("INTEGER", new Integer[] {87, 
88, 80}));
             stmt.execute();
             conn.commit();
         } catch (TableAlreadyExistsException e) {
@@ -860,26 +863,6 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .resultIs(new Object[][] {{1}, {2}})
                 .close();
     }
-    
-    @Test public void testSubquery() {
-        start().sql("SELECT \"order_id\", quantity FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " 
+ JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")")
-               .explainIs("PhoenixToEnumerableConverter\n" +
-                          "  PhoenixClientProject(order_id=[$0], 
QUANTITY=[$4])\n" +
-                          "    PhoenixToClientConverter\n" +
-                          "      PhoenixServerJoin(condition=[AND(=($2, $7), 
=($4, $8))], joinType=[inner])\n" +
-                          "        PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
-                          "        PhoenixServerAggregate(group=[{7}], 
EXPR$0=[MAX($4)])\n" +
-                          "          PhoenixServerJoin(condition=[=($7, $2)], 
joinType=[inner])\n" +
-                          "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
-                          "            PhoenixServerAggregate(group=[{2}])\n" +
-                          "              PhoenixTableScan(table=[[phoenix, 
Join, OrderTable]])\n")
-               .resultIs(new Object[][]{
-                         {"000000000000001", 1000},
-                         {"000000000000003", 3000},
-                         {"000000000000004", 4000},
-                         {"000000000000005", 5000}})
-               .close();
-    }
 
     @Test public void testScalarSubquery() {
         start().sql("select \"item_id\", name, (select max(quantity) sq \n"
@@ -1039,7 +1022,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixUncollect\n" +
                            "    PhoenixToClientConverter\n" +
-                           "      PhoenixServerProject(EXPR$0=[$1])\n" +
+                           "      PhoenixServerProject(EXPR$0=[$2])\n" +
                            "        PhoenixTableScan(table=[[phoenix, 
SCORES]])\n")
                 .resultIs(new Object[][] {
                         {85}, 
@@ -1049,6 +1032,208 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                         {88}, 
                         {80}})
                 .close();
+        start().sql("SELECT s.student_id, t.score FROM " + SCORES_TABLE_NAME + 
" s, UNNEST((SELECT scores FROM " + SCORES_TABLE_NAME + " s2 where s.student_id 
= s2.student_id)) AS t(score)")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixClientProject(STUDENT_ID=[$0], 
SCORE=[$3])\n" +
+                           "    PhoenixCorrelate(correlation=[$cor0], 
joinType=[INNER], requiredColumns=[{0}])\n" +
+                           "      PhoenixToClientConverter\n" +
+                           "        PhoenixTableScan(table=[[phoenix, 
SCORES]])\n" +
+                           "      PhoenixUncollect\n" +
+                           "        PhoenixToClientConverter\n" +
+                           "          PhoenixServerProject(EXPR$0=[$2])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, 
SCORES]], filter=[=($cor0.STUDENT_ID, $0)])\n")
+                .resultIs(new Object[][] {
+                        {1, 85}, 
+                        {1, 80}, 
+                        {1, 82}, 
+                        {3, 87}, 
+                        {3, 88}, 
+                        {3, 80}})
+                .close();
+    }
+    
+    @Test public void testCorrelateAndDecorrelation() {
+        Properties correlProps = new Properties();
+        correlProps.setProperty("forceDecorrelate", Boolean.FALSE.toString());
+        Properties decorrelProps = new Properties();
+        decorrelProps.setProperty("forceDecorrelate", Boolean.TRUE.toString());
+        
+        String q1 = "select \"order_id\", quantity from " + 
JOIN_ORDER_TABLE_FULL_NAME + " o where quantity = (select max(quantity) from " 
+ JOIN_ORDER_TABLE_FULL_NAME + " o2 where o.\"item_id\" = o2.\"item_id\")";
+        Object[][] r1 = new Object[][] {
+                {"000000000000001", 1000},
+                {"000000000000003", 3000},
+                {"000000000000004", 4000},
+                {"000000000000005", 5000}};
+        String p1Correlate = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(order_id=[$0], QUANTITY=[$4])\n" +
+                "    PhoenixFilter(condition=[=($4, $7)])\n" +
+                "      PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], 
requiredColumns=[{2}])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "        PhoenixServerAggregate(group=[{}], 
EXPR$0=[MAX($4)])\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]], filter=[=($cor0.item_id, $2)])\n";
+        String p1Decorrelated = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(order_id=[$0], QUANTITY=[$4])\n" +
+                "    PhoenixToClientConverter\n" +
+                "      PhoenixServerJoin(condition=[AND(=($2, $7), =($4, 
$8))], joinType=[inner])\n" +
+                "        PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "        PhoenixServerAggregate(group=[{7}], 
EXPR$0=[MAX($4)])\n" +
+                "          PhoenixServerJoin(condition=[=($7, $2)], 
joinType=[inner])\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "            PhoenixServerAggregate(group=[{2}])\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n";
+        start(correlProps, 
false).sql(q1).explainIs(p1Correlate).resultIs(r1).close();
+        start(decorrelProps, 
false).sql(q1).explainIs(p1Decorrelated).resultIs(r1).close();
+                
+        String q2 = "select name from " + JOIN_ITEM_TABLE_FULL_NAME + " i 
where price = (select max(price) from " + JOIN_ITEM_TABLE_FULL_NAME + " i2 
where i.\"item_id\" = i2.\"item_id\" and i.name = i2.name and i2.\"item_id\" <> 
'invalid001')";
+        Object[][] r2 = new Object[][]{
+                {"T1"},
+                {"T2"},
+                {"T3"},
+                {"T4"},
+                {"T5"},
+                {"T6"}};
+        String p2Correlate = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(NAME=[$1])\n" +
+                "    PhoenixFilter(condition=[=($2, $7)])\n" +
+                "      PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], 
requiredColumns=[{0, 1}])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixServerAggregate(group=[{}], 
EXPR$0=[MAX($2)])\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]], filter=[AND(=($cor0.item_id, $0), =($cor0.NAME, $1), <>($0, 
'invalid001'))])\n";
+        String p2Decorrelated = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(NAME=[$1])\n" +
+                "    PhoenixToClientConverter\n" +
+                "      PhoenixServerJoin(condition=[AND(=($0, $7), =($1, $8), 
=($2, $9))], joinType=[inner])\n" +
+                "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixServerAggregate(group=[{0, 1}], 
EXPR$0=[MAX($4)])\n" +
+                "          PhoenixServerJoin(condition=[AND(=($0, $2), =($1, 
$3))], joinType=[inner])\n" +
+                "            PhoenixServerProject(item_id=[$0], NAME=[$1])\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "            PhoenixToClientConverter\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]], filter=[<>($0, 'invalid001')])\n";
+        start(correlProps, 
false).sql(q2).explainIs(p2Correlate).resultIs(r2).close();
+        start(decorrelProps, 
false).sql(q2).explainIs(p2Decorrelated).resultIs(r2).close();
+        
+        String q3a = "select \"item_id\", name from " + 
JOIN_ITEM_TABLE_FULL_NAME + " i where exists (select 1 from " + 
JOIN_ORDER_TABLE_FULL_NAME + " o where i.\"item_id\" = o.\"item_id\")";
+        Object[][] r3a = new Object[][] {
+                {"0000000001", "T1"},
+                {"0000000002", "T2"},
+                {"0000000003", "T3"},
+                {"0000000006", "T6"}};
+        String p3aCorrelate = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(item_id=[$0], NAME=[$1])\n" +
+                "    PhoenixFilter(condition=[IS NOT NULL($7)])\n" +
+                "      PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], 
requiredColumns=[{0}])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixServerAggregate(group=[{}], 
agg#0=[MIN($0)])\n" +
+                "          PhoenixServerProject($f0=[true])\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]], filter=[=($cor0.item_id, $2)])\n";
+        String p3aDecorrelated = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(item_id=[$0], NAME=[$1])\n" +
+                "    PhoenixToClientConverter\n" +
+                "      PhoenixServerSemiJoin(condition=[=($0, $8)], 
joinType=[inner])\n" +
+                "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixClientProject($f0=[true], item_id0=[$7])\n" +
+                "          PhoenixToClientConverter\n" +
+                "            PhoenixServerJoin(condition=[=($7, $2)], 
joinType=[inner])\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "              PhoenixToClientConverter\n" +
+                "                PhoenixServerProject(item_id=[$0])\n" +
+                "                  PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n";
+        start(correlProps, 
false).sql(q3a).explainIs(p3aCorrelate).resultIs(r3a).close();
+        start(decorrelProps, 
false).sql(q3a).explainIs(p3aDecorrelated).resultIs(r3a).close();
+        // Test PhoenixClientSemiJoin
+        String q3b = "select \"item_id\", name from " + 
JOIN_ITEM_TABLE_FULL_NAME + " i where exists (select 1 from " + 
JOIN_ITEM_TABLE_FULL_NAME + " o where i.\"item_id\" = o.\"item_id\" and name <> 
'INVALID-1')";
+        Object[][] r3b = new Object[][] {
+                {"0000000001", "T1"},
+                {"0000000002", "T2"},
+                {"0000000003", "T3"},
+                {"0000000004", "T4"},
+                {"0000000005", "T5"},
+                {"0000000006", "T6"}};
+        String p3bDecorrelated =
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(item_id=[$0], NAME=[$1])\n" +
+                "    PhoenixClientSemiJoin(condition=[=($0, $8)], 
joinType=[inner])\n" +
+                "      PhoenixToClientConverter\n" +
+                "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "      PhoenixClientProject($f0=[true], item_id0=[$0])\n" +
+                "        PhoenixClientJoin(condition=[=($0, $1)], 
joinType=[inner])\n" +
+                "          PhoenixToClientConverter\n" +
+                "            PhoenixServerProject(item_id=[$0])\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "          PhoenixToClientConverter\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]], filter=[<>(CAST($1):VARCHAR(9) CHARACTER SET \"ISO-8859-1\" 
COLLATE \"ISO-8859-1$en_US$primary\", 'INVALID-1')])\n";
+        start(decorrelProps, 
false).sql(q3b).explainIs(p3bDecorrelated).resultIs(r3b).close();
+        
+        String q4 = "select \"item_id\", name from " + 
JOIN_ITEM_TABLE_FULL_NAME + " i where \"item_id\" in (select \"item_id\" from " 
+ JOIN_ORDER_TABLE_FULL_NAME + ")";
+        Object[][] r4 = new Object[][] {
+                {"0000000001", "T1"},
+                {"0000000002", "T2"},
+                {"0000000003", "T3"},
+                {"0000000006", "T6"}};
+        String p4Decorrelated = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(item_id=[$0], NAME=[$1])\n" +
+                "    PhoenixToClientConverter\n" +
+                "      PhoenixServerSemiJoin(condition=[=($2, $5)], 
joinType=[inner])\n" +
+                "        PhoenixServerProject($f0=[$0], $f1=[$1], $f7=[$0])\n" 
+
+                "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n";
+        start(decorrelProps, 
false).sql(q4).explainIs(p4Decorrelated).resultIs(r4).close();
+        
+        // CALCITE-864: switching orders and items in the first join wouldn't work.
+        String q5 = "select \"order_id\" from " + JOIN_ITEM_TABLE_FULL_NAME + 
" i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " o on o.\"item_id\" = i.\"item_id\" 
where quantity = (select max(quantity) from " + JOIN_ORDER_TABLE_FULL_NAME + " 
o2 JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i2 on o2.\"item_id\" = i2.\"item_id\" 
where i.\"supplier_id\" = i2.\"supplier_id\")";
+        Object [][] r5 = new Object[][] {
+                {"000000000000003"},
+                {"000000000000005"},
+                {"000000000000004"}};
+        String p5Correlate = 
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(order_id=[$7])\n" +
+                "    PhoenixFilter(condition=[=($11, $14)])\n" +
+                "      PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], 
requiredColumns=[{5}])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixServerJoin(condition=[=($9, $0)], 
joinType=[inner])\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "            PhoenixToClientConverter\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "        PhoenixServerAggregate(group=[{}], 
EXPR$0=[MAX($4)])\n" +
+                "          PhoenixServerJoin(condition=[=($2, $7)], 
joinType=[inner])\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "            PhoenixToClientConverter\n" +
+                "              PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]], filter=[=($cor0.supplier_id, $5)])\n";
+        String p5Decorrelated =
+                "PhoenixToEnumerableConverter\n" +
+                "  PhoenixClientProject(order_id=[$7])\n" +
+                "    PhoenixToClientConverter\n" +
+                "      PhoenixServerJoin(condition=[AND(=($9, $0), =($5, 
$14))], joinType=[inner])\n" +
+                "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "        PhoenixToClientConverter\n" +
+                "          PhoenixServerJoin(condition=[=($4, $8)], 
joinType=[inner])\n" +
+                "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "            PhoenixServerAggregate(group=[{14}], 
EXPR$0=[MAX($4)])\n" +
+                "              PhoenixServerJoin(condition=[=($2, $7)], 
joinType=[inner])\n" +
+                "                PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
+                "                PhoenixToClientConverter\n" +
+                "                  PhoenixServerJoin(condition=[=($7, $5)], 
joinType=[inner])\n" +
+                "                    PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
+                "                    PhoenixServerAggregate(group=[{5}])\n" +
+                "                      PhoenixServerJoin(condition=[=($9, 
$0)], joinType=[inner])\n" +
+                "                        PhoenixTableScan(table=[[phoenix, 
Join, ItemTable]])\n" +
+                "                        PhoenixToClientConverter\n" +
+                "                          PhoenixTableScan(table=[[phoenix, 
Join, OrderTable]])\n";
+        start(correlProps, 
false).sql(q5).explainIs(p5Correlate).resultIs(r5).close();
+        start(decorrelProps, 
false).sql(q5).explainIs(p5Decorrelated).resultIs(r5).close();
     }
     
     @Test public void testSelectFromView() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index d3666d2..ef67de0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -7,10 +7,14 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
@@ -71,6 +75,7 @@ import 
org.apache.phoenix.expression.function.RoundTimestampExpression;
 import org.apache.phoenix.expression.function.SqrtFunction;
 import org.apache.phoenix.expression.function.TrimFunction;
 import org.apache.phoenix.expression.function.UpperFunction;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PDataType;
@@ -100,6 +105,48 @@ public class CalciteUtils {
     public static PDataType sqlTypeNameToPDataType(SqlTypeName sqlTypeName) {
         return PDataType.fromTypeId(sqlTypeName.getJdbcOrdinal());
     }
+    
+    public static JoinType convertJoinType(JoinRelType type) {
+        JoinType ret = null;
+        switch (type) {
+        case INNER:
+            ret = JoinType.Inner;
+            break;
+        case LEFT:
+            ret = JoinType.Left;
+            break;
+        case RIGHT:
+            ret = JoinType.Right;
+            break;
+        case FULL:
+            ret = JoinType.Full;
+            break;
+        default:
+        }
+        
+        return ret;
+    }
+    
+    public static JoinType convertSemiJoinType(SemiJoinType type) {
+        JoinType ret = null;
+        switch (type) {
+        case INNER:
+            ret = JoinType.Inner;
+            break;
+        case LEFT:
+            ret = JoinType.Left;
+            break;
+        case SEMI:
+            ret = JoinType.Semi;
+            break;
+        case ANTI:
+            ret = JoinType.Anti;
+            break;
+        default:
+        }
+        
+        return ret;
+    }
 
        private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = 
Maps
                        
.newHashMapWithExpectedSize(ExpressionType.values().length);
@@ -513,6 +560,21 @@ public class CalciteUtils {
                        }
                        
                });
+               EXPRESSION_MAP.put(SqlKind.FIELD_ACCESS, new 
ExpressionFactory() {
+            @SuppressWarnings("rawtypes")
+            @Override
+            public Expression newExpression(RexNode node, Implementor 
implementor) {
+                RexFieldAccess fieldAccess = (RexFieldAccess) node;
+                RexNode refExpr = fieldAccess.getReferenceExpr();
+                if (refExpr.getKind() != SqlKind.CORREL_VARIABLE) {
+                    throw new 
UnsupportedOperationException("Non-correl-variable as reference expression of 
RexFieldAccess.");
+                }
+                String varId = ((RexCorrelVariable) refExpr).getName();
+                int index = fieldAccess.getField().getIndex();
+                PDataType type = 
sqlTypeNameToPDataType(node.getType().getSqlTypeName());
+                return implementor.newFieldAccessExpression(varId, index, 
type);
+            }              
+               });
                EXPRESSION_MAP.put(SqlKind.CAST, new ExpressionFactory() {
 
             @SuppressWarnings("rawtypes")
@@ -629,6 +691,7 @@ public class CalciteUtils {
             }
         });
         EXPRESSION_MAP.put(SqlKind.CEIL, new ExpressionFactory() {
+            @SuppressWarnings("rawtypes")
             @Override
             public Expression newExpression(RexNode node, Implementor 
implementor) {
                 //TODO Phoenix only support separate arguments.
@@ -652,6 +715,7 @@ public class CalciteUtils {
             }
         });
         EXPRESSION_MAP.put(SqlKind.FLOOR, new ExpressionFactory() {
+            @SuppressWarnings("rawtypes")
             @Override
             public Expression newExpression(RexNode node, Implementor 
implementor) {
                 // TODO Phoenix only support separate arguments.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
index 226cf70..c0d3383 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
@@ -1,5 +1,6 @@
 package org.apache.phoenix.calcite.jdbc;
 
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptCostFactory;
@@ -48,6 +49,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
             RelOptCostFactory costFactory) {
         RelOptPlanner planner = super.createPlanner(prepareContext, 
externalContext, costFactory);
         
+        planner.removeRule(EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE);
         planner.removeRule(JoinCommuteRule.INSTANCE);
         planner.addRule(JoinCommuteRule.SWAP_OUTER);
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
index 821d7b9..1b559f0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
@@ -10,9 +10,11 @@ import 
org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
+import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
 
@@ -25,6 +27,10 @@ public class PhoenixRelMdCollation {
 
     private PhoenixRelMdCollation() { }
 
+    public ImmutableList<RelCollation> collations(PhoenixCorrelate correlate) {
+        return ImmutableList.copyOf(correlate(correlate.getLeft(), 
correlate.getRight(), correlate.getJoinType()));
+    }
+
     public ImmutableList<RelCollation> collations(PhoenixLimit limit) {
         return ImmutableList.copyOf(RelMdCollation.limit(limit.getInput()));
     }
@@ -37,6 +43,11 @@ public class PhoenixRelMdCollation {
         return 
ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), 
join.getRight(), join.joinInfo.leftKeys, join.joinInfo.rightKeys));
     }
     
+    /** Helper method to determine a {@link PhoenixCorrelate}'s collation. */
+    public static List<RelCollation> correlate(RelNode left, RelNode right, 
SemiJoinType joinType) {
+        return RelMetadataQuery.collations(left);
+    }
+    
     /** Helper method to determine a {@link PhoenixServerJoin}'s collation. */
     public static List<RelCollation> hashJoin(RelNode left, RelNode right, 
JoinRelType joinType) {
         return RelMetadataQuery.collations(left);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
index 829c401..3355ee2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -16,7 +16,6 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Join}
@@ -57,25 +56,4 @@ abstract public class PhoenixAbstractJoin extends Join 
implements PhoenixRel {
 
         return plan;
     }
-    
-    protected static JoinType convertJoinType(JoinRelType type) {
-        JoinType ret = null;
-        switch (type) {
-        case INNER:
-            ret = JoinType.Inner;
-            break;
-        case LEFT:
-            ret = JoinType.Left;
-            break;
-        case RIGHT:
-            ret = JoinType.Right;
-            break;
-        case FULL:
-            ret = JoinType.Full;
-            break;
-        default:
-        }
-        
-        return ret;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
new file mode 100644
index 0000000..e788a75
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
@@ -0,0 +1,41 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+
+/**
+ * Base class for the Phoenix implementations of Calcite's {@link SemiJoin}.
+ * It holds the input-compilation logic shared by the concrete semi-join
+ * operators.
+ */
+abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixRel {
+
+    protected PhoenixAbstractSemiJoin(RelOptCluster cluster, RelTraitSet traitSet,
+            RelNode left, RelNode right, RexNode condition,
+            ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+        super(cluster, traitSet, left, right, condition, leftKeys, rightKeys);
+    }
+
+    /**
+     * Compiles one input of this semi-join and collects its join-key
+     * expressions.
+     *
+     * @param implementor    the implementor used to compile the input
+     * @param index          0 selects the left input, 1 selects the right input
+     * @param conditionExprs output list; one column expression is appended per
+     *                       join key of the selected input, and a constant
+     *                       {@code 0} literal is appended if the list is still
+     *                       empty afterwards
+     * @return the query plan produced for the selected input
+     */
+    protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) {
+        assert index == 0 || index == 1;
+
+        PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+        ImmutableIntList keys = index == 0 ? leftKeys : rightKeys;
+        // Pass the real ordinal of the input instead of a hard-coded 0, so that
+        // the right input is reported as input 1.
+        QueryPlan plan = implementor.visitInput(index, input);
+        for (Integer key : keys) {
+            conditionExprs.add(implementor.newColumnExpression(key));
+        }
+        if (conditionExprs.isEmpty()) {
+            // No join keys: fall back to a single constant expression.
+            conditionExprs.add(LiteralExpression.newConstant(0));
+        }
+
+        return plan;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index c6cf214..599661c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -120,7 +120,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
         PTable rightTable = implementor.getTableRef().getTable();
         implementor.popContext();
         
-        JoinType type = convertJoinType(getJoinType());
+        JoinType type = CalciteUtils.convertJoinType(getJoinType());
         PTable joinedTable;
         try {
             joinedTable = JoinCompiler.joinProjectedTables(leftTable, 
rightTable, type);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
new file mode 100644
index 0000000..87dc44d
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
@@ -0,0 +1,119 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+/**
+ * Semi-join in the Phoenix client convention, compiled into a
+ * {@link SortMergeJoinPlan} with join type {@code Semi}.
+ */
+public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements PhoenixRel {
+
+    /**
+     * Creates a client semi-join over the given inputs. The join keys are
+     * derived from {@code condition}; the collation trait is supplied by
+     * {@code PhoenixRelMdCollation.mergeJoin}.
+     */
+    public static PhoenixClientSemiJoin create(
+            final RelNode left, final RelNode right, RexNode condition) {
+        final RelOptCluster cluster = left.getCluster();
+        final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+        final Supplier<List<RelCollation>> collationSupplier =
+                new Supplier<List<RelCollation>>() {
+                    public List<RelCollation> get() {
+                        return PhoenixRelMdCollation.mergeJoin(
+                                left, right, joinInfo.leftKeys, joinInfo.rightKeys);
+                    }
+                };
+        final RelTraitSet traits = cluster.traitSet()
+                .replace(PhoenixRel.CLIENT_CONVENTION)
+                .replaceIfs(RelCollationTraitDef.INSTANCE, collationSupplier);
+        return new PhoenixClientSemiJoin(cluster, traits, left, right, condition,
+                joinInfo.leftKeys, joinInfo.rightKeys);
+    }
+
+    private PhoenixClientSemiJoin(RelOptCluster cluster, RelTraitSet traitSet,
+            RelNode left, RelNode right, RexNode condition,
+            ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+        super(cluster, traitSet, left, right, condition, leftKeys, rightKeys);
+    }
+
+    /**
+     * Rebuilds this node through {@link #create}; the supplied trait set and
+     * {@code semiJoinDone} flag are not used.
+     */
+    @Override
+    public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+            RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+        assert joinType == JoinRelType.INNER;
+        return create(left, right, condition);
+    }
+
+    /**
+     * Returns infinite cost unless both inputs are in the client convention
+     * and every non-empty key list is contained in its input's collations.
+     * Otherwise the cost is based on the row counts of this node and of both
+     * inputs.
+     */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        final boolean bothInputsClient =
+                getLeft().getConvention() == PhoenixRel.CLIENT_CONVENTION
+                        && getRight().getConvention() == PhoenixRel.CLIENT_CONVENTION;
+        if (!bothInputsClient) {
+            return planner.getCostFactory().makeInfiniteCost();
+        }
+
+        if (!isCollatedOn(getLeft(), leftKeys) || !isCollatedOn(getRight(), rightKeys)) {
+            return planner.getCostFactory().makeInfiniteCost();
+        }
+
+        final double selfRows = RelMetadataQuery.getRowCount(this);
+        final double leftRows = RelMetadataQuery.getRowCount(getLeft());
+        final double totalRows;
+        if (Double.isInfinite(leftRows)) {
+            totalRows = leftRows;
+        } else {
+            final double rightRows = RelMetadataQuery.getRowCount(getRight());
+            totalRows = Double.isInfinite(rightRows)
+                    ? rightRows
+                    : selfRows + leftRows + rightRows;
+        }
+
+        return planner.getCostFactory().makeCost(totalRows, 0, 0)
+                .multiplyBy(SERVER_FACTOR)
+                .multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /** True when {@code keys} is empty or contained in the collations of {@code input}. */
+    private static boolean isCollatedOn(RelNode input, ImmutableIntList keys) {
+        return keys.isEmpty()
+                || RelCollations.contains(RelMetadataQuery.collations(input), keys);
+    }
+
+    /**
+     * Compiles the left and right inputs, each in its own implementor
+     * context, and combines them into a sort-merge semi-join plan. The table
+     * ref produced by the left input is re-installed on the implementor
+     * afterwards and is also used as the plan's table.
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        final List<Expression> leftKeyExprs = Lists.<Expression> newArrayList();
+        final List<Expression> rightKeyExprs = Lists.<Expression> newArrayList();
+
+        final boolean retainLeftPKColumns =
+                implementor.getCurrentContext().isRetainPKColumns()
+                        && getJoinType() != JoinRelType.FULL;
+        implementor.pushContext(new ImplementorContext(retainLeftPKColumns, true));
+        final QueryPlan leftPlan = implementInput(implementor, 0, leftKeyExprs);
+        final TableRef leftTableRef = implementor.getTableRef();
+        implementor.popContext();
+
+        implementor.pushContext(new ImplementorContext(false, true));
+        final QueryPlan rightPlan = implementInput(implementor, 1, rightKeyExprs);
+        implementor.popContext();
+
+        implementor.setTableRef(leftTableRef);
+        final PhoenixStatement stmt = leftPlan.getContext().getStatement();
+        final ColumnResolver resolver = leftPlan.getContext().getResolver();
+        final StatementContext context =
+                new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+
+        return new SortMergeJoinPlan(context, leftPlan.getStatement(),
+                leftTableRef, JoinType.Semi, leftPlan, rightPlan,
+                leftKeyExprs, rightKeyExprs,
+                leftTableRef.getTable(), leftTableRef.getTable(), null, 0, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
new file mode 100644
index 0000000..430e282
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
@@ -0,0 +1,98 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.JoinCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Phoenix implementation of Calcite's {@link Correlate}, compiled into a
+ * {@link CorrelatePlan}.
+ */
+public class PhoenixCorrelate extends Correlate implements PhoenixRel {
+
+    /**
+     * Creates a correlate node in the client convention. The collation trait
+     * is supplied by {@code PhoenixRelMdCollation.correlate}.
+     */
+    public static PhoenixCorrelate create(final RelNode left, final RelNode right,
+            CorrelationId correlationId, ImmutableBitSet requiredColumns,
+            final SemiJoinType joinType) {
+        final RelOptCluster cluster = left.getCluster();
+        final Supplier<List<RelCollation>> collationSupplier =
+                new Supplier<List<RelCollation>>() {
+                    public List<RelCollation> get() {
+                        return PhoenixRelMdCollation.correlate(left, right, joinType);
+                    }
+                };
+        final RelTraitSet traits = cluster.traitSet()
+                .replace(PhoenixRel.CLIENT_CONVENTION)
+                .replaceIfs(RelCollationTraitDef.INSTANCE, collationSupplier);
+        return new PhoenixCorrelate(cluster, traits, left, right, correlationId,
+                requiredColumns, joinType);
+    }
+
+    private PhoenixCorrelate(RelOptCluster cluster, RelTraitSet traits,
+            RelNode left, RelNode right, CorrelationId correlationId,
+            ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+        super(cluster, traits, left, right, correlationId, requiredColumns,
+                joinType);
+    }
+
+    /** Rebuilds this node through {@link #create}; the supplied trait set is not used. */
+    @Override
+    public Correlate copy(RelTraitSet traitSet, RelNode left, RelNode right,
+            CorrelationId correlationId, ImmutableBitSet requiredColumns,
+            SemiJoinType joinType) {
+        return create(left, right, correlationId, requiredColumns, joinType);
+    }
+
+    /**
+     * Returns infinite cost unless both inputs are in the client convention;
+     * otherwise scales the inherited cost by {@code PHOENIX_FACTOR}.
+     */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        final boolean bothInputsClient =
+                getLeft().getConvention() == PhoenixRel.CLIENT_CONVENTION
+                        && getRight().getConvention() == PhoenixRel.CLIENT_CONVENTION;
+        if (!bothInputsClient) {
+            return planner.getCostFactory().makeInfiniteCost();
+        }
+
+        return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Compiles the left input, registers this node's correlate variable with
+     * the runtime context, compiles the right input, and combines both plans
+     * into a {@link CorrelatePlan} over the joined projected table.
+     *
+     * @throws RuntimeException wrapping any {@link SQLException} raised while
+     *         joining the projected tables
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        final boolean retainPKColumns = implementor.getCurrentContext().isRetainPKColumns();
+        implementor.pushContext(new ImplementorContext(retainPKColumns, true));
+        final QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft());
+        final PTable leftTable = implementor.getTableRef().getTable();
+        implementor.popContext();
+
+        // The variable must be defined before the right input is compiled.
+        implementor.getRuntimeContext().defineCorrelateVariable(
+                getCorrelVariable(), implementor.getTableRef());
+
+        implementor.pushContext(new ImplementorContext(false, true));
+        final QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) getRight());
+        final PTable rightTable = implementor.getTableRef().getTable();
+        implementor.popContext();
+
+        final JoinType type = CalciteUtils.convertSemiJoinType(getJoinType());
+        final PTable joinedTable;
+        try {
+            joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        implementor.setTableRef(new TableRef(joinedTable));
+
+        // Count of left-table columns that are not primary-key columns.
+        final int leftNonPKColumnCount =
+                leftTable.getColumns().size() - leftTable.getPKColumns().size();
+        return new CorrelatePlan(leftPlan, rightPlan, getCorrelVariable(),
+                type, false, implementor.getRuntimeContext(), joinedTable,
+                leftTable, rightTable, leftNonPKColumnCount);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index 1136ea6..305af62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -8,11 +8,13 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.execute.RuntimeContext;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
 
 /**
  * Relational expression in Phoenix.
@@ -68,6 +70,9 @@ public interface PhoenixRel extends RelNode {
   interface Implementor {
     QueryPlan visitInput(int i, PhoenixRel input);
     ColumnExpression newColumnExpression(int index);
+    @SuppressWarnings("rawtypes")
+    Expression newFieldAccessExpression(String variableId, int index, 
PDataType type);
+    RuntimeContext getRuntimeContext();
     void setTableRef(TableRef tableRef);
     TableRef getTableRef();
     void pushContext(ImplementorContext context);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index 28b4f51..d4b304a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -14,8 +14,10 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.TupleProjectionCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.RuntimeContext;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.schema.ColumnRef;
@@ -28,14 +30,17 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
 
 import com.google.common.collect.Lists;
 
 public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
+    private final RuntimeContext runtimeContext;
        private TableRef tableRef;
        private Stack<ImplementorContext> contextStack;
        
-       public PhoenixRelImplementorImpl() {
+       public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
+           this.runtimeContext = runtimeContext;
            this.contextStack = new Stack<ImplementorContext>();
            pushContext(new ImplementorContext(true, false));
        }
@@ -50,7 +55,19 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
                ColumnRef colRef = new ColumnRef(this.tableRef, index);
                return colRef.newColumnExpression();
        }
-
+    
+    /**
+     * Builds an expression that reads field {@code index} of the correlate
+     * variable {@code variableId}. The variable's table ref is looked up in
+     * the runtime context, a column expression is created against it, and the
+     * result is wrapped in a {@link CorrelateVariableFieldAccessExpression}.
+     * The {@code type} argument is not used by this implementation.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
+        final TableRef correlateTableRef = runtimeContext.getCorrelateVariableDef(variableId);
+        final ColumnRef fieldRef = new ColumnRef(correlateTableRef, index);
+        final Expression fieldExpr = fieldRef.newColumnExpression();
+        return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldExpr);
+    }
+    
+    /** Returns the runtime context this implementor was constructed with. */
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return runtimeContext;
+    }
 
     @Override
        public void setTableRef(TableRef tableRef) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
index 8c0c83a..a707b14 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
@@ -74,7 +74,7 @@ public class PhoenixServerAggregate extends 
PhoenixAbstractAggregate {
         GroupBy groupBy = super.getGroupBy(implementor);       
         super.serializeAggregators(implementor, context, groupBy.isEmpty());
         
-        QueryPlan aggPlan = new AggregatePlan(context, 
basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, 
null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+        QueryPlan aggPlan = new AggregatePlan(context, 
basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, 
null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter());
         if (hashJoinPlan != null) {        
             aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), 
aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 1935251..047deac 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -114,7 +114,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
         PTable rightTable = implementor.getTableRef().getTable();
         implementor.popContext();
         
-        JoinType type = convertJoinType(getJoinType());
+        JoinType type = CalciteUtils.convertJoinType(getJoinType());
         PTable joinedTable;
         try {
             joinedTable = JoinCompiler.joinProjectedTables(leftTable, 
rightTable, type);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
new file mode 100644
index 0000000..4593bdb
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
@@ -0,0 +1,120 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+/**
+ * Semi-join in the Phoenix server-join convention, compiled into a
+ * {@link HashJoinPlan} with join type {@code Semi}.
+ */
+public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin {
+
+    /**
+     * Creates a server semi-join over the given inputs. The collation trait
+     * is supplied by {@code PhoenixRelMdCollation.hashJoin}; the join keys are
+     * derived from {@code condition}, which is asserted to be an equi-join
+     * condition.
+     */
+    public static PhoenixServerSemiJoin create(
+            final RelNode left, final RelNode right, RexNode condition) {
+        final RelOptCluster cluster = left.getCluster();
+        final Supplier<List<RelCollation>> collationSupplier =
+                new Supplier<List<RelCollation>>() {
+                    public List<RelCollation> get() {
+                        return PhoenixRelMdCollation.hashJoin(left, right, JoinRelType.INNER);
+                    }
+                };
+        final RelTraitSet traits = cluster.traitSet()
+                .replace(PhoenixRel.SERVERJOIN_CONVENTION)
+                .replaceIfs(RelCollationTraitDef.INSTANCE, collationSupplier);
+        final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+        assert joinInfo.isEqui();
+        return new PhoenixServerSemiJoin(cluster, traits, left, right, condition,
+                joinInfo.leftKeys, joinInfo.rightKeys);
+    }
+
+    private PhoenixServerSemiJoin(RelOptCluster cluster, RelTraitSet traitSet,
+            RelNode left, RelNode right, RexNode condition,
+            ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+        super(cluster, traitSet, left, right, condition, leftKeys, rightKeys);
+    }
+
+    /**
+     * Rebuilds this node through {@link #create}; the supplied trait set and
+     * {@code semiJoinDone} flag are not used.
+     */
+    @Override
+    public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+            RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+        assert joinType == JoinRelType.INNER;
+        return create(left, right, condition);
+    }
+
+    /**
+     * Returns infinite cost unless the left input is in the server convention
+     * and the right input is in the client convention. Otherwise the cost is
+     * based on the row counts of this node and the left input plus
+     * {@code nLogN} of the right input's row count.
+     */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        final boolean conventionsMatch =
+                getLeft().getConvention() == PhoenixRel.SERVER_CONVENTION
+                        && getRight().getConvention() == PhoenixRel.CLIENT_CONVENTION;
+        if (!conventionsMatch) {
+            return planner.getCostFactory().makeInfiniteCost();
+        }
+
+        //TODO return infinite cost if RHS size exceeds memory limit.
+
+        final double selfRows = RelMetadataQuery.getRowCount(this);
+        final double leftRows = RelMetadataQuery.getRowCount(getLeft());
+        final double totalRows;
+        if (Double.isInfinite(leftRows)) {
+            totalRows = leftRows;
+        } else {
+            final double rightRows = RelMetadataQuery.getRowCount(getRight());
+            totalRows = Double.isInfinite(rightRows)
+                    ? rightRows
+                    : selfRows + leftRows + Util.nLogN(rightRows);
+        }
+
+        return planner.getCostFactory().makeCost(totalRows, 0, 0)
+                .multiplyBy(SERVER_FACTOR)
+                .multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Compiles the left and right inputs, each in its own implementor
+     * context, and combines them into a hash-join plan in which the right
+     * plan is the single hash sub-plan. The table ref produced by the left
+     * input is re-installed on the implementor afterwards.
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        final List<Expression> leftKeyExprs = Lists.<Expression> newArrayList();
+        final List<Expression> rightKeyExprs = Lists.<Expression> newArrayList();
+
+        final boolean retainPKColumns = implementor.getCurrentContext().isRetainPKColumns();
+        implementor.pushContext(new ImplementorContext(retainPKColumns, true));
+        final QueryPlan leftPlan = implementInput(implementor, 0, leftKeyExprs);
+        final TableRef leftTableRef = implementor.getTableRef();
+        implementor.popContext();
+
+        implementor.pushContext(new ImplementorContext(false, true));
+        final QueryPlan rightPlan = implementInput(implementor, 1, rightKeyExprs);
+        implementor.popContext();
+
+        implementor.setTableRef(leftTableRef);
+
+        // Generic array creation is not allowed, hence the raw array and cast.
+        @SuppressWarnings("unchecked")
+        final List<Expression>[] joinExprs = (List<Expression>[]) (new List[] {leftKeyExprs});
+        final HashJoinInfo hashJoinInfo = new HashJoinInfo(
+                leftTableRef.getTable(),
+                new ImmutableBytesPtr[] {new ImmutableBytesPtr()},
+                joinExprs,
+                new JoinType[] {JoinType.Semi}, new boolean[] {true},
+                new PTable[] {null}, new int[] {0}, null, null);
+
+        final HashJoinPlan.HashSubPlan[] subPlans = new HashJoinPlan.HashSubPlan[] {
+                new HashJoinPlan.HashSubPlan(0, rightPlan, rightKeyExprs, false, null, null)};
+        return HashJoinPlan.create(
+                (SelectStatement) (leftPlan.getStatement()), leftPlan, hashJoinInfo, subPlans);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 171dd26..5bf9569 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.WhereOptimizer;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.SelectStatement;
@@ -40,6 +41,7 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.base.Supplier;
@@ -88,7 +90,24 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
                 final PhoenixTable phoenixTable = 
table.unwrap(PhoenixTable.class);
                 PTable pTable = phoenixTable.getTable();
                 TableRef tableRef = new 
TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, 
false);
-                Implementor tmpImplementor = new PhoenixRelImplementorImpl();
+                // We use an implementor with a special implementation for 
field access
+                // here, which translates RexFieldAccess into a 
LiteralExpression
+                // with a sample value. This will achieve 3 goals at a time:
+                // 1) avoid getting exception when translating RexFieldAccess 
at this 
+                //    time when the correlate variable has not been defined 
yet.
+                // 2) get a guess of ScanRange even if the runtime value is 
absent.
+                // 3) test whether this dynamic filter is worth a recompile at 
runtime.
+                Implementor tmpImplementor = new 
PhoenixRelImplementorImpl(null) {                    
+                    @SuppressWarnings("rawtypes")
+                    @Override
+                    public Expression newFieldAccessExpression(String 
variableId, int index, PDataType type) {
+                        try {
+                            return 
LiteralExpression.newConstant(type.getSampleValue(), type);
+                        } catch (SQLException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }                    
+                };
                 tmpImplementor.setTableRef(tableRef);
                 SelectStatement select = SelectStatement.SELECT_ONE;
                 PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
@@ -163,10 +182,18 @@ public class PhoenixTableScan extends TableScan 
implements PhoenixRel {
             ColumnResolver resolver = FromCompiler.getResolver(tableRef);
             StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
             SelectStatement select = SelectStatement.SELECT_ONE;
+            Expression dynamicFilter = null;
             if (filter != null) {
                 Expression filterExpr = CalciteUtils.toExpression(filter, 
implementor);
                 filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, 
select, filterExpr);
                 WhereCompiler.setScanFilter(context, select, filterExpr, true, 
false);
+                // TODO This is not absolutely strict. We may have a filter 
like:
+                // pk = '0' and pk = $cor0 where $cor0 happens to get a sample 
value
+                // as '0', thus making the below test return false and adding 
an
+                // unnecessary dynamic filter. This would only be a 
performance bug though.
+                if (!context.getScanRanges().equals(this.scanRanges)) {
+                    dynamicFilter = filterExpr;
+                }
             }
             projectAllColumnFamilies(context.getScan(), 
phoenixTable.getTable());
             if (implementor.getCurrentContext().forceProject()) {
@@ -178,7 +205,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             Integer limit = null;
             OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
             ParallelIteratorFactory iteratorFactory = null;
-            return new ScanPlan(context, select, tableRef, 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true);
+            return new ScanPlan(context, select, tableRef, 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, 
dynamicFilter);
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index e4cd07d..b171dac 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.execute.DelegateQueryPlan;
+import org.apache.phoenix.execute.RuntimeContextImpl;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -83,7 +84,7 @@ public class PhoenixToEnumerableConverter extends 
ConverterImpl implements Enume
     }
     
     static QueryPlan makePlan(PhoenixRel rel) {
-        final PhoenixRel.Implementor phoenixImplementor = new 
PhoenixRelImplementorImpl();
+        final PhoenixRel.Implementor phoenixImplementor = new 
PhoenixRelImplementorImpl(new RuntimeContextImpl());
         final QueryPlan plan = phoenixImplementor.visitInput(0, rel);
         return new DelegateQueryPlan(plan) {
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
index aa53ae4..d39f3a6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
@@ -13,6 +13,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.UnnestArrayPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
 
 public class PhoenixUncollect extends Uncollect implements PhoenixRel {
@@ -53,6 +55,8 @@ public class PhoenixUncollect extends Uncollect implements 
PhoenixRel {
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
+        PTable projectedTable = implementor.createProjectedTable();
+        implementor.setTableRef(new TableRef(projectedTable));
         return new UnnestArrayPlan(plan, arrayExpression, false);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index 89aaa07..22284e6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -116,6 +117,8 @@ public class PhoenixValues extends Values implements 
PhoenixRel {
             TupleProjector projector = implementor.project(exprs);
             literalResult.add(projector.projectResults(baseTuple));
         }
+        PTable projectedTable = implementor.createProjectedTable();
+        implementor.setTableRef(new TableRef(projectedTable));
         
         try {
             PhoenixStatement stmt = new PhoenixStatement(phoenixConnection);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 210306d..c60c27b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -18,15 +18,18 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Aggregate.Group;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -40,13 +43,16 @@ import 
org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
 import org.apache.phoenix.calcite.rel.PhoenixClientProject;
+import org.apache.phoenix.calcite.rel.PhoenixClientSemiJoin;
 import org.apache.phoenix.calcite.rel.PhoenixClientSort;
+import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixFilter;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
 import org.apache.phoenix.calcite.rel.PhoenixRel;
 import org.apache.phoenix.calcite.rel.PhoenixServerAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
 import org.apache.phoenix.calcite.rel.PhoenixServerProject;
+import org.apache.phoenix.calcite.rel.PhoenixServerSemiJoin;
 import org.apache.phoenix.calcite.rel.PhoenixServerSort;
 import org.apache.phoenix.calcite.rel.PhoenixToClientConverter;
 import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
@@ -85,8 +91,11 @@ public class PhoenixConverterRules {
         PhoenixUnionRule.INSTANCE,
         PhoenixClientJoinRule.INSTANCE,
         PhoenixServerJoinRule.INSTANCE,
+        PhoenixClientSemiJoinRule.INSTANCE,
+        PhoenixServerSemiJoinRule.INSTANCE,
         PhoenixValuesRule.INSTANCE,
         PhoenixUncollectRule.INSTANCE,
+        PhoenixCorrelateRule.INSTANCE,
     };
 
     public static final RelOptRule[] CONVERTIBLE_RULES = {
@@ -106,8 +115,11 @@ public class PhoenixConverterRules {
         PhoenixUnionRule.CONVERTIBLE,
         PhoenixClientJoinRule.CONVERTIBLE,
         PhoenixServerJoinRule.CONVERTIBLE,
+        PhoenixClientSemiJoinRule.INSTANCE,
+        PhoenixServerSemiJoinRule.INSTANCE,
         PhoenixValuesRule.INSTANCE,
         PhoenixUncollectRule.INSTANCE,
+        PhoenixCorrelateRule.INSTANCE,
     };
 
     /** Base class for planner rules that convert a relational expression to
@@ -275,7 +287,7 @@ public class PhoenixConverterRules {
      * {@link PhoenixFilter}.
      */
     public static class PhoenixFilterRule extends PhoenixConverterRule {
-        private static Predicate<LogicalFilter> IS_CONVERTIBLE = new 
Predicate<LogicalFilter>() {
+        protected static Predicate<LogicalFilter> IS_CONVERTIBLE = new 
Predicate<LogicalFilter>() {
             @Override
             public boolean apply(LogicalFilter input) {
                 return isConvertible(input);
@@ -582,6 +594,78 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.SemiJoin} to a
+     * {@link PhoenixClientSemiJoin}.
+     */
+    public static class PhoenixClientSemiJoinRule extends PhoenixConverterRule 
{
+        
+        public static final PhoenixClientSemiJoinRule INSTANCE = new 
PhoenixClientSemiJoinRule();
+
+        private PhoenixClientSemiJoinRule() {
+            super(SemiJoin.class, Convention.NONE, 
+                    PhoenixRel.CLIENT_CONVENTION, "PhoenixClientSemiJoinRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final SemiJoin join = (SemiJoin) rel;
+            RelNode left = join.getLeft();
+            RelNode right = join.getRight();
+            
+            JoinInfo joinInfo = JoinInfo.of(join.getLeft(), join.getRight(), 
join.getCondition());
+            if (!joinInfo.leftKeys.isEmpty()) {
+                List<RelFieldCollation> leftFieldCollations = 
Lists.newArrayList();
+                for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); 
iter.hasNext();) {
+                    leftFieldCollations.add(new RelFieldCollation(iter.next(), 
Direction.ASCENDING));
+                }
+                RelCollation leftCollation = 
RelCollations.of(leftFieldCollations);
+                left = LogicalSort.create(left, leftCollation, null, null);
+                
+                List<RelFieldCollation> rightFieldCollations = 
Lists.newArrayList();
+                for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); 
iter.hasNext();) {
+                    rightFieldCollations.add(new 
RelFieldCollation(iter.next(), Direction.ASCENDING));
+                }
+                RelCollation rightCollation = 
RelCollations.of(rightFieldCollations);
+                right = LogicalSort.create(right, rightCollation, null, null);
+            }
+            
+            return PhoenixClientSemiJoin.create(
+                    convert(
+                            left, 
+                            
left.getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)),
+                    convert(
+                            right, 
+                            
right.getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)),
+                    join.getCondition());
+        }
+    }
+
+    /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.SemiJoin} to a
+     * {@link PhoenixServerSemiJoin}.
+     */
+    public static class PhoenixServerSemiJoinRule extends PhoenixConverterRule 
{
+        
+        public static final PhoenixServerSemiJoinRule INSTANCE = new 
PhoenixServerSemiJoinRule();
+
+        private PhoenixServerSemiJoinRule() {
+            super(SemiJoin.class, Convention.NONE, 
+                    PhoenixRel.SERVERJOIN_CONVENTION, 
"PhoenixServerSemiJoinRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final SemiJoin join = (SemiJoin) rel;
+            return PhoenixServerSemiJoin.create(
+                    convert(
+                            join.getLeft(), 
+                            
join.getLeft().getTraitSet().replace(PhoenixRel.SERVER_CONVENTION)),
+                    convert(
+                            join.getRight(), 
+                            
join.getRight().getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)),
+                    join.getCondition());
+        }
+    }
+
+    /**
      * Rule to convert a {@link org.apache.calcite.rel.core.Values} to a
      * {@link PhoenixValues}.
      */
@@ -624,6 +708,32 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link 
org.apache.calcite.rel.logical.LogicalCorrelate} to a
+     * {@link PhoenixCorrelate}.
+     */
+    public static class PhoenixCorrelateRule extends PhoenixConverterRule {
+        
+        private static final PhoenixCorrelateRule INSTANCE = new 
PhoenixCorrelateRule();
+
+        private PhoenixCorrelateRule() {
+            super(LogicalCorrelate.class, Convention.NONE, 
+                    PhoenixRel.CLIENT_CONVENTION, "PhoenixCorrelateRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final Correlate correlate = (Correlate) rel;
+            return PhoenixCorrelate.create(
+                convert(correlate.getLeft(), 
+                        correlate.getLeft().getTraitSet().replace(out)),
+                convert(correlate.getRight(), 
+                        correlate.getRight().getTraitSet().replace(out)),
+                correlate.getCorrelationId(),
+                correlate.getRequiredColumns(),
+                correlate.getJoinType());
+        }
+    }
+
+    /**
      * Rule to convert an {@link 
org.apache.calcite.rel.logical.LogicalIntersect}
      * to an {@link PhoenixIntersectRel}.
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
index d717a1e..f846d45 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
@@ -11,18 +11,27 @@ public class PhoenixFilterScanMergeRule extends RelOptRule {
 
     /** Predicate that returns true if a table scan has no filter. */
     private static final Predicate<PhoenixTableScan> NO_FILTER =
-        new Predicate<PhoenixTableScan>() {
-            @Override
-            public boolean apply(PhoenixTableScan phoenixTableScan) {
-                return phoenixTableScan.filter == null;
-            }
-        };
+            new Predicate<PhoenixTableScan>() {
+        @Override
+        public boolean apply(PhoenixTableScan phoenixTableScan) {
+            return phoenixTableScan.filter == null;
+        }
+    };
+
+    /** Predicate that returns true if a filter is Phoenix implementable. */
+    private static Predicate<Filter> IS_CONVERTIBLE = 
+            new Predicate<Filter>() {
+        @Override
+        public boolean apply(Filter input) {
+            return PhoenixConverterRules.isConvertible(input);
+        }            
+    };
 
     public static final PhoenixFilterScanMergeRule INSTANCE = new 
PhoenixFilterScanMergeRule();
 
     private PhoenixFilterScanMergeRule() {
         super(
-            operand(Filter.class,
+            operand(Filter.class, null, IS_CONVERTIBLE,
                 operand(PhoenixTableScan.class, null, NO_FILTER, any())));
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index da55fb5..f38c637 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -85,7 +85,7 @@ public class AggregatePlan extends BaseQueryPlan {
         this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, groupBy, having, null);
     }
     
-    private AggregatePlan(
+    public AggregatePlan(
             StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
             Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, GroupBy groupBy,
             Expression having, Expression dynamicFilter) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 2d408bc..cf4dce9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -86,7 +86,7 @@ public class ScanPlan extends BaseQueryPlan {
         this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, allowPageFilter, null);
     }
     
-    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
Expression dynamicFilter) throws SQLException {
+    public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
Expression dynamicFilter) throws SQLException {
         super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
                         buildResultIteratorFactory(context, table, orderBy, 
limit, allowPageFilter), dynamicFilter);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index da6e541..c582cd2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -647,9 +647,11 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public QueryPlan limit(Integer limit) {
-        // This should never be reached, since SortMergeJoinPlan should always 
be
-        // wrapped inside a ClientProcessingPlan.
-        throw new UnsupportedOperationException();
+        if (limit == null)
+            return this;
+        
+        return new ClientScanPlan(this.getContext(), this.getStatement(), 
this.getTableRef(),
+                this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, 
this);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
index 89578f6..3734b4c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.RuntimeContextImpl;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -79,7 +80,7 @@ public class ToExpressionTest extends 
BaseConnectionlessQueryTest {
            }
 
            public ExpressionChecker checkExpressionEquality() {        
-               Implementor implementor = new PhoenixRelImplementorImpl();
+               Implementor implementor = new PhoenixRelImplementorImpl(new 
RuntimeContextImpl());
                implementor.setTableRef(new TableRef(table));
                Expression e = CalciteUtils.toExpression(this.calciteExpr, 
implementor);
                assertEquals(this.phoenixExpr,e);

Reply via email to