Repository: phoenix Updated Branches: refs/heads/calcite 1ee1f2011 -> 53dab808a
PHOENIX-1786 Implement sort-merge-join with Calcite integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53dab808 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53dab808 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53dab808 Branch: refs/heads/calcite Commit: 53dab808a0593b86e50a0f7c42c9a39d25b95e06 Parents: 1ee1f20 Author: maryannxue <wei....@intel.com> Authored: Thu Apr 30 23:46:02 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Thu Apr 30 23:46:02 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 27 +++- .../apache/phoenix/calcite/CalciteUtils.java | 70 +++++++- .../calcite/rel/PhoenixAbstractJoin.java | 23 +++ .../phoenix/calcite/rel/PhoenixClientJoin.java | 67 +++++++- .../phoenix/calcite/rel/PhoenixServerJoin.java | 24 +-- .../calcite/rules/PhoenixConverterRules.java | 158 +++++++++++++++++-- 6 files changed, 317 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index bff6706..acb02f3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -306,15 +306,26 @@ public class CalciteTest extends BaseClientManagedTimeIT { } @Test public void testClientJoin() throws Exception { - start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item FULL OUTER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON 
item.\"supplier_id\" = supp.\"supplier_id\"") + start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item FULL OUTER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by \"item_id\", supp.name") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixClientProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + - " PhoenixClientJoin(condition=[=($2, $3)], joinType=[full])\n" + - " PhoenixServerSort(sort0=[$2], dir0=[ASC])\n" + - " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + - " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + - " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + - " PhoenixTableScan(table=[[phoenix, Join, SupplierTable]])\n") + " PhoenixClientSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC])\n" + + " PhoenixClientProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + + " PhoenixClientJoin(condition=[=($2, $3)], joinType=[full])\n" + + " PhoenixServerSort(sort0=[$2], dir0=[ASC])\n" + + " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, Join, SupplierTable]])\n") + .resultIs(new Object[][] { + {null, null, "0000000003", "S3"}, + {null, null, "0000000004", "S4"}, + {"0000000001", "T1", "0000000001", "S1"}, + {"0000000002", "T2", "0000000001", "S1"}, + {"0000000003", "T3", "0000000002", "S2"}, + {"0000000004", "T4", "0000000002", "S2"}, + {"0000000005", "T5", "0000000005", "S5"}, + {"0000000006", "T6", "0000000006", "S6"}, + {"invalid001", "INVALID-1", null, null}}) .close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index 1b2e4b4..93be79d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -15,15 +15,18 @@ import org.apache.calcite.sql.SqlKind; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor; +import org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.function.AggregateFunction; import org.apache.phoenix.expression.function.CountAggregateFunction; import org.apache.phoenix.expression.function.FunctionExpression; import org.apache.phoenix.expression.function.MaxAggregateFunction; -import org.apache.phoenix.expression.function.SumAggregateFunction; +import org.apache.phoenix.expression.function.MinAggregateFunction; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -51,6 +54,26 @@ public class CalciteUtils { return eFactory; } static { + EXPRESSION_MAP.put(SqlKind.AND, new ExpressionFactory() { + + @Override + public Expression newExpression(RexNode node, Implementor implementor) { + try { + return AndExpression.create(convertChildren((RexCall) node, implementor)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + }); + EXPRESSION_MAP.put(SqlKind.OR, new ExpressionFactory() { + + @Override + public Expression newExpression(RexNode node, Implementor implementor) { + return new 
OrExpression(convertChildren((RexCall) node, implementor)); + } + + }); EXPRESSION_MAP.put(SqlKind.EQUALS, new ExpressionFactory() { @Override @@ -182,18 +205,26 @@ public class CalciteUtils { return new CountAggregateFunction(args); } }); - FUNCTION_MAP.put("$SUM0", new FunctionFactory() { + // TODO Buggy. Disable for now. + //FUNCTION_MAP.put("$SUM0", new FunctionFactory() { + // @Override + // public FunctionExpression newFunction(SqlFunction sqlFunc, + // List<Expression> args) { + // return new SumAggregateFunction(args); + // } + //}); + FUNCTION_MAP.put("MAX", new FunctionFactory() { @Override public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args) { - return new SumAggregateFunction(args); + return new MaxAggregateFunction(args, null); } }); - FUNCTION_MAP.put("MAX", new FunctionFactory() { + FUNCTION_MAP.put("MIN", new FunctionFactory() { @Override public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args) { - return new MaxAggregateFunction(args, null); + return new MinAggregateFunction(args, null); } }); } @@ -207,6 +238,33 @@ public class CalciteUtils { return children; } + public static boolean isExpressionSupported(RexNode node) { + try { + getFactory(node); + } catch (UnsupportedOperationException e) { + return false; + } + if (node instanceof RexCall) { + for (RexNode op : ((RexCall) node).getOperands()) { + if (!isExpressionSupported(op)) { + return false; + } + } + } + + return true; + } + + public static boolean isAggregateFunctionSupported(SqlAggFunction aggFunc) { + try { + getFactory(aggFunc); + } catch (UnsupportedOperationException e) { + return false; + } + + return true; + } + public static Expression toExpression(RexNode node, Implementor implementor) { ExpressionFactory eFactory = getFactory(node); Expression expression = eFactory.newExpression(node, implementor); @@ -226,7 +284,7 @@ public class CalciteUtils { public static Object evaluateStatelessExpression(RexNode node) { try { 
Expression expression = toExpression(node, null); - if (expression.isStateless()) { + if (expression.isStateless() && expression.getDeterminism() == Determinism.ALWAYS) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); expression.evaluate(null, ptr); return expression.getDataType().toObject(ptr); http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java index 39426f4..01f3536 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java @@ -1,5 +1,7 @@ package org.apache.phoenix.calcite.rel; +import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.calcite.plan.RelOptCluster; @@ -10,6 +12,10 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.parse.JoinTableNode.JoinType; /** @@ -36,6 +42,23 @@ abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel { .itemIf("isSingleValueRhs", isSingleValueRhs, isSingleValueRhs); } + protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) { + assert index <= 1; + + PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right; + ImmutableIntList keys = index == 0 ? 
joinInfo.leftKeys : joinInfo.rightKeys; + QueryPlan plan = implementor.visitInput(index, input); /* visit the input under its own ordinal (0 = left, 1 = right), as the pre-refactoring code did */ + for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) { + Integer i = iter.next(); + conditionExprs.add(implementor.newColumnExpression(i)); + } + if (conditionExprs.isEmpty()) { + conditionExprs.add(LiteralExpression.newConstant(0)); + } + + return plan; + } + protected static JoinType convertJoinType(JoinRelType type) { JoinType ret = null; switch (type) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java index decc723..2acd11f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@ -1,5 +1,6 @@ package org.apache.phoenix.calcite.rel; +import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -19,8 +20,24 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -102,7 +119,55 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin { @Override public QueryPlan implement(Implementor implementor) { - throw new UnsupportedOperationException(); + assert getLeft().getConvention() == PhoenixRel.CONVENTION; + assert getRight().getConvention() == PhoenixRel.CONVENTION; + + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns() && getJoinType() != JoinRelType.FULL, true)); + QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); + PTable leftTable = implementor.getTableRef().getTable(); + implementor.popContext(); + + implementor.pushContext(new ImplementorContext(false, true)); + QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); + PTable rightTable = implementor.getTableRef().getTable(); + implementor.popContext(); + + JoinType type = convertJoinType(getJoinType()); + PTable joinedTable; + try { + joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + TableRef tableRef = new TableRef(joinedTable); + implementor.setTableRef(tableRef); + ColumnResolver resolver; + try { + resolver = FromCompiler.getResolver(tableRef); + } catch (SQLException e) { + throw new RuntimeException(e); + } + PhoenixStatement stmt = leftPlan.getContext().getStatement(); + StatementContext context = new StatementContext(stmt, resolver, new 
Scan(), new SequenceManager(stmt)); + + QueryPlan plan = new SortMergeJoinPlan(context, leftPlan.getStatement(), + tableRef, type, leftPlan, rightPlan, leftExprs, rightExprs, + joinedTable, leftTable, rightTable, + leftTable.getColumns().size() - leftTable.getPKColumns().size(), + isSingleValueRhs); + + RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder()); + Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor); + if (postFilter != null) { + plan = new ClientScanPlan(context, plan.getStatement(), tableRef, + RowProjector.EMPTY_PROJECTOR, null, postFilterExpr, + OrderBy.EMPTY_ORDER_BY, plan); + } + + return plan; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java index 57b9ad0..f73527a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java @@ -1,7 +1,6 @@ package org.apache.phoenix.calcite.rel; import java.sql.SQLException; -import java.util.Iterator; import java.util.List; import java.util.Set; @@ -22,7 +21,6 @@ import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.HashJoinPlan; import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.parse.SelectStatement; @@ -96,32 +94,18 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { public QueryPlan implement(Implementor 
implementor) { assert getLeft().getConvention() == PhoenixRel.CONVENTION; assert getRight().getConvention() == PhoenixRel.CONVENTION; - PhoenixRel left = (PhoenixRel) getLeft(); - PhoenixRel right = (PhoenixRel) getRight(); List<Expression> leftExprs = Lists.<Expression> newArrayList(); List<Expression> rightExprs = Lists.<Expression> newArrayList(); + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true)); - QueryPlan leftPlan = implementor.visitInput(0, left); + QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); PTable leftTable = implementor.getTableRef().getTable(); - for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { - Integer index = iter.next(); - leftExprs.add(implementor.newColumnExpression(index)); - } - if (leftExprs.isEmpty()) { - leftExprs.add(LiteralExpression.newConstant(0)); - } implementor.popContext(); + implementor.pushContext(new ImplementorContext(false, true)); - QueryPlan rightPlan = implementor.visitInput(1, right); + QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); PTable rightTable = implementor.getTableRef().getTable(); - for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { - Integer index = iter.next(); - rightExprs.add(implementor.newColumnExpression(index)); - } - if (rightExprs.isEmpty()) { - rightExprs.add(LiteralExpression.newConstant(0)); - } implementor.popContext(); JoinType type = convertJoinType(getJoinType()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/53dab808/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index 016ad0b..7748709 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -1,16 +1,32 @@ package org.apache.phoenix.calcite.rules; +import java.util.Arrays; +import java.util.logging.Logger; + import org.apache.calcite.adapter.enumerable.EnumerableConvention; -import org.apache.calcite.plan.*; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTrait; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Aggregate.Group; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.trace.CalciteTrace; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientProject; import org.apache.phoenix.calcite.rel.PhoenixClientSort; @@ -22,8 +38,7 @@ import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter; import org.apache.phoenix.calcite.rel.PhoenixUnion; import com.google.common.base.Predicate; - -import java.util.logging.Logger; +import com.google.common.base.Predicates; /** * Rules and relational operators for @@ 
-75,6 +90,12 @@ public class PhoenixConverterRules { * {@link PhoenixClientSort}. */ private static class PhoenixSortRule extends PhoenixConverterRule { + private static Predicate<LogicalSort> IS_CONVERTIBLE = new Predicate<LogicalSort>() { + @Override + public boolean apply(LogicalSort input) { + return isConvertible(input); + } + }; private static Predicate<LogicalSort> SORT_ONLY = new Predicate<LogicalSort>() { @Override public boolean apply(LogicalSort input) { @@ -87,8 +108,9 @@ public class PhoenixConverterRules { public static final PhoenixSortRule INSTANCE = new PhoenixSortRule(); private PhoenixSortRule() { - super(LogicalSort.class, SORT_ONLY, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixSortRule"); + super(LogicalSort.class, + Predicates.and(Arrays.asList(SORT_ONLY, IS_CONVERTIBLE)), + Convention.NONE, PhoenixRel.CONVENTION, "PhoenixSortRule"); } public RelNode convert(RelNode rel) { @@ -106,6 +128,12 @@ public class PhoenixConverterRules { * {@link PhoenixLimit}. */ private static class PhoenixLimitRule extends PhoenixConverterRule { + private static Predicate<LogicalSort> IS_CONVERTIBLE = new Predicate<LogicalSort>() { + @Override + public boolean apply(LogicalSort input) { + return isConvertible(input); + } + }; private static Predicate<LogicalSort> OFFSET_OR_FETCH = new Predicate<LogicalSort>() { @Override public boolean apply(LogicalSort input) { @@ -117,8 +145,9 @@ public class PhoenixConverterRules { public static final PhoenixLimitRule INSTANCE = new PhoenixLimitRule(); private PhoenixLimitRule() { - super(LogicalSort.class, OFFSET_OR_FETCH, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixLimitRule"); + super(LogicalSort.class, + Predicates.and(Arrays.asList(OFFSET_OR_FETCH, IS_CONVERTIBLE)), + Convention.NONE, PhoenixRel.CONVENTION, "PhoenixLimitRule"); } public RelNode convert(RelNode rel) { @@ -141,11 +170,18 @@ public class PhoenixConverterRules { * {@link PhoenixFilter}. 
*/ private static class PhoenixFilterRule extends PhoenixConverterRule { + private static Predicate<LogicalFilter> IS_CONVERTIBLE = new Predicate<LogicalFilter>() { + @Override + public boolean apply(LogicalFilter input) { + return isConvertible(input); + } + }; + private static final PhoenixFilterRule INSTANCE = new PhoenixFilterRule(); private PhoenixFilterRule() { - super(LogicalFilter.class, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixFilterRule"); + super(LogicalFilter.class, IS_CONVERTIBLE, Convention.NONE, + PhoenixRel.CONVENTION, "PhoenixFilterRule"); } public RelNode convert(RelNode rel) { @@ -163,11 +199,18 @@ public class PhoenixConverterRules { * to a {@link PhoenixClientProject}. */ private static class PhoenixProjectRule extends PhoenixConverterRule { + private static Predicate<LogicalProject> IS_CONVERTIBLE = new Predicate<LogicalProject>() { + @Override + public boolean apply(LogicalProject input) { + return isConvertible(input); + } + }; + private static final PhoenixProjectRule INSTANCE = new PhoenixProjectRule(); private PhoenixProjectRule() { - super(LogicalProject.class, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixProjectRule"); + super(LogicalProject.class, IS_CONVERTIBLE, Convention.NONE, + PhoenixRel.CONVENTION, "PhoenixProjectRule"); } public RelNode convert(RelNode rel) { @@ -186,11 +229,18 @@ public class PhoenixConverterRules { * to an {@link PhoenixClientAggregate}. 
*/ private static class PhoenixAggregateRule extends PhoenixConverterRule { + private static Predicate<LogicalAggregate> IS_CONVERTIBLE = new Predicate<LogicalAggregate>() { + @Override + public boolean apply(LogicalAggregate input) { + return isConvertible(input); + } + }; + public static final RelOptRule INSTANCE = new PhoenixAggregateRule(); private PhoenixAggregateRule() { - super(LogicalAggregate.class, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixAggregateRule"); + super(LogicalAggregate.class, IS_CONVERTIBLE, Convention.NONE, + PhoenixRel.CONVENTION, "PhoenixAggregateRule"); } public RelNode convert(RelNode rel) { @@ -211,11 +261,18 @@ public class PhoenixConverterRules { * {@link PhoenixUnion}. */ private static class PhoenixUnionRule extends PhoenixConverterRule { + private static Predicate<LogicalUnion> IS_CONVERTIBLE = new Predicate<LogicalUnion>() { + @Override + public boolean apply(LogicalUnion input) { + return isConvertible(input); + } + }; + public static final PhoenixUnionRule INSTANCE = new PhoenixUnionRule(); private PhoenixUnionRule() { - super(LogicalUnion.class, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixUnionRule"); + super(LogicalUnion.class, IS_CONVERTIBLE, Convention.NONE, + PhoenixRel.CONVENTION, "PhoenixUnionRule"); } public RelNode convert(RelNode rel) { @@ -231,11 +288,17 @@ public class PhoenixConverterRules { * {@link PhoenixJoin}. 
*/ private static class PhoenixJoinRule extends PhoenixConverterRule { + private static Predicate<LogicalJoin> IS_CONVERTIBLE = new Predicate<LogicalJoin>() { + @Override + public boolean apply(LogicalJoin input) { + return isConvertible(input); + } + }; public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule(); private PhoenixJoinRule() { - super(LogicalJoin.class, Convention.NONE, PhoenixRel.CONVENTION, - "PhoenixJoinRule"); + super(LogicalJoin.class, IS_CONVERTIBLE, Convention.NONE, + PhoenixRel.CONVENTION, "PhoenixJoinRule"); } public RelNode convert(RelNode rel) { @@ -414,6 +477,67 @@ public class PhoenixConverterRules { return PhoenixToEnumerableConverter.create(rel); } } + + + //------------------------------------------------------------------- + // Helper functions that check if a RelNode would be implementable by + // its corresponding PhoenixRel. + + public static boolean isConvertible(Aggregate input) { + if (PhoenixAbstractAggregate.isSingleValueCheckAggregate(input)) + return true; + + if (input.getGroupSets().size() > 1) + return false; + + if (input.containsDistinctCall()) + return false; + + if (input.getGroupType() != Group.SIMPLE) + return false; + + for (AggregateCall aggCall : input.getAggCallList()) { + if (!CalciteUtils.isAggregateFunctionSupported(aggCall.getAggregation())) { + return false; + } + } + + return true; + } + + public static boolean isConvertible(Filter input) { + return CalciteUtils.isExpressionSupported(input.getCondition()); + } + + public static boolean isConvertible(Join input) { + return CalciteUtils.isExpressionSupported(input.getCondition()); + } + + public static boolean isConvertible(Project input) { + for (RexNode project : input.getProjects()) { + if (!CalciteUtils.isExpressionSupported(project)) { + return false; + } + } + + return true; + } + + public static boolean isConvertible(Sort sort) { + if (sort.offset != null) + return false; + + if (sort.fetch != null + && 
CalciteUtils.evaluateStatelessExpression(sort.fetch) == null) + return false; + + return true; + } + + public static boolean isConvertible(Union input) { + // TODO disable for now since PhoenixUnion is not implemented yet. + return false; + } } // End PhoenixRules.java