Repository: phoenix Updated Branches: refs/heads/calcite 7f2d11784 -> 145db4a20
PHOENIX-2193 Add rules to push down Sort through Union Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/145db4a2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/145db4a2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/145db4a2 Branch: refs/heads/calcite Commit: 145db4a20b5fd4b8a8c030406525af25e0651948 Parents: 7f2d117 Author: maryannxue <wei....@intel.com> Authored: Tue Oct 20 22:41:18 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Tue Oct 20 22:41:18 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 36 ++++++--- .../calcite/jdbc/PhoenixPrepareImpl.java | 45 +++++++++++ .../calcite/metadata/PhoenixRelMdCollation.java | 58 ++++++++++++++ .../calcite/rel/PhoenixAbstractSort.java | 4 +- .../phoenix/calcite/rel/PhoenixClientSort.java | 2 +- .../calcite/rel/PhoenixCompactClientSort.java | 2 +- .../calcite/rel/PhoenixMergeSortUnion.java | 79 ++++++++++++++++++++ .../phoenix/calcite/rel/PhoenixServerSort.java | 2 +- .../calcite/rules/PhoenixConverterRules.java | 41 ++++++++++ 9 files changed, 254 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 0717d74..f8641da 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -1046,21 +1046,37 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select entity_id, a_string from atable where a_string = 'a' union all select 
entity_id, a_string from atable where a_string = 'c' order by entity_id desc limit 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixLimit(fetch=[3])\n" + - " PhoenixClientSort(sort0=[$0], dir0=[DESC])\n" + - " PhoenixUnion(all=[true])\n" + - " PhoenixLimit(fetch=[3])\n" + - " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + - " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n" + - " PhoenixLimit(fetch=[3])\n" + - " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + - " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'c')])\n") + " PhoenixMergeSortUnion(all=[true])\n" + + " PhoenixLimit(fetch=[3])\n" + + " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + + " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n" + + " PhoenixLimit(fetch=[3])\n" + + " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + + " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'c')])\n") .resultIs(new Object[][] { {"00C923122312312", "c"}, {"00A423122312312", "a"}, {"00A323122312312", "a"}}) .close(); + + start(false).sql("select entity_id, a_string from atable where a_string = 'a' union all select entity_id, a_string from atable where a_string = 'c' order by entity_id desc") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixMergeSortUnion(all=[true])\n" + + " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + + " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n" + + " PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" + + " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'c')])\n") + .resultIs(new Object[][] { + {"00C923122312312", "c"}, + 
{"00A423122312312", "a"}, + {"00A323122312312", "a"}, + {"00A223122312312", "a"}, + {"00A123122312312", "a"}}) + .close(); } @Test public void testUnnest() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java index 84ac6bf..d3ed709 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java @@ -1,16 +1,27 @@ package org.apache.phoenix.calcite.jdbc; +import java.util.List; + import org.apache.calcite.adapter.enumerable.EnumerableRules; import org.apache.calcite.jdbc.CalcitePrepare; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelOptCostFactory; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.prepare.CalcitePrepareImpl; +import org.apache.calcite.prepare.Prepare.Materialization; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rules.JoinCommuteRule; +import org.apache.calcite.runtime.Hook; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.Program; +import org.apache.calcite.tools.Programs; +import org.apache.calcite.util.Holder; +import org.apache.calcite.util.Pair; import org.apache.phoenix.calcite.PhoenixSchema; +import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider; import org.apache.phoenix.calcite.parse.SqlCreateView; import org.apache.phoenix.calcite.parser.PhoenixParserImpl; import org.apache.phoenix.calcite.rules.PhoenixAddScanLimitRule; @@ -19,6 +30,8 
@@ import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule; import org.apache.phoenix.calcite.rules.PhoenixInnerSortRemoveRule; import org.apache.phoenix.calcite.rules.PhoenixJoinSingleValueAggregateMergeRule; +import com.google.common.base.Function; + public class PhoenixPrepareImpl extends CalcitePrepareImpl { public static final ThreadLocal<String> THREAD_SQL_STRING = new ThreadLocal<>(); @@ -73,6 +86,38 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { } } } + + Hook.PROGRAM.add(new Function<Pair<List<Materialization>, Holder<Program>>, Object>() { + @Override + public Object apply( + Pair<List<Materialization>, Holder<Program>> input) { + final Program program1 = + new Program() { + public RelNode run(RelOptPlanner planner, RelNode rel, + RelTraitSet requiredOutputTraits) { + final RelNode rootRel2 = + rel.getTraitSet().equals(requiredOutputTraits) + ? rel + : planner.changeTraits(rel, requiredOutputTraits); + assert rootRel2 != null; + + planner.setRoot(rootRel2); + final RelOptPlanner planner2 = planner.chooseDelegate(); + final RelNode rootRel3 = planner2.findBestExp(); + assert rootRel3 != null : "could not implement exp"; + return rootRel3; + } + }; + + // Second planner pass to do physical "tweaks". This is the first time that + // EnumerableCalcRel is introduced. 
+ final Program program2 = Programs.hep(Programs.CALC_RULES, true, new PhoenixRelMetadataProvider());; + + Program p = Programs.sequence(program1, program2); + input.getValue().set(p); + return null; + } + }); return planner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java index 1b559f0..cb6b232 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java @@ -1,6 +1,7 @@ package org.apache.phoenix.calcite.metadata; import java.util.List; +import java.util.Set; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; @@ -16,9 +17,11 @@ import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.rel.PhoenixClientJoin; import org.apache.phoenix.calcite.rel.PhoenixCorrelate; import org.apache.phoenix.calcite.rel.PhoenixLimit; +import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion; import org.apache.phoenix.calcite.rel.PhoenixServerJoin; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; public class PhoenixRelMdCollation { public static final RelMetadataProvider SOURCE = @@ -42,6 +45,10 @@ public class PhoenixRelMdCollation { public ImmutableList<RelCollation> collations(PhoenixClientJoin join) { return ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), join.getRight(), join.joinInfo.leftKeys, join.joinInfo.rightKeys)); } + + public ImmutableList<RelCollation> collations(PhoenixMergeSortUnion union) { + return 
ImmutableList.copyOf(PhoenixRelMdCollation.mergeSortUnion(union.getInputs(), union.all)); + } /** Helper method to determine a {@link PhoenixCorrelate}'s collation. */ public static List<RelCollation> correlate(RelNode left, RelNode right, SemiJoinType joinType) { @@ -75,5 +82,56 @@ public class PhoenixRelMdCollation { } return builder.build(); } + + public static List<RelCollation> mergeSortUnion(List<RelNode> inputs, boolean all) { + if (!all) { + return ImmutableList.of(RelCollations.EMPTY); + } + + Set<RelCollation> mergedCollations = null; + for (RelNode input : inputs) { + final ImmutableList<RelCollation> inputCollations = RelMetadataQuery.collations(input); + Set<RelCollation> nonEmptyInputCollations = Sets.newHashSet(); + for (RelCollation collation : inputCollations) { + if (!collation.getFieldCollations().isEmpty()) { + nonEmptyInputCollations.add(collation); + } + } + + if (nonEmptyInputCollations.isEmpty() || mergedCollations == null) { + mergedCollations = nonEmptyInputCollations; + } else { + Set<RelCollation> newCollations = Sets.newHashSet(); + for (RelCollation m : mergedCollations) { + for (RelCollation n : nonEmptyInputCollations) { + if (n.satisfies(m)) { + newCollations.add(m); + break; + } + } + } + for (RelCollation n : nonEmptyInputCollations) { + for (RelCollation m : mergedCollations) { + if (m.satisfies(n)) { + newCollations.add(n); + break; + } + } + } + mergedCollations = newCollations; + } + + if (mergedCollations.isEmpty()) { + break; + } + } + + // We only return the simplified collation here because PhoenixMergeSortUnion + // needs a definite way for implement(). 
+ if (mergedCollations.size() != 1) { + return ImmutableList.of(RelCollations.EMPTY); + } + return ImmutableList.of(mergedCollations.iterator().next()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java index c2ac235..66ad9f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java @@ -32,9 +32,9 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel { assert !getCollation().getFieldCollations().isEmpty(); } - protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) { + protected static OrderBy getOrderBy(RelCollation collation, Implementor implementor, TupleProjector tupleProjector) { List<OrderByExpression> orderByExpressions = Lists.newArrayList(); - for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) { + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { Expression expr = tupleProjector == null ? 
implementor.newColumnExpression(fieldCollation.getFieldIndex()) : tupleProjector.getExpressions()[fieldCollation.getFieldIndex()]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java index 09218c8..f5a65df 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java @@ -67,7 +67,7 @@ public class PhoenixClientSort extends PhoenixAbstractSort { throw new RuntimeException(e); } - OrderBy orderBy = super.getOrderBy(implementor, null); + OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null); return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java index d881f3f..15372bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java @@ -75,7 +75,7 @@ public class PhoenixCompactClientSort extends PhoenixAbstractSort { basePlan = (AggregatePlan) delegate; } - OrderBy orderBy = super.getOrderBy(implementor, tupleProjector); + OrderBy orderBy = super.getOrderBy(getCollation(), implementor, tupleProjector); 
QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy); if (hashJoinPlan != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java new file mode 100644 index 0000000..f3e162b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java @@ -0,0 +1,79 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Union; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.parse.SelectStatement; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class PhoenixMergeSortUnion extends Union implements PhoenixRel { + public final RelCollation collation; + + public static PhoenixMergeSortUnion create(final List<RelNode> inputs, final boolean all) { + final List<RelCollation> collationList = PhoenixRelMdCollation.mergeSortUnion(inputs, all); + assert collationList.size() == 1; + 
final RelCollation collation = collationList.get(0); + RelOptCluster cluster = inputs.get(0).getCluster(); + RelTraitSet traits = + cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return ImmutableList.<RelCollation> of(collation); + } + }); + return new PhoenixMergeSortUnion(cluster, traits, inputs, all, collation); + } + + private PhoenixMergeSortUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all, RelCollation collation) { + super(cluster, traits, inputs, all); + this.collation = collation; + } + + @Override + public PhoenixMergeSortUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) { + return create(inputs, all); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + for (RelNode input : getInputs()) { + if (input.getConvention() != PhoenixRel.CLIENT_CONVENTION) { + return planner.getCostFactory().makeInfiniteCost(); + } + } + + double mergeSortFactor = 1.1; + return super.computeSelfCost(planner) + .multiplyBy(PHOENIX_FACTOR).multiplyBy(mergeSortFactor); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size()); + for (Ord<RelNode> input : Ord.zip(inputs)) { + subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e)); + } + + final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, implementor, null); + return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR, + null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java index 0818ce6..b43754c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java @@ -68,7 +68,7 @@ public class PhoenixServerSort extends PhoenixAbstractSort { basePlan = (ScanPlan) delegate; } - OrderBy orderBy = super.getOrderBy(implementor, null); + OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null); QueryPlan newPlan; try { newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy); http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index c60c27b..bab9036 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -39,6 +39,7 @@ import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.trace.CalciteTrace; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientJoin; @@ -48,6 +49,7 @@ import org.apache.phoenix.calcite.rel.PhoenixClientSort; import org.apache.phoenix.calcite.rel.PhoenixCorrelate; import org.apache.phoenix.calcite.rel.PhoenixFilter; import 
org.apache.phoenix.calcite.rel.PhoenixLimit; +import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion; import org.apache.phoenix.calcite.rel.PhoenixRel; import org.apache.phoenix.calcite.rel.PhoenixServerAggregate; import org.apache.phoenix.calcite.rel.PhoenixServerJoin; @@ -89,6 +91,7 @@ public class PhoenixConverterRules { PhoenixServerAggregateRule.SERVER, PhoenixServerAggregateRule.SERVERJOIN, PhoenixUnionRule.INSTANCE, + PhoenixMergeSortUnionRule.INSTANCE, PhoenixClientJoinRule.INSTANCE, PhoenixServerJoinRule.INSTANCE, PhoenixClientSemiJoinRule.INSTANCE, @@ -113,6 +116,7 @@ public class PhoenixConverterRules { PhoenixServerAggregateRule.CONVERTIBLE_SERVER, PhoenixServerAggregateRule.CONVERTIBLE_SERVERJOIN, PhoenixUnionRule.CONVERTIBLE, + PhoenixMergeSortUnionRule.CONVERTIBLE, PhoenixClientJoinRule.CONVERTIBLE, PhoenixServerJoinRule.CONVERTIBLE, PhoenixClientSemiJoinRule.INSTANCE, @@ -484,6 +488,43 @@ public class PhoenixConverterRules { } /** + * Rule to convert a {@link org.apache.calcite.rel.core.Union} to a + * {@link PhoenixMergeSortUnion}. 
+ */ + public static class PhoenixMergeSortUnionRule extends PhoenixConverterRule { + private static Predicate<LogicalUnion> IS_CONVERTIBLE = new Predicate<LogicalUnion>() { + @Override + public boolean apply(LogicalUnion input) { + return isConvertible(input); + } + }; + + private static Predicate<LogicalUnion> NON_EMPTY_COLLATION = new Predicate<LogicalUnion>() { + @Override + public boolean apply(LogicalUnion input) { + List<RelCollation> collations = PhoenixRelMdCollation.mergeSortUnion(input.getInputs(), input.all); + return collations.size() == 1 && !collations.get(0).getFieldCollations().isEmpty(); + } + }; + + public static final PhoenixMergeSortUnionRule INSTANCE = new PhoenixMergeSortUnionRule(NON_EMPTY_COLLATION); + + public static final PhoenixMergeSortUnionRule CONVERTIBLE = new PhoenixMergeSortUnionRule(Predicates.and(IS_CONVERTIBLE, NON_EMPTY_COLLATION)); + + private PhoenixMergeSortUnionRule(Predicate<LogicalUnion> predicate) { + super(LogicalUnion.class, predicate, Convention.NONE, + PhoenixRel.CLIENT_CONVENTION, "PhoenixMergeSortUnionRule"); + } + + public RelNode convert(RelNode rel) { + final LogicalUnion union = (LogicalUnion) rel; + return PhoenixMergeSortUnion.create( + convertList(union.getInputs(), out), + union.all); + } + } + + /** * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a * {@link PhoenixClientJoin}. */