Repository: phoenix
Updated Branches:
  refs/heads/calcite 1ee1f2011 -> 53dab808a


PHOENIX-1786 Implement sort-merge-join with Calcite integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53dab808
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53dab808
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53dab808

Branch: refs/heads/calcite
Commit: 53dab808a0593b86e50a0f7c42c9a39d25b95e06
Parents: 1ee1f20
Author: maryannxue <wei....@intel.com>
Authored: Thu Apr 30 23:46:02 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Thu Apr 30 23:46:02 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java |  27 +++-
 .../apache/phoenix/calcite/CalciteUtils.java    |  70 +++++++-
 .../calcite/rel/PhoenixAbstractJoin.java        |  23 +++
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |  67 +++++++-
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |  24 +--
 .../calcite/rules/PhoenixConverterRules.java    | 158 +++++++++++++++++--
 6 files changed, 317 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index bff6706..acb02f3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -306,15 +306,26 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     }
     
     @Test public void testClientJoin() throws Exception {        
-        start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", 
supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item FULL OUTER JOIN " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = 
supp.\"supplier_id\"")
+        start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", 
supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item FULL OUTER JOIN " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = 
supp.\"supplier_id\" order by \"item_id\", supp.name")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixClientProject(item_id=[$0], NAME=[$1], 
supplier_id=[$3], NAME0=[$4])\n" +
-                           "    PhoenixClientJoin(condition=[=($2, $3)], 
joinType=[full])\n" +
-                           "      PhoenixServerSort(sort0=[$2], dir0=[ASC])\n" 
+
-                           "        PhoenixServerProject(item_id=[$0], 
NAME=[$1], supplier_id=[$5])\n" +
-                           "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
-                           "      PhoenixServerProject(supplier_id=[$0], 
NAME=[$1])\n" +
-                           "        PhoenixTableScan(table=[[phoenix, Join, 
SupplierTable]])\n")
+                           "  PhoenixClientSort(sort0=[$0], sort1=[$3], 
dir0=[ASC], dir1=[ASC])\n" +
+                           "    PhoenixClientProject(item_id=[$0], NAME=[$1], 
supplier_id=[$3], NAME0=[$4])\n" +
+                           "      PhoenixClientJoin(condition=[=($2, $3)], 
joinType=[full])\n" +
+                           "        PhoenixServerSort(sort0=[$2], 
dir0=[ASC])\n" +
+                           "          PhoenixServerProject(item_id=[$0], 
NAME=[$1], supplier_id=[$5])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, 
Join, ItemTable]])\n" +
+                           "        PhoenixServerProject(supplier_id=[$0], 
NAME=[$1])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, Join, 
SupplierTable]])\n")
+                .resultIs(new Object[][] {
+                        {null, null, "0000000003", "S3"},
+                        {null, null, "0000000004", "S4"},
+                        {"0000000001", "T1", "0000000001", "S1"},
+                        {"0000000002", "T2", "0000000001", "S1"},
+                        {"0000000003", "T3", "0000000002", "S2"},
+                        {"0000000004", "T4", "0000000002", "S2"},
+                        {"0000000005", "T5", "0000000005", "S5"},
+                        {"0000000006", "T6", "0000000006", "S6"},
+                        {"invalid001", "INVALID-1", null, null}})
                 .close();        
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 1b2e4b4..93be79d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -15,15 +15,18 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor;
+import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.OrExpression;
 import org.apache.phoenix.expression.function.AggregateFunction;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
 import org.apache.phoenix.expression.function.FunctionExpression;
 import org.apache.phoenix.expression.function.MaxAggregateFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
+import org.apache.phoenix.expression.function.MinAggregateFunction;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -51,6 +54,26 @@ public class CalciteUtils {
                return eFactory;
        }
        static {
+        EXPRESSION_MAP.put(SqlKind.AND, new ExpressionFactory() {
+
+            @Override
+            public Expression newExpression(RexNode node, Implementor 
implementor) {
+                try {
+                    return AndExpression.create(convertChildren((RexCall) 
node, implementor));
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            
+        });
+        EXPRESSION_MAP.put(SqlKind.OR, new ExpressionFactory() {
+
+            @Override
+            public Expression newExpression(RexNode node, Implementor 
implementor) {
+                return new OrExpression(convertChildren((RexCall) node, 
implementor));
+            }
+            
+        });
                EXPRESSION_MAP.put(SqlKind.EQUALS, new ExpressionFactory() {
 
                        @Override
@@ -182,18 +205,26 @@ public class CalciteUtils {
                 return new CountAggregateFunction(args);
             }
         });
-        FUNCTION_MAP.put("$SUM0", new FunctionFactory() {
+        // TODO Buggy. Disable for now.
+        //FUNCTION_MAP.put("$SUM0", new FunctionFactory() {
+        //    @Override
+        //    public FunctionExpression newFunction(SqlFunction sqlFunc,
+        //            List<Expression> args) {
+        //        return new SumAggregateFunction(args);
+        //    }
+        //});
+        FUNCTION_MAP.put("MAX", new FunctionFactory() {
             @Override
             public FunctionExpression newFunction(SqlFunction sqlFunc,
                     List<Expression> args) {
-                return new SumAggregateFunction(args);
+                return new MaxAggregateFunction(args, null);
             }
         });
-        FUNCTION_MAP.put("MAX", new FunctionFactory() {
+        FUNCTION_MAP.put("MIN", new FunctionFactory() {
             @Override
             public FunctionExpression newFunction(SqlFunction sqlFunc,
                     List<Expression> args) {
-                return new MaxAggregateFunction(args, null);
+                return new MinAggregateFunction(args, null);
             }
         });
     }
@@ -207,6 +238,33 @@ public class CalciteUtils {
         return children;
     }
 
+    public static boolean isExpressionSupported(RexNode node) {
+        try {
+            getFactory(node);
+        } catch (UnsupportedOperationException e) {
+            return false;
+        }
+        if (node instanceof RexCall) {
+            for (RexNode op : ((RexCall) node).getOperands()) {
+                if (!isExpressionSupported(op)) {
+                    return false;
+                }
+            }
+        }
+        
+        return true;
+    }
+    
+    public static boolean isAggregateFunctionSupported(SqlAggFunction aggFunc) 
{
+        try {
+            getFactory(aggFunc);
+        } catch (UnsupportedOperationException e) {
+            return false;
+        }
+
+        return true;
+    }
+
        public static Expression toExpression(RexNode node, Implementor 
implementor) {
                ExpressionFactory eFactory = getFactory(node);
                Expression expression = eFactory.newExpression(node, 
implementor);
@@ -226,7 +284,7 @@ public class CalciteUtils {
        public static Object evaluateStatelessExpression(RexNode node) {
            try {
                Expression expression = toExpression(node, null);
-               if (expression.isStateless()) {
+               if (expression.isStateless() && expression.getDeterminism() == 
Determinism.ALWAYS) {
                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                    expression.evaluate(null, ptr);
                    return expression.getDataType().toObject(ptr);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
index 39426f4..01f3536 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -1,5 +1,7 @@
 package org.apache.phoenix.calcite.rel;
 
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.calcite.plan.RelOptCluster;
@@ -10,6 +12,10 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 
 /**
@@ -36,6 +42,23 @@ abstract public class PhoenixAbstractJoin extends Join 
implements PhoenixRel {
             .itemIf("isSingleValueRhs", isSingleValueRhs, isSingleValueRhs);
     }
     
+    protected QueryPlan implementInput(Implementor implementor, int index, 
List<Expression> conditionExprs) {
+        assert index <= 1;
+        
+        PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+        ImmutableIntList keys = index == 0 ? joinInfo.leftKeys : 
joinInfo.rightKeys;
+        QueryPlan plan = implementor.visitInput(0, input);
+        for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) {
+            Integer i = iter.next();
+            conditionExprs.add(implementor.newColumnExpression(i));
+        }
+        if (conditionExprs.isEmpty()) {
+            conditionExprs.add(LiteralExpression.newConstant(0));
+        }
+
+        return plan;
+    }
+    
     protected static JoinType convertJoinType(JoinRelType type) {
         JoinType ret = null;
         switch (type) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index decc723..2acd11f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -1,5 +1,6 @@
 package org.apache.phoenix.calcite.rel;
 
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -19,8 +20,24 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -102,7 +119,55 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin 
{
 
     @Override
     public QueryPlan implement(Implementor implementor) {
-        throw new UnsupportedOperationException();
+        assert getLeft().getConvention() == PhoenixRel.CONVENTION;
+        assert getRight().getConvention() == PhoenixRel.CONVENTION;
+        
+        List<Expression> leftExprs = Lists.<Expression> newArrayList();
+        List<Expression> rightExprs = Lists.<Expression> newArrayList();
+
+        implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().isRetainPKColumns() && 
getJoinType() != JoinRelType.FULL, true));
+        QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
+        PTable leftTable = implementor.getTableRef().getTable();
+        implementor.popContext();
+
+        implementor.pushContext(new ImplementorContext(false, true));
+        QueryPlan rightPlan = implementInput(implementor, 1, rightExprs);
+        PTable rightTable = implementor.getTableRef().getTable();
+        implementor.popContext();
+        
+        JoinType type = convertJoinType(getJoinType());
+        PTable joinedTable;
+        try {
+            joinedTable = JoinCompiler.joinProjectedTables(leftTable, 
rightTable, type);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        TableRef tableRef = new TableRef(joinedTable);
+        implementor.setTableRef(tableRef);
+        ColumnResolver resolver;
+        try {
+            resolver = FromCompiler.getResolver(tableRef);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        PhoenixStatement stmt = leftPlan.getContext().getStatement();
+        StatementContext context = new StatementContext(stmt, resolver, new 
Scan(), new SequenceManager(stmt));
+
+        QueryPlan plan = new SortMergeJoinPlan(context, 
leftPlan.getStatement(), 
+                tableRef, type, leftPlan, rightPlan, leftExprs, rightExprs, 
+                joinedTable, leftTable, rightTable, 
+                leftTable.getColumns().size() - 
leftTable.getPKColumns().size(), 
+                isSingleValueRhs);
+        
+        RexNode postFilter = 
joinInfo.getRemaining(getCluster().getRexBuilder());
+        Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : 
CalciteUtils.toExpression(postFilter, implementor);
+        if (postFilter != null) {
+            plan = new ClientScanPlan(context, plan.getStatement(), tableRef, 
+                    RowProjector.EMPTY_PROJECTOR, null, postFilterExpr, 
+                    OrderBy.EMPTY_ORDER_BY, plan);
+        }
+        
+        return plan;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 57b9ad0..f73527a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -1,7 +1,6 @@
 package org.apache.phoenix.calcite.rel;
 
 import java.sql.SQLException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -22,7 +21,6 @@ import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.parse.SelectStatement;
@@ -96,32 +94,18 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
     public QueryPlan implement(Implementor implementor) {
         assert getLeft().getConvention() == PhoenixRel.CONVENTION;
         assert getRight().getConvention() == PhoenixRel.CONVENTION;
-        PhoenixRel left = (PhoenixRel) getLeft();
-        PhoenixRel right = (PhoenixRel) getRight();
         
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
+
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true));
-        QueryPlan leftPlan = implementor.visitInput(0, left);
+        QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
         PTable leftTable = implementor.getTableRef().getTable();
-        for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); 
iter.hasNext();) {
-            Integer index = iter.next();
-            leftExprs.add(implementor.newColumnExpression(index));
-        }
-        if (leftExprs.isEmpty()) {
-            leftExprs.add(LiteralExpression.newConstant(0));
-        }
         implementor.popContext();
+
         implementor.pushContext(new ImplementorContext(false, true));
-        QueryPlan rightPlan = implementor.visitInput(1, right);
+        QueryPlan rightPlan = implementInput(implementor, 1, rightExprs);
         PTable rightTable = implementor.getTableRef().getTable();
-        for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); 
iter.hasNext();) {
-            Integer index = iter.next();
-            rightExprs.add(implementor.newColumnExpression(index));
-        }
-        if (rightExprs.isEmpty()) {
-            rightExprs.add(LiteralExpression.newConstant(0));
-        }
         implementor.popContext();
         
         JoinType type = convertJoinType(getJoinType());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 016ad0b..7748709 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -1,16 +1,32 @@
 package org.apache.phoenix.calcite.rules;
 
+import java.util.Arrays;
+import java.util.logging.Logger;
+
 import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Aggregate.Group;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientProject;
 import org.apache.phoenix.calcite.rel.PhoenixClientSort;
@@ -22,8 +38,7 @@ import 
org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
 import org.apache.phoenix.calcite.rel.PhoenixUnion;
 
 import com.google.common.base.Predicate;
-
-import java.util.logging.Logger;
+import com.google.common.base.Predicates;
 
 /**
  * Rules and relational operators for
@@ -75,6 +90,12 @@ public class PhoenixConverterRules {
      * {@link PhoenixClientSort}.
      */
     private static class PhoenixSortRule extends PhoenixConverterRule {
+        private static Predicate<LogicalSort> IS_CONVERTIBLE = new 
Predicate<LogicalSort>() {
+            @Override
+            public boolean apply(LogicalSort input) {
+                return isConvertible(input);
+            }            
+        };
         private static Predicate<LogicalSort> SORT_ONLY = new 
Predicate<LogicalSort>() {
             @Override
             public boolean apply(LogicalSort input) {
@@ -87,8 +108,9 @@ public class PhoenixConverterRules {
         public static final PhoenixSortRule INSTANCE = new PhoenixSortRule();
 
         private PhoenixSortRule() {
-            super(LogicalSort.class, SORT_ONLY, Convention.NONE, 
PhoenixRel.CONVENTION,
-                "PhoenixSortRule");
+            super(LogicalSort.class, 
+                    Predicates.and(Arrays.asList(SORT_ONLY, IS_CONVERTIBLE)), 
+                    Convention.NONE, PhoenixRel.CONVENTION, "PhoenixSortRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -106,6 +128,12 @@ public class PhoenixConverterRules {
      * {@link PhoenixLimit}.
      */
     private static class PhoenixLimitRule extends PhoenixConverterRule {
+        private static Predicate<LogicalSort> IS_CONVERTIBLE = new 
Predicate<LogicalSort>() {
+            @Override
+            public boolean apply(LogicalSort input) {
+                return isConvertible(input);
+            }            
+        };
         private static Predicate<LogicalSort> OFFSET_OR_FETCH = new 
Predicate<LogicalSort>() {
             @Override
             public boolean apply(LogicalSort input) {
@@ -117,8 +145,9 @@ public class PhoenixConverterRules {
         public static final PhoenixLimitRule INSTANCE = new PhoenixLimitRule();
 
         private PhoenixLimitRule() {
-            super(LogicalSort.class, OFFSET_OR_FETCH, Convention.NONE, 
PhoenixRel.CONVENTION,
-                "PhoenixLimitRule");
+            super(LogicalSort.class, 
+                    Predicates.and(Arrays.asList(OFFSET_OR_FETCH, 
IS_CONVERTIBLE)), 
+                    Convention.NONE, PhoenixRel.CONVENTION, 
"PhoenixLimitRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -141,11 +170,18 @@ public class PhoenixConverterRules {
      * {@link PhoenixFilter}.
      */
     private static class PhoenixFilterRule extends PhoenixConverterRule {
+        private static Predicate<LogicalFilter> IS_CONVERTIBLE = new 
Predicate<LogicalFilter>() {
+            @Override
+            public boolean apply(LogicalFilter input) {
+                return isConvertible(input);
+            }            
+        };
+        
         private static final PhoenixFilterRule INSTANCE = new 
PhoenixFilterRule();
 
         private PhoenixFilterRule() {
-            super(LogicalFilter.class, Convention.NONE, PhoenixRel.CONVENTION,
-                "PhoenixFilterRule");
+            super(LogicalFilter.class, IS_CONVERTIBLE, Convention.NONE, 
+                    PhoenixRel.CONVENTION, "PhoenixFilterRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -163,11 +199,18 @@ public class PhoenixConverterRules {
      * to a {@link PhoenixClientProject}.
      */
     private static class PhoenixProjectRule extends PhoenixConverterRule {
+        private static Predicate<LogicalProject> IS_CONVERTIBLE = new 
Predicate<LogicalProject>() {
+            @Override
+            public boolean apply(LogicalProject input) {
+                return isConvertible(input);
+            }            
+        };
+        
         private static final PhoenixProjectRule INSTANCE = new 
PhoenixProjectRule();
 
         private PhoenixProjectRule() {
-            super(LogicalProject.class, Convention.NONE, PhoenixRel.CONVENTION,
-                "PhoenixProjectRule");
+            super(LogicalProject.class, IS_CONVERTIBLE, Convention.NONE, 
+                    PhoenixRel.CONVENTION, "PhoenixProjectRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -186,11 +229,18 @@ public class PhoenixConverterRules {
      * to an {@link PhoenixClientAggregate}.
      */
     private static class PhoenixAggregateRule extends PhoenixConverterRule {
+        private static Predicate<LogicalAggregate> IS_CONVERTIBLE = new 
Predicate<LogicalAggregate>() {
+            @Override
+            public boolean apply(LogicalAggregate input) {
+                return isConvertible(input);
+            }            
+        };
+        
         public static final RelOptRule INSTANCE = new PhoenixAggregateRule();
 
         private PhoenixAggregateRule() {
-            super(LogicalAggregate.class, Convention.NONE, 
PhoenixRel.CONVENTION,
-                "PhoenixAggregateRule");
+            super(LogicalAggregate.class, IS_CONVERTIBLE, Convention.NONE, 
+                    PhoenixRel.CONVENTION, "PhoenixAggregateRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -211,11 +261,18 @@ public class PhoenixConverterRules {
      * {@link PhoenixUnion}.
      */
     private static class PhoenixUnionRule extends PhoenixConverterRule {
+        private static Predicate<LogicalUnion> IS_CONVERTIBLE = new 
Predicate<LogicalUnion>() {
+            @Override
+            public boolean apply(LogicalUnion input) {
+                return isConvertible(input);
+            }            
+        };
+        
         public static final PhoenixUnionRule INSTANCE = new PhoenixUnionRule();
 
         private PhoenixUnionRule() {
-            super(LogicalUnion.class, Convention.NONE, PhoenixRel.CONVENTION,
-                "PhoenixUnionRule");
+            super(LogicalUnion.class, IS_CONVERTIBLE, Convention.NONE, 
+                    PhoenixRel.CONVENTION, "PhoenixUnionRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -231,11 +288,17 @@ public class PhoenixConverterRules {
      * {@link PhoenixJoin}.
      */
     private static class PhoenixJoinRule extends PhoenixConverterRule {
+        private static Predicate<LogicalJoin> IS_CONVERTIBLE = new 
Predicate<LogicalJoin>() {
+            @Override
+            public boolean apply(LogicalJoin input) {
+                return isConvertible(input);
+            }            
+        };
         public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
 
         private PhoenixJoinRule() {
-            super(LogicalJoin.class, Convention.NONE, PhoenixRel.CONVENTION,
-                "PhoenixJoinRule");
+            super(LogicalJoin.class, IS_CONVERTIBLE, Convention.NONE, 
+                    PhoenixRel.CONVENTION, "PhoenixJoinRule");
         }
 
         public RelNode convert(RelNode rel) {
@@ -414,6 +477,67 @@ public class PhoenixConverterRules {
             return PhoenixToEnumerableConverter.create(rel);
         }
     }
+    
+    
+    //-------------------------------------------------------------------
+    // Helper functions that check if a RelNode would be implementable by
+    // its corresponding PhoenixRel.
+    
+    public static boolean isConvertible(Aggregate input) {
+        if (PhoenixAbstractAggregate.isSingleValueCheckAggregate(input))
+            return true;
+        
+        if (input.getGroupSets().size() > 1)
+            return false;
+        
+        if (input.containsDistinctCall())
+            return false;
+        
+        if (input.getGroupType() != Group.SIMPLE)
+            return false;
+        
+        for (AggregateCall aggCall : input.getAggCallList()) {
+            if 
(!CalciteUtils.isAggregateFunctionSupported(aggCall.getAggregation())) {
+                return false;
+            }
+        }        
+        
+        return true;
+    }
+    
+    public static boolean isConvertible(Filter input) {
+        return CalciteUtils.isExpressionSupported(input.getCondition());
+    }
+    
+    public static boolean isConvertible(Join input) {
+        return CalciteUtils.isExpressionSupported(input.getCondition());
+    }
+    
+    public static boolean isConvertible(Project input) {
+        for (RexNode project : input.getProjects()) {
+            if (!CalciteUtils.isExpressionSupported(project)) {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+    
+    public static boolean isConvertible(Sort sort) {
+        if (sort.offset != null)
+            return false;
+        
+        if (sort.fetch != null 
+                && CalciteUtils.evaluateStatelessExpression(sort.fetch) == 
null)
+            return false;
+        
+        return true;
+    }
+    
+    public static boolean isConvertible(Union input) {
+        // TODO disable for now since PhoenixUnion is not implemented yet.
+        return false;
+    }
 }
 
 // End PhoenixRules.java

Reply via email to