Repository: phoenix
Updated Branches:
  refs/heads/calcite 7f2d11784 -> 145db4a20


PHOENIX-2193 Add rules to push down Sort through Union


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/145db4a2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/145db4a2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/145db4a2

Branch: refs/heads/calcite
Commit: 145db4a20b5fd4b8a8c030406525af25e0651948
Parents: 7f2d117
Author: maryannxue <wei....@intel.com>
Authored: Tue Oct 20 22:41:18 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Tue Oct 20 22:41:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 36 ++++++---
 .../calcite/jdbc/PhoenixPrepareImpl.java        | 45 +++++++++++
 .../calcite/metadata/PhoenixRelMdCollation.java | 58 ++++++++++++++
 .../calcite/rel/PhoenixAbstractSort.java        |  4 +-
 .../phoenix/calcite/rel/PhoenixClientSort.java  |  2 +-
 .../calcite/rel/PhoenixCompactClientSort.java   |  2 +-
 .../calcite/rel/PhoenixMergeSortUnion.java      | 79 ++++++++++++++++++++
 .../phoenix/calcite/rel/PhoenixServerSort.java  |  2 +-
 .../calcite/rules/PhoenixConverterRules.java    | 41 ++++++++++
 9 files changed, 254 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 0717d74..f8641da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -1046,21 +1046,37 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select entity_id, a_string from atable where 
a_string = 'a' union all select entity_id, a_string from atable where a_string 
= 'c' order by entity_id desc limit 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[3])\n" +
-                           "    PhoenixClientSort(sort0=[$0], dir0=[DESC])\n" +
-                           "      PhoenixUnion(all=[true])\n" +
-                           "        PhoenixLimit(fetch=[3])\n" +
-                           "          PhoenixServerSort(sort0=[$0], 
dir0=[DESC])\n" +
-                           "            PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
-                           "              PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'a')])\n" +
-                           "        PhoenixLimit(fetch=[3])\n" +
-                           "          PhoenixServerSort(sort0=[$0], 
dir0=[DESC])\n" +
-                           "            PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
-                           "              PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'c')])\n")
+                           "    PhoenixMergeSortUnion(all=[true])\n" +
+                           "      PhoenixLimit(fetch=[3])\n" +
+                           "        PhoenixServerSort(sort0=[$0], 
dir0=[DESC])\n" +
+                           "          PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'a')])\n" +
+                           "      PhoenixLimit(fetch=[3])\n" +
+                           "        PhoenixServerSort(sort0=[$0], 
dir0=[DESC])\n" +
+                           "          PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'c')])\n")
                 .resultIs(new Object[][] {
                         {"00C923122312312", "c"},
                         {"00A423122312312", "a"},
                         {"00A323122312312", "a"}})
                 .close();
+        
+        start(false).sql("select entity_id, a_string from atable where 
a_string = 'a' union all select entity_id, a_string from atable where a_string 
= 'c' order by entity_id desc")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixMergeSortUnion(all=[true])\n" +
+                           "    PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "      PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
+                           "        PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'a')])\n" +
+                           "    PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "      PhoenixServerProject(ENTITY_ID=[$1], 
A_STRING=[$2])\n" +
+                           "        PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'c')])\n")
+                .resultIs(new Object[][] {
+                        {"00C923122312312", "c"},
+                        {"00A423122312312", "a"},
+                        {"00A323122312312", "a"},
+                        {"00A223122312312", "a"},
+                        {"00A123122312312", "a"}})
+                .close();
     }
     
     @Test public void testUnnest() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
index 84ac6bf..d3ed709 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
@@ -1,16 +1,27 @@
 package org.apache.phoenix.calcite.jdbc;
 
+import java.util.List;
+
 import org.apache.calcite.adapter.enumerable.EnumerableRules;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptCostFactory;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.prepare.Prepare.Materialization;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.Program;
+import org.apache.calcite.tools.Programs;
+import org.apache.calcite.util.Holder;
+import org.apache.calcite.util.Pair;
 import org.apache.phoenix.calcite.PhoenixSchema;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
 import org.apache.phoenix.calcite.parse.SqlCreateView;
 import org.apache.phoenix.calcite.parser.PhoenixParserImpl;
 import org.apache.phoenix.calcite.rules.PhoenixAddScanLimitRule;
@@ -19,6 +30,8 @@ import 
org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
 import org.apache.phoenix.calcite.rules.PhoenixInnerSortRemoveRule;
 import 
org.apache.phoenix.calcite.rules.PhoenixJoinSingleValueAggregateMergeRule;
 
+import com.google.common.base.Function;
+
 public class PhoenixPrepareImpl extends CalcitePrepareImpl {
     public static final ThreadLocal<String> THREAD_SQL_STRING =
         new ThreadLocal<>();
@@ -73,6 +86,38 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
                 }
             }
         }
+        
+        Hook.PROGRAM.add(new Function<Pair<List<Materialization>, 
Holder<Program>>, Object>() {
+                       @Override
+                       public Object apply(
+                                       Pair<List<Materialization>, 
Holder<Program>> input) {
+                               final Program program1 =
+                                               new Program() {
+                                       public RelNode run(RelOptPlanner 
planner, RelNode rel,
+                                                       RelTraitSet 
requiredOutputTraits) {
+                                               final RelNode rootRel2 =
+                                                               
rel.getTraitSet().equals(requiredOutputTraits)
+                                                               ? rel
+                                                                               
: planner.changeTraits(rel, requiredOutputTraits);
+                                               assert rootRel2 != null;
+
+                                               planner.setRoot(rootRel2);
+                                               final RelOptPlanner planner2 = 
planner.chooseDelegate();
+                                               final RelNode rootRel3 = 
planner2.findBestExp();
+                                               assert rootRel3 != null : 
"could not implement exp";
+                                               return rootRel3;
+                                       }
+                               };
+
+                               // Second planner pass to do physical "tweaks". 
This is the first time that
+                               // EnumerableCalcRel is introduced.
+                               final Program program2 = 
Programs.hep(Programs.CALC_RULES, true, new PhoenixRelMetadataProvider());;
+
+                               Program p = Programs.sequence(program1, 
program2);
+                               input.getValue().set(p);
+                               return null;
+                       }
+        });
 
         return planner;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
index 1b559f0..cb6b232 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
@@ -1,6 +1,7 @@
 package org.apache.phoenix.calcite.metadata;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
@@ -16,9 +17,11 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
 import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 
 public class PhoenixRelMdCollation {
     public static final RelMetadataProvider SOURCE =
@@ -42,6 +45,10 @@ public class PhoenixRelMdCollation {
     public ImmutableList<RelCollation> collations(PhoenixClientJoin join) {
         return 
ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), 
join.getRight(), join.joinInfo.leftKeys, join.joinInfo.rightKeys));
     }
+
+    public ImmutableList<RelCollation> collations(PhoenixMergeSortUnion union) 
{
+        return 
ImmutableList.copyOf(PhoenixRelMdCollation.mergeSortUnion(union.getInputs(), 
union.all));
+    }
     
     /** Helper method to determine a {@link PhoenixCorrelate}'s collation. */
     public static List<RelCollation> correlate(RelNode left, RelNode right, 
SemiJoinType joinType) {
@@ -75,5 +82,56 @@ public class PhoenixRelMdCollation {
         }
         return builder.build();
     }
+    
+    public static List<RelCollation> mergeSortUnion(List<RelNode> inputs, 
boolean all) {
+       if (!all) {
+               return ImmutableList.of(RelCollations.EMPTY);
+       }
+       
+       Set<RelCollation> mergedCollations = null;
+       for (RelNode input : inputs) {
+               final ImmutableList<RelCollation> inputCollations = 
RelMetadataQuery.collations(input);
+               Set<RelCollation> nonEmptyInputCollations = Sets.newHashSet();
+                       for (RelCollation collation : inputCollations) {
+                               if (!collation.getFieldCollations().isEmpty()) {
+                                       nonEmptyInputCollations.add(collation);
+                               }
+                       }
+               
+                       if (nonEmptyInputCollations.isEmpty() || 
mergedCollations == null) {
+                       mergedCollations = nonEmptyInputCollations;
+               } else {
+                       Set<RelCollation> newCollations = Sets.newHashSet();
+                       for (RelCollation m : mergedCollations) {
+                               for (RelCollation n : nonEmptyInputCollations) {
+                                       if (n.satisfies(m)) {
+                                               newCollations.add(m);
+                                               break;
+                                       }
+                               }
+                       }
+                       for (RelCollation n : nonEmptyInputCollations) {
+                               for (RelCollation m : mergedCollations) {
+                                       if (m.satisfies(n)) {
+                                               newCollations.add(n);
+                                               break;
+                                       }
+                               }
+                       }
+                       mergedCollations = newCollations;
+               }
+                       
+               if (mergedCollations.isEmpty()) {
+                       break;
+               }
+       }
+       
+       // We return a single collation here (EMPTY if not unique) because 
PhoenixMergeSortUnion
+       // needs one unambiguous ordering for implement().
+               if (mergedCollations.size() != 1) {
+                       return ImmutableList.of(RelCollations.EMPTY);
+               }
+        return ImmutableList.of(mergedCollations.iterator().next());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index c2ac235..66ad9f0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -32,9 +32,9 @@ abstract public class PhoenixAbstractSort extends Sort 
implements PhoenixRel {
         assert !getCollation().getFieldCollations().isEmpty();
     }
     
-    protected OrderBy getOrderBy(Implementor implementor, TupleProjector 
tupleProjector) {
+    protected static OrderBy getOrderBy(RelCollation collation, Implementor 
implementor, TupleProjector tupleProjector) {
         List<OrderByExpression> orderByExpressions = Lists.newArrayList();
-        for (RelFieldCollation fieldCollation : 
getCollation().getFieldCollations()) {
+        for (RelFieldCollation fieldCollation : 
collation.getFieldCollations()) {
             Expression expr = tupleProjector == null ? 
                       
implementor.newColumnExpression(fieldCollation.getFieldIndex()) 
                     : 
tupleProjector.getExpressions()[fieldCollation.getFieldIndex()];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 09218c8..f5a65df 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -67,7 +67,7 @@ public class PhoenixClientSort extends PhoenixAbstractSort {
             throw new RuntimeException(e);
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, null);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null);
         
         return new ClientScanPlan(context, plan.getStatement(), tableRef, 
RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
index d881f3f..15372bd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
@@ -75,7 +75,7 @@ public class PhoenixCompactClientSort extends 
PhoenixAbstractSort {
             basePlan = (AggregatePlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, tupleProjector);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, 
tupleProjector);
         QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, 
orderBy);
         
         if (hashJoinPlan != null) {        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
new file mode 100644
index 0000000..f3e162b
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
@@ -0,0 +1,79 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.parse.SelectStatement;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
+       public final RelCollation collation;
+    
+    public static PhoenixMergeSortUnion create(final List<RelNode> inputs, 
final boolean all) {
+       final List<RelCollation> collationList = 
PhoenixRelMdCollation.mergeSortUnion(inputs, all);
+       assert collationList.size() == 1;
+       final RelCollation collation = collationList.get(0);
+        RelOptCluster cluster = inputs.get(0).getCluster();
+        RelTraitSet traits = 
+                       cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION)
+                .replaceIfs(RelCollationTraitDef.INSTANCE,
+                        new Supplier<List<RelCollation>>() {
+                    public List<RelCollation> get() {
+                        return ImmutableList.<RelCollation> of(collation);
+                    }
+                });
+        return new PhoenixMergeSortUnion(cluster, traits, inputs, all, 
collation);
+    }
+    
+    private PhoenixMergeSortUnion(RelOptCluster cluster, RelTraitSet traits, 
List<RelNode> inputs, boolean all, RelCollation collation) {
+        super(cluster, traits, inputs, all);
+        this.collation = collation;
+    }
+
+    @Override
+    public PhoenixMergeSortUnion copy(RelTraitSet traits, List<RelNode> 
inputs, boolean all) {
+        return create(inputs, all);
+    }
+
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        for (RelNode input : getInputs()) {
+            if (input.getConvention() != PhoenixRel.CLIENT_CONVENTION) {
+                return planner.getCostFactory().makeInfiniteCost();
+            }
+        }
+        
+        double mergeSortFactor = 1.1;
+        return super.computeSelfCost(planner)
+                .multiplyBy(PHOENIX_FACTOR).multiplyBy(mergeSortFactor);
+    }
+
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        List<QueryPlan> subPlans = 
Lists.newArrayListWithExpectedSize(inputs.size());
+        for (Ord<RelNode> input : Ord.zip(inputs)) {
+            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) 
input.e));
+        }
+        
+        final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, 
implementor, null);
+        return new UnionPlan(subPlans.get(0).getContext(), 
SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), 
RowProjector.EMPTY_PROJECTOR,
+                null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
index 0818ce6..b43754c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
@@ -68,7 +68,7 @@ public class PhoenixServerSort extends PhoenixAbstractSort {
             basePlan = (ScanPlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, null);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null);
         QueryPlan newPlan;
         try {
             newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index c60c27b..bab9036 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.trace.CalciteTrace;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
@@ -48,6 +49,7 @@ import org.apache.phoenix.calcite.rel.PhoenixClientSort;
 import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixFilter;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion;
 import org.apache.phoenix.calcite.rel.PhoenixRel;
 import org.apache.phoenix.calcite.rel.PhoenixServerAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
@@ -89,6 +91,7 @@ public class PhoenixConverterRules {
         PhoenixServerAggregateRule.SERVER,
         PhoenixServerAggregateRule.SERVERJOIN,
         PhoenixUnionRule.INSTANCE,
+        PhoenixMergeSortUnionRule.INSTANCE,
         PhoenixClientJoinRule.INSTANCE,
         PhoenixServerJoinRule.INSTANCE,
         PhoenixClientSemiJoinRule.INSTANCE,
@@ -113,6 +116,7 @@ public class PhoenixConverterRules {
         PhoenixServerAggregateRule.CONVERTIBLE_SERVER,
         PhoenixServerAggregateRule.CONVERTIBLE_SERVERJOIN,
         PhoenixUnionRule.CONVERTIBLE,
+        PhoenixMergeSortUnionRule.CONVERTIBLE,
         PhoenixClientJoinRule.CONVERTIBLE,
         PhoenixServerJoinRule.CONVERTIBLE,
         PhoenixClientSemiJoinRule.INSTANCE,
@@ -484,6 +488,43 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.Union} to a
+     * {@link PhoenixMergeSortUnion}.
+     */
+    public static class PhoenixMergeSortUnionRule extends PhoenixConverterRule 
{
+        private static Predicate<LogicalUnion> IS_CONVERTIBLE = new 
Predicate<LogicalUnion>() {
+            @Override
+            public boolean apply(LogicalUnion input) {
+                return isConvertible(input);
+            }            
+        };
+        
+        private static Predicate<LogicalUnion> NON_EMPTY_COLLATION = new 
Predicate<LogicalUnion>() {
+                       @Override
+                       public boolean apply(LogicalUnion input) {
+                               List<RelCollation> collations = 
PhoenixRelMdCollation.mergeSortUnion(input.getInputs(), input.all);
+                               return collations.size() == 1 && 
!collations.get(0).getFieldCollations().isEmpty();
+                       }
+        };
+        
+        public static final PhoenixMergeSortUnionRule INSTANCE = new 
PhoenixMergeSortUnionRule(NON_EMPTY_COLLATION);
+        
+        public static final PhoenixMergeSortUnionRule CONVERTIBLE = new 
PhoenixMergeSortUnionRule(Predicates.and(IS_CONVERTIBLE, NON_EMPTY_COLLATION));
+
+        private PhoenixMergeSortUnionRule(Predicate<LogicalUnion> predicate) {
+            super(LogicalUnion.class, predicate, Convention.NONE, 
+                    PhoenixRel.CLIENT_CONVENTION, "PhoenixMergeSortUnionRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final LogicalUnion union = (LogicalUnion) rel;
+            return PhoenixMergeSortUnion.create(
+                    convertList(union.getInputs(), out),
+                    union.all);
+        }
+    }
+
+    /**
      * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a
      * {@link PhoenixClientJoin}.
      */

Reply via email to