[ASTERIXDB-2244][RT] Implement micro union-all operator

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- implement support for binary micro operators in subplans
- implement micro union-all operator
- fix free variables visitor

Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2277
Reviewed-by: Till Westmann <ti...@apache.org>
Contrib: Till Westmann <ti...@apache.org>
Integration-Tests: Till Westmann <ti...@apache.org>
Tested-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/10e5ad1a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/10e5ad1a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/10e5ad1a

Branch: refs/heads/master
Commit: 10e5ad1a789d436639863273028f565960dface0
Parents: caf4306
Author: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Authored: Mon Jan 29 11:14:45 2018 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Wed Jan 31 16:22:47 2018 -0800

----------------------------------------------------------------------
 .../rules/SetAsterixPhysicalOperatorsRule.java  |  18 +-
 ...ineSubplanInputForNestedTupleSourceRule.java |  79 ++++---
 .../rules/subplan/SubplanFlatteningUtil.java    |   3 +-
 .../app/bootstrap/TestNodeController.java       |   4 +-
 .../non_unary_subplan_01.1.ddl.sqlpp            |  27 +++
 .../non_unary_subplan_01.2.update.sqlpp         |  28 +++
 .../non_unary_subplan_01.3.query.sqlpp          |  33 +++
 .../non_unary_subplan_01.1.adm                  |   2 +
 .../resources/runtimets/testsuite_sqlpp.xml     |   5 +
 .../main/resources/asx_errormsg/en.properties   |   2 +-
 .../lang/sqlpp/visitor/FreeVariableVisitor.java |   2 +-
 .../runtime/CommitRuntimeFactory.java           |   8 +-
 .../api/HeuristicCompilerFactoryBuilder.java    |   2 +-
 .../core/algebra/base/PhysicalOperatorTag.java  |   1 +
 .../physical/AbstractPhysicalOperator.java      |  60 +++--
 .../physical/AbstractUnionAllPOperator.java     |  78 +++++++
 .../physical/MicroUnionAllPOperator.java        |  56 +++++
 .../operators/physical/SubplanPOperator.java    |  12 +-
 .../operators/physical/UnionAllPOperator.java   |  51 +---
 .../algebra/util/OperatorManipulationUtil.java  |  46 +++-
 .../algebricks/core/jobgen/impl/JobBuilder.java |  66 +++++-
 .../core/jobgen/impl/PlanCompiler.java          |  19 +-
 .../SetAlgebricksPhysicalOperatorsRule.java     |  17 +-
 .../subplan/PushSubplanIntoGroupByRule.java     |  19 +-
 .../algebricks/rewriter/util/JoinUtils.java     |   7 +-
 .../algebricks/algebricks-runtime/pom.xml       |   4 +
 .../runtime/base/AlgebricksPipeline.java        |  16 +-
 .../runtime/base/IPushRuntimeFactory.java       |   2 +-
 ...estedPlansAccumulatingAggregatorFactory.java |   2 +-
 .../NestedPlansRunningAggregatorFactory.java    |   2 +-
 ...AbstractOneInputOneOutputRuntimeFactory.java |   4 +-
 .../operators/base/SinkRuntimeFactory.java      |   7 +-
 .../meta/AlgebricksMetaOperatorDescriptor.java  |  11 +-
 .../operators/meta/PipelineAssembler.java       |  39 +++-
 .../operators/meta/SubplanRuntimeFactory.java   | 232 +++++++++++++------
 .../std/EmptyTupleSourceRuntimeFactory.java     |   6 +-
 .../std/NestedTupleSourceRuntimeFactory.java    |   4 +-
 .../operators/std/PrinterRuntimeFactory.java    |   4 +-
 .../operators/std/SinkWriterRuntimeFactory.java |   4 +-
 .../union/MicroUnionAllRuntimeFactory.java      | 112 +++++++++
 .../tests/pushruntime/PushRuntimeTest.java      |  11 +-
 .../hyracks/api/job/JobSpecification.java       |  10 +
 42 files changed, 848 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 464476b..4afccb0 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -80,20 +80,20 @@ public class SetAsterixPhysicalOperatorsRule implements 
IAlgebraicRewriteRule {
             return false;
         }
 
-        computeDefaultPhysicalOp(op, context);
+        computeDefaultPhysicalOp(op, true, context);
         context.addToDontApplySet(this, op);
         return true;
     }
 
-    private static void setPhysicalOperators(ILogicalPlan plan, 
IOptimizationContext context)
+    private static void setPhysicalOperators(ILogicalPlan plan, boolean 
topLevelOp, IOptimizationContext context)
             throws AlgebricksException {
         for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) 
root.getValue(), context);
+            computeDefaultPhysicalOp((AbstractLogicalOperator) 
root.getValue(), topLevelOp, context);
         }
     }
 
-    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, 
IOptimizationContext context)
-            throws AlgebricksException {
+    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, 
boolean topLevelOp,
+            IOptimizationContext context) throws AlgebricksException {
         PhysicalOptimizationConfig physicalOptimizationConfig = 
context.getPhysicalOptimizationConfig();
         if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
             GroupByOperator gby = (GroupByOperator) op;
@@ -207,11 +207,11 @@ public class SetAsterixPhysicalOperatorsRule implements 
IAlgebraicRewriteRule {
         if (op.getPhysicalOperator() == null) {
             switch (op.getOperatorTag()) {
                 case INNERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, 
context);
                     break;
                 }
                 case LEFTOUTERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, 
topLevelOp, context);
                     break;
                 }
                 case UNNEST_MAP:
@@ -277,11 +277,11 @@ public class SetAsterixPhysicalOperatorsRule implements 
IAlgebraicRewriteRule {
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans nested = 
(AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
-                setPhysicalOperators(p, context);
+                setPhysicalOperators(p, false, context);
             }
         }
         for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) 
opRef.getValue(), context);
+            computeDefaultPhysicalOp((AbstractLogicalOperator) 
opRef.getValue(), topLevelOp, context);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index a9cd806..3edccec 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.optimizer.rules.subplan;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -56,21 +57,21 @@ import 
org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
-import com.google.common.collect.ImmutableSet;
-
 /*
-This rule  is to remove SubplanOperators containing DataScan, InnerJoin, 
LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called 
S1,
-Let's call its input operator O1.
+This rule removes SubplanOperators containing DataScan, InnerJoin, or 
LeftOuterJoin.
+Given a qualified Subplan operator called S1, let's call its input operator O1.
 
 General Cases
 We have the following rewritings for general cases:
 R1. Replace all NestedTupleSourceOperators in S1 with deep-copies (with new 
variables) of the query plan rooted at O1;
-R2. Add a LeftOuterOperatorJoinOperator (let's call it LJ) between O1 and the 
SubplanOperator's root operator's input (let's call it SO1),
-    where O1 is the left branch and SO1 is the right branch;
-R3. The deep copy of the primary key variables in O1 should be preserved from 
an inlined NestedTupleSourceOperator to SO1.
-    The join condition of LJ is the equality between the primary key variables 
in O1 and its deep copied version at SO1;
+R2. Add a LeftOuterJoinOperator (let's call it LJ) between O1 and the 
SubplanOperator's root operator's input
+    (let's call it SO1), where O1 is the left branch and SO1 is the right 
branch;
+R3. The deep copy of the primary key variables in O1 should be preserved from 
an inlined NestedTupleSourceOperator
+    to SO1. The join condition of LJ is the equality between the primary key 
variables in O1 and its deep copied
+    version at SO1;
 R4. A variable v indicating non-match tuples is assigned to TRUE between LJ 
and SO1;
-R5. On top of the LJ, add a GroupByOperaptor in which the nested plan consists 
of the S1's root operator, i.e., an aggregate operator.
+R5. On top of the LJ, add a GroupByOperator in which the nested plan consists 
of S1's root operator,
+    i.e., an aggregate operator.
     Below the aggregate, there is a not-null-filter on variable v. The group 
key is the primary key variables in O1.
 
 This is an abstract example for the rewriting mechanism described above:
@@ -102,10 +103,12 @@ After rewriting:
              .....
                
--Deepcopy_The_Plan_Rooted_At_InputOp_With_New_Variables(InputOp)
 
-In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, 
while v_rc_1, ..., v_rc_n are their corresponding variables populated from the 
deepcopy of InputOp.
-"Covering" variables form a set of variables that can imply all live 
variables. v_l1, ....v_ln in the decoration part of the added group-by operator 
are all live variables
-at InputOp except the covering variables v_lc_1, ..., v_lc_n.  In the current 
implementation, we use "covering" variables as primary key variables. In the 
next version, we
-will use the real primary key variables, which will fix ASTERIXDB-1168.
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, 
while v_rc_1, ..., v_rc_n are their
+corresponding variables populated from the deepcopy of InputOp.
+"Covering" variables form a set of variables that can imply all live 
variables. v_l1, ....v_ln in the decoration part
+of the added group-by operator are all live variables at InputOp except the 
covering variables v_lc_1, ..., v_lc_n.
+In the current implementation, we use "covering" variables as primary key 
variables.
+In the next version, we will use the real primary key variables, which will 
fix ASTERIXDB-1168.
 
 Here is a concrete example of the general case rewriting 
(optimizerts/queries/nested_loj4.aql).
 Before plan:
@@ -157,14 +160,19 @@ b. if J1 is an inner join, one input pipeline of J1 has a 
NestedTupleSource desc
 c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource 
descendant (let's call it N1),
 d. there is no tuple dropping from N1 to J1
 
-Rewriting R2 is not necessary since before J1, all tuples from N1 are 
preserved. But the following rewritings are needed:
+Rewriting R2 is not necessary since before J1, all tuples from N1 are 
preserved. But the following rewritings are
+needed:
 R1'. Replace N1 by the O1 (no additional deep copy);
 R2'. All inner joins on the path from N1 to J1 (including J1) become 
left-outer joins with the same join conditions;
-R3'. If N1 resides in the right branch of an inner join (let's call it J2) in 
the path from N1 to J1, switch the left and right branches of J2;
-R4'. For every left join from N1 to J1 transformed from an inner join, a 
variable vi indicating non-match tuples is assigned to TRUE in its right branch;
-R5'. On top of J1, a GroupByOperaptor G1 is added where the group-by key is 
the primary key of O1 and the nested query plan for aggregation is the nested 
pipeline
-     on top of J1 with an added not-null-filter to check all vi are not null.
-R6'. All other NestedTupleSourceOperators in the subplan is inlined with deep 
copies (with new variables) of the query plan rooted at O1.
+R3'. If N1 resides in the right branch of an inner join (let's call it J2) in 
the path from N1 to J1,
+     switch the left and right branches of J2;
+R4'. For every left join from N1 to J1 transformed from an inner join, a 
variable vi indicating non-match tuples
+     is assigned to TRUE in its right branch;
+R5'. On top of J1, a GroupByOperator G1 is added where the group-by key is 
the primary key of O1 and
+     the nested query plan for aggregation is the nested pipeline on top of J1 
with an added not-null-filter
+     to check all vi are not null.
+R6'. All other NestedTupleSourceOperators in the subplan are inlined with deep 
copies (with new variables)
+     of the query plan rooted at O1.
 
 This is an abstract example for the special rewriting mechanism described 
above:
 Before rewriting:
@@ -197,9 +205,10 @@ After rewriting:
              – Assign v_new=TRUE
                 – ..... (L1)
 
-In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and 
v_l1, ....v_ln in the decoration part of the added group-by operator are all 
live variables
-at InputOp except the covering variables v_lc_1, ..., v_lc_n.  In the current 
implementation, we use "covering" variables as primary key variables. In the 
next version,
-we will use the real primary key variables, which will fix ASTERIXDB-1168.
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and 
v_l1, ....v_ln in the decoration part
+of the added group-by operator are all live variables at InputOp except the 
covering variables v_lc_1, ..., v_lc_n.
+In the current implementation, we use "covering" variables as primary key 
variables.
+In the next version, we will use the real primary key variables, which will 
fix ASTERIXDB-1168.
 
 Here is a concrete example (optimizerts/queries/nested_loj2.aql). .
 Before plan:
@@ -343,10 +352,8 @@ public class InlineSubplanInputForNestedTupleSourceRule 
implements IAlgebraicRew
     private Pair<Boolean, LinkedHashMap<LogicalVariable, LogicalVariable>> 
applyGeneralFlattening(
             Mutable<ILogicalOperator> opRef, IOptimizationContext context) 
throws AlgebricksException {
         SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
-        if (!SubplanFlatteningUtil.containsOperators(subplanOp,
-                ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, 
LogicalOperatorTag.INNERJOIN,
-                        // We don't have nested runtime for union-all and 
distinct hence we have to include them here.
-                        LogicalOperatorTag.LEFTOUTERJOIN, 
LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) {
+        if (!SubplanFlatteningUtil.containsOperators(subplanOp, 
EnumSet.of(LogicalOperatorTag.DATASOURCESCAN,
+                LogicalOperatorTag.INNERJOIN, 
LogicalOperatorTag.LEFTOUTERJOIN))) {
             return new Pair<>(false, new LinkedHashMap<>());
         }
         Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
@@ -402,10 +409,8 @@ public class InlineSubplanInputForNestedTupleSourceRule 
implements IAlgebraicRew
         context.computeAndSetTypeEnvironmentForOperator(leftOuterJoinOp);
 
         // Creates group-by operator.
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList =
-                new ArrayList<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>>();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList =
-                new ArrayList<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = 
new ArrayList<>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList = new ArrayList<>();
         List<ILogicalPlan> nestedPlans = new ArrayList<>();
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, 
groupByDecorList, nestedPlans);
 
@@ -435,8 +440,8 @@ public class InlineSubplanInputForNestedTupleSourceRule 
implements IAlgebraicRew
             
lowestAggregateRefInSubplan.getValue().getInputs().add(currentOpRef);
         }
 
-        // Adds a select operator into the nested plan for group-by to remove 
tuples with NULL on {@code assignVar}, i.e.,
-        // subplan input tuples that are filtered out within a subplan.
+        // Adds a select operator into the nested plan for group-by to remove 
tuples with NULL on {@code assignVar},
+        // i.e., subplan input tuples that are filtered out within a subplan.
         Mutable<ILogicalExpression> filterVarExpr = new MutableObject<>(new 
VariableReferenceExpression(assignVar));
         List<Mutable<ILogicalExpression>> args = new ArrayList<>();
         args.add(filterVarExpr);
@@ -500,10 +505,8 @@ public class InlineSubplanInputForNestedTupleSourceRule 
implements IAlgebraicRew
         Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second;
 
         // Creates a group-by operator.
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList =
-                new ArrayList<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>>();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList =
-                new ArrayList<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = 
new ArrayList<>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList = new ArrayList<>();
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, 
groupByDecorList, subplanOp.getNestedPlans());
 
         for (LogicalVariable coverVar : primaryKeyVars) {
@@ -521,8 +524,8 @@ public class InlineSubplanInputForNestedTupleSourceRule 
implements IAlgebraicRew
         groupbyOp.getInputs().add(new MutableObject<>(topJoinRef.getValue()));
 
         if (!notNullVars.isEmpty()) {
-            // Adds a select operator into the nested plan for group-by to 
remove tuples with NULL on {@code assignVar}, i.e.,
-            // subplan input tuples that are filtered out within a subplan.
+            // Adds a select operator into the nested plan for group-by to 
remove tuples with NULL on {@code assignVar},
+            // i.e., subplan input tuples that are filtered out within a 
subplan.
             List<Mutable<ILogicalExpression>> nullCheckExprRefs = new 
ArrayList<>();
             for (LogicalVariable notNullVar : notNullVars) {
                 Mutable<ILogicalExpression> filterVarExpr =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
index dba5d47..894097d 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.subplan;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -208,7 +209,7 @@ class SubplanFlatteningUtil {
         if (!OperatorManipulationUtil.ancestorOfOperators(
                 subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(),
                 // we don't need to check recursively for this special 
rewriting.
-                ImmutableSet.of(LogicalOperatorTag.INNERJOIN, 
LogicalOperatorTag.LEFTOUTERJOIN))) {
+                EnumSet.of(LogicalOperatorTag.INNERJOIN, 
LogicalOperatorTag.LEFTOUTERJOIN))) {
             return new Pair<Boolean, ILogicalOperator>(false, null);
         }
         SubplanSpecialFlatteningCheckVisitor visitor = new 
SubplanSpecialFlatteningCheckVisitor();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 69ce2ea..4d26d25 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -235,7 +235,7 @@ public class TestNodeController {
                 }
                 IPushRuntime assignOp =
                         new AssignRuntimeFactory(outColumns, 
secondaryFieldAccessEvalFactories, projectionList, true)
-                                .createPushRuntime(ctx);
+                                .createPushRuntime(ctx)[0];
                 insertOp.setOutputFrameWriter(0, assignOp, 
primaryIndexInfo.rDesc);
                 assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
                 SecondaryIndexInfo secondaryIndexInfo = new 
SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
@@ -270,7 +270,7 @@ public class TestNodeController {
             NoMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
             StorageComponentProvider storageComponentProvider) throws 
HyracksDataException, AlgebricksException {
-        IPushRuntime emptyTupleOp = new 
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        IPushRuntime emptyTupleOp = new 
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0];
         JobSpecification spec = new JobSpecification();
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
new file mode 100644
index 0000000..b825157
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select x,
+array_sum((
+   select value a.f
+   from g as p
+   union all
+   select value a.f
+   from g as w
+)) s
+from TData as a
+group by a.x as x group as g
+order by x
+;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
new file mode 100644
index 0000000..5071183
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
@@ -0,0 +1,2 @@
+{ "x": 1, "s": 82 }
+{ "x": 2, "s": 58 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 16fa334..3a535cc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6430,6 +6430,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="subquery">
+      <compilation-unit name="non_unary_subplan_01">
+        <output-dir compare="Text">non_unary_subplan_01</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="subquery">
       <compilation-unit name="query-ASTERIXDB-1571">
         <output-dir compare="Text">query-ASTERIXDB-1571</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 3ad7fd5..8341a33 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -125,7 +125,7 @@
 1060 = Field \"%1$s\" in the with clause must be of type %2$s
 1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\"
 1062 = Merge policy parameters cannot be of type %1$s
-1063 = There is no dataverse with name %1$s
+1063 = There is no dataverse with name \"%1$s\"
 
 # Feed Errors
 3001 = Illegal state.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index 1a54b92..b754533 100644
--- 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++ 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -410,7 +410,7 @@ public class FreeVariableVisitor extends 
AbstractSqlppQueryExpressionVisitor<Voi
     public Void visit(IndexAccessor ia, Collection<VariableExpr> freeVars) 
throws CompilationException {
         ia.getExpr().accept(this, freeVars);
         if (ia.getIndexExpr() != null) {
-            ia.getIndexExpr();
+            ia.getIndexExpr().accept(this, freeVars);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 445ad4a..bbfde38 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -51,10 +51,10 @@ public class CommitRuntimeFactory implements 
IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
-        return new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(datasetId), datasetId,
-                primaryKeyFields, isWriteTransaction,
-                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        return new IPushRuntime[] { new CommitRuntime(ctx, 
((IJobEventListenerFactory) fact).getTxnId(datasetId),
+                datasetId, primaryKeyFields, isWriteTransaction,
+                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
 
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 79b8f38..2d6123e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -104,7 +104,7 @@ public class HeuristicCompilerFactoryBuilder extends 
AbstractCompilerFactoryBuil
                                 clusterLocations);
 
                         PlanCompiler pc = new PlanCompiler(context);
-                        return pc.compilePlan(plan, null, 
jobEventListenerFactory);
+                        return pc.compilePlan(plan, jobEventListenerFactory);
                     }
                 };
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 75b63f1..db9728b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -44,6 +44,7 @@ public enum PhysicalOperatorTag {
     MATERIALIZE,
     MICRO_PRE_CLUSTERED_GROUP_BY,
     MICRO_PRE_SORTED_DISTINCT_BY,
+    MICRO_UNION_ALL,
     NESTED_LOOP,
     NESTED_TUPLE_SOURCE,
     ONE_TO_ONE_EXCHANGE,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 43cde22..29d6037 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -97,7 +99,7 @@ public abstract class AbstractPhysicalOperator implements 
IPhysicalOperator {
 
     /**
      * @return labels (0 or 1) for each input and output indicating the 
dependency between them.
-     *         The edges labeled as 1 must wait for the edges with label 0.
+     * The edges labeled as 1 must wait for the edges with label 0.
      */
     @Override
     public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator 
op) {
@@ -118,47 +120,61 @@ public abstract class AbstractPhysicalOperator implements 
IPhysicalOperator {
     protected AlgebricksPipeline[] compileSubplans(IOperatorSchema 
outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
JobGenContext context)
             throws AlgebricksException {
-        AlgebricksPipeline[] subplans = new 
AlgebricksPipeline[npOp.getNestedPlans().size()];
+        List<List<AlgebricksPipeline>> subplans = 
compileSubplansImpl(outerPlanSchema, npOp, opSchema, context);
+        int n = subplans.size();
+        AlgebricksPipeline[] result = new AlgebricksPipeline[n];
+        for (int i = 0; i < n; i++) {
+            List<AlgebricksPipeline> subplanOps = subplans.get(i);
+            if (subplanOps.size() != 1) {
+                throw new AlgebricksException("Attempting to construct a 
nested plan with " + subplanOps.size()
+                        + " operator descriptors. Currently, nested plans can 
only consist in linear pipelines of "
+                        + "micro operators.");
+            }
+            result[i] = subplanOps.get(0);
+        }
+        return result;
+    }
+
+    protected List<List<AlgebricksPipeline>> 
compileSubplansImpl(IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
JobGenContext context)
+            throws AlgebricksException {
+        List<List<AlgebricksPipeline>> subplans = new 
ArrayList<>(npOp.getNestedPlans().size());
         PlanCompiler pc = new PlanCompiler(context);
-        int i = 0;
         for (ILogicalPlan p : npOp.getNestedPlans()) {
-            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, 
npOp, opSchema, pc);
+            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, 
opSchema, pc));
         }
         return subplans;
     }
 
-    private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, 
IOperatorSchema outerPlanSchema,
+    private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan 
p, IOperatorSchema outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
PlanCompiler pc)
             throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots 
are not supported.");
         }
-        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
+        JobSpecification nestedJob = pc.compileNestedPlan(p, outerPlanSchema);
         ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
         JobGenContext context = pc.getContext();
         IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
         opSchema.addAllVariables(topOpInSubplanScm);
 
         Map<OperatorDescriptorId, IOperatorDescriptor> opMap = 
nestedJob.getOperatorMap();
-        if (opMap.size() != 1) {
-            throw new AlgebricksException("Attempting to construct a nested 
plan with " + opMap.size()
-                    + " operator descriptors. Currently, nested plans can only 
consist in linear pipelines of Asterix micro operators.");
+        List<? extends IOperatorDescriptor> metaOps = nestedJob.getMetaOps();
+        if (opMap.size() != metaOps.size()) {
+            for (IOperatorDescriptor opd : opMap.values()) {
+                if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
+                    throw new AlgebricksException(
+                            "Can only generate jobs for pipelinable nested 
plans, not for " + opd.getClass().getName());
+                }
+            }
+            throw new IllegalStateException("Unexpected nested plan");
         }
 
-        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> opEntry : 
opMap.entrySet()) {
-            IOperatorDescriptor opd = opEntry.getValue();
-            if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
-                throw new AlgebricksException(
-                        "Can only generate Hyracks jobs for pipelinable 
Asterix nested plans, not for "
-                                + opd.getClass().getName());
-            }
+        List<AlgebricksPipeline> result = new ArrayList<>(metaOps.size());
+        for (IOperatorDescriptor opd : metaOps) {
             AlgebricksMetaOperatorDescriptor amod = 
(AlgebricksMetaOperatorDescriptor) opd;
-
-            return amod.getPipeline();
-            // we suppose that the top operator in the subplan already does the
-            // projection for us
+            result.add(amod.getPipeline());
         }
-
-        throw new IllegalStateException();
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
new file mode 100644
index 0000000..a4d9576
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+
+public abstract class AbstractUnionAllPOperator extends 
AbstractPhysicalOperator {
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = 
op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, new 
ArrayList<>(0));
+    }
+
+    @Override
+    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
+        StructuralPropertiesVector pv0 =
+                
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
+                        new 
RandomPartitioningProperty(context.getComputationNodeDomain()), null));
+        StructuralPropertiesVector pv1 =
+                
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
+                        new 
RandomPartitioningProperty(context.getComputationNodeDomain()), null));
+        return new PhysicalRequirements(new StructuralPropertiesVector[] { 
pv0, pv1 },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+        for (int i = 0; i < inputs.size(); i++) {
+            ILogicalOperator src = inputs.get(i).getValue();
+            builder.contributeGraphEdge(src, 0, op, i);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
new file mode 100644
index 0000000..f5e992e
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import 
org.apache.hyracks.algebricks.runtime.operators.union.MicroUnionAllRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class MicroUnionAllPOperator extends AbstractUnionAllPOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.MICRO_UNION_ALL;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor recordDescriptor =
+                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
+
+        MicroUnionAllRuntimeFactory runtime = new 
MicroUnionAllRuntimeFactory(op.getInputs().size());
+        builder.contributeMicroOperator(op, runtime, recordDescriptor);
+
+        super.contributeRuntimeOperator(builder, context, op, opSchema, 
inputSchemas, outerPlanSchema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index d43ddab..95efbac 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -89,18 +89,18 @@ public class SubplanPOperator extends 
AbstractPhysicalOperator {
         if (subplan.getNestedPlans().size() != 1) {
             throw new NotImplementedException("Subplan currently works only 
for one nested plan with one root.");
         }
-        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], 
subplan, opSchema, context);
-        assert subplans.length == 1;
-        AlgebricksPipeline np = subplans[0];
+        List<List<AlgebricksPipeline>> subplans = 
compileSubplansImpl(inputSchemas[0], subplan, opSchema, context);
+        assert subplans.size() == 1;
+        List<AlgebricksPipeline> np = subplans.get(0);
         RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), 
inputSchemas[0], context);
-        IMissingWriterFactory[] missingWriterFactories = new 
IMissingWriterFactory[np.getOutputWidth()];
+        IMissingWriterFactory[] missingWriterFactories = new 
IMissingWriterFactory[np.get(0).getOutputWidth()];
         for (int i = 0; i < missingWriterFactories.length; i++) {
             missingWriterFactories[i] = context.getMissingWriterFactory();
         }
-        SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, 
missingWriterFactories, inputRecordDesc, null);
-
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
+        SubplanRuntimeFactory runtime =
+                new SubplanRuntimeFactory(np, missingWriterFactories, 
inputRecordDesc, recDesc, null);
         builder.contributeMicroOperator(subplan, runtime, recDesc);
 
         ILogicalOperator src = op.getInputs().get(0).getValue();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index a617064..4ccce92 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -18,29 +18,18 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.ArrayList;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
 
-public class UnionAllPOperator extends AbstractPhysicalOperator {
+public class UnionAllPOperator extends AbstractUnionAllPOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -53,48 +42,16 @@ public class UnionAllPOperator extends 
AbstractPhysicalOperator {
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
-        IPartitioningProperty pp = 
op2.getDeliveredPhysicalProperties().getPartitioningProperty();
-        this.deliveredProperties = new StructuralPropertiesVector(pp, new 
ArrayList<>(0));
-    }
-
-    @Override
-    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
-        StructuralPropertiesVector pv0 =
-                
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
-                        new 
RandomPartitioningProperty(context.getComputationNodeDomain()), null));
-        StructuralPropertiesVector pv1 =
-                
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
-                        new 
RandomPartitioningProperty(context.getComputationNodeDomain()), null));
-        return new PhysicalRequirements(new StructuralPropertiesVector[] { 
pv0, pv1 },
-                IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-
-        IOperatorDescriptorRegistry spec = builder.getJobSpec();
         RecordDescriptor recordDescriptor =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
 
-        // at algebricks level, union all only accepts two inputs, although at
-        // hyracks
-        // level, there is no restrictions
-        UnionAllOperatorDescriptor opDesc = new 
UnionAllOperatorDescriptor(spec, 2, recordDescriptor);
+        UnionAllOperatorDescriptor opDesc =
+                new UnionAllOperatorDescriptor(builder.getJobSpec(), 
op.getInputs().size(), recordDescriptor);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-        ILogicalOperator src1 = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src1, 0, op, 0);
-        ILogicalOperator src2 = op.getInputs().get(1).getValue();
-        builder.contributeGraphEdge(src2, 0, op, 1);
-    }
 
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return false;
+        super.contributeRuntimeOperator(builder, context, op, opSchema, 
inputSchemas, outerPlanSchema);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 249e66f..c574cd8 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -18,7 +18,10 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.util;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -202,7 +205,7 @@ public class OperatorManipulationUtil {
     public static Pair<ILogicalOperator, Map<LogicalVariable, 
LogicalVariable>> deepCopyWithNewVars(
             ILogicalOperator root, IOptimizationContext ctx) throws 
AlgebricksException {
         LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
-                new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, 
true);
+                new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, 
true);
         ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
         return Pair.of(newRoot, 
deepCopyVisitor.getInputToOutputVariableMapping());
     }
@@ -327,4 +330,45 @@ public class OperatorManipulationUtil {
         return false;
     }
 
+    /**
+     * Returns the leaf operators (those with no inputs) found at or below the given operator
+     *
+     * @param opRef given operator
+     * @return list containing all leaf descendants, or {@code opRef} itself if it has no inputs
+     */
+    public static List<Mutable<ILogicalOperator>> 
findLeafDescendantsOrSelf(Mutable<ILogicalOperator> opRef) {
+        List<Mutable<ILogicalOperator>> result = Collections.emptyList();
+
+        Deque<Mutable<ILogicalOperator>> queue = new ArrayDeque<>();
+        queue.add(opRef);
+        Mutable<ILogicalOperator> currentOpRef;
+        while ((currentOpRef = queue.pollLast()) != null) {
+            List<Mutable<ILogicalOperator>> inputs = 
currentOpRef.getValue().getInputs();
+            if (inputs.isEmpty()) {
+                if (result.isEmpty()) {
+                    result = new ArrayList<>();
+                }
+                result.add(currentOpRef);
+            } else {
+                queue.addAll(inputs);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Find operator in a given list of operator references
+     *
+     * @param list list to search in
+     * @param op   operator to find
+     * @return operator position in the given list or {@code -1} if not found
+     */
+    public static int indexOf(List<Mutable<ILogicalOperator>> list, 
ILogicalOperator op) {
+        for (int i = 0, ln = list.size(); i < ln; i++) {
+            if (list.get(i).getValue() == op) {
+                return i;
+            }
+        }
+        return -1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 13eef09..16992e7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.ArrayUtils;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -34,6 +35,8 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -96,7 +99,7 @@ public class JobBuilder implements IHyracksJobBuilder {
     @Override
     public void contributeMicroOperator(ILogicalOperator op, 
IPushRuntimeFactory runtime, RecordDescriptor recDesc,
             AlgebricksPartitionConstraint pc) {
-        microOps.put(op, new Pair<IPushRuntimeFactory, 
RecordDescriptor>(runtime, recDesc));
+        microOps.put(op, new Pair<>(runtime, recDesc));
         revMicroOpMap.put(runtime, op);
         if (pc != null) {
             pcForMicroOps.put(op, pc);
@@ -171,6 +174,17 @@ public class JobBuilder implements IHyracksJobBuilder {
         setAllPartitionConstraints(tgtConstraints);
     }
 
+    public List<IOperatorDescriptor> getGeneratedMetaOps() { // meta operators found in jobSpec, sorted via sendsOutput
+        List<IOperatorDescriptor> resultOps = new ArrayList<>();
+        for (IOperatorDescriptor opd : jobSpec.getOperatorMap().values()) {
+            if (opd instanceof AlgebricksMetaOperatorDescriptor) { // every other descriptor kind is skipped
+                resultOps.add(opd);
+            }
+        }
+        resultOps.sort((op1, op2) -> sendsOutput(op1, op2) ? 1 : 
sendsOutput(op2, op1) ? -1 : 0); // an op that sends output to another op sorts after that op
+        return resultOps; // NOTE(review): comparator is a partial order (unrelated ops give 0) - confirm sort() honors it
+    }
+
     private void setAllPartitionConstraints(Map<IConnectorDescriptor, 
TargetConstraint> tgtConstraints) {
         List<OperatorDescriptorId> roots = jobSpec.getRoots();
         setSpecifiedPartitionConstraints();
@@ -317,20 +331,30 @@ public class JobBuilder implements IHyracksJobBuilder {
         int n = opContents.size();
         IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n];
         RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n];
-        int i = 0;
-        for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) {
+        for (int i = 0, ln = opContents.size(); i < ln; i++) {
+            Pair<IPushRuntimeFactory, RecordDescriptor> p = opContents.get(i);
             runtimeFactories[i] = p.first;
             internalRecordDescriptors[i] = p.second;
-            i++;
         }
         ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n 
- 1]);
         ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp);
-        int outArity = (outOps == null) ? 0 : outOps.size();
+        int outArity = outOps == null ? 0 : outOps.size();
+        int[] outPositions = new int[outArity];
+        IPushRuntimeFactory[] outRuntimeFactories = new 
IPushRuntimeFactory[outArity];
+        if (outOps != null) {
+            for (int i = 0, ln = outOps.size(); i < ln; i++) {
+                ILogicalOperator outOp = outOps.get(i);
+                outPositions[i] = 
OperatorManipulationUtil.indexOf(outOp.getInputs(), lastLogicalOp);
+                Pair<IPushRuntimeFactory, RecordDescriptor> microOpPair = 
microOps.get(outOp);
+                outRuntimeFactories[i] = microOpPair != null ? 
microOpPair.first : null;
+            }
+        }
+
         ILogicalOperator firstLogicalOp = 
revMicroOpMap.get(runtimeFactories[0]);
         ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp);
         int inArity = (inOps == null) ? 0 : inOps.size();
         return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, 
outArity, runtimeFactories,
-                internalRecordDescriptors);
+                internalRecordDescriptors, outRuntimeFactories, outPositions);
     }
 
     private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) {
@@ -344,7 +368,12 @@ public class JobBuilder implements IHyracksJobBuilder {
             return;
         }
         ILogicalOperator dest = destList.get(0);
+        int destInputPos = OperatorManipulationUtil.indexOf(dest.getInputs(), 
aop);
         Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest);
+        if (destInputPos != 0) {
+            return;
+        }
+
         if (j == null && microOps.get(dest) != null) {
             algebraicOpBelongingToMetaAsterixOp.put(dest, k);
             List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = 
metaAsterixOpSkeletons.get(k);
@@ -362,7 +391,6 @@ public class JobBuilder implements IHyracksJobBuilder {
                 }
             }
         }
-
     }
 
     private int createNewMetaOpInfo(ILogicalOperator aop) {
@@ -387,4 +415,28 @@ public class JobBuilder implements IHyracksJobBuilder {
         }
     }
 
+    private boolean sendsOutput(IOperatorDescriptor src, IOperatorDescriptor 
trg) { // true if src's output reaches trg's pipeline, directly or through intermediate meta operators
+        AlgebricksPipeline srcPipeline = ((AlgebricksMetaOperatorDescriptor) 
src).getPipeline(); // unchecked cast: src and trg must both be AlgebricksMetaOperatorDescriptor
+        IPushRuntimeFactory[] srcOutRts = 
srcPipeline.getOutputRuntimeFactories();
+        if (srcOutRts == null) {
+            return false; // src has no recorded output runtime factories
+        }
+        IPushRuntimeFactory[] trgRts = ((AlgebricksMetaOperatorDescriptor) 
trg).getPipeline().getRuntimeFactories();
+        for (IPushRuntimeFactory srcOutRt : srcOutRts) {
+            if (ArrayUtils.contains(trgRts, srcOutRt)) { // direct case: an output runtime of src belongs to trg
+                return true;
+            }
+            ILogicalOperator srcOutOp = revMicroOpMap.get(srcOutRt); // map the runtime back to its logical operator
+            if (srcOutOp != null) {
+                Integer k = algebraicOpBelongingToMetaAsterixOp.get(srcOutOp);
+                if (k != null) {
+                    AlgebricksMetaOperatorDescriptor srcOutMetaOp = 
metaAsterixOps.get(k);
+                    if (srcOutMetaOp != null && sendsOutput(srcOutMetaOp, 
trg)) { // transitive case: recurse from the meta operator that consumes src's output
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 7409247..ddda258 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -48,14 +48,24 @@ public class PlanCompiler {
         return context;
     }
 
-    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema 
outerPlanSchema,
+    public JobSpecification compilePlan(ILogicalPlan plan, 
IJobletEventListenerFactory jobEventListenerFactory)
+            throws AlgebricksException {
+        return compilePlanImpl(plan, false, null, jobEventListenerFactory); // isNestedPlan=false, no outer plan schema
+    }
+
+    public JobSpecification compileNestedPlan(ILogicalPlan plan, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        return compilePlanImpl(plan, true, outerPlanSchema, null); // isNestedPlan=true, no job event listener factory
+    }
+
+    private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean 
isNestedPlan, IOperatorSchema outerPlanSchema,
             IJobletEventListenerFactory jobEventListenerFactory) throws 
AlgebricksException {
         JobSpecification spec = new JobSpecification(context.getFrameSize());
         if (jobEventListenerFactory != null) {
             spec.setJobletEventListenerFactory(jobEventListenerFactory);
         }
-        List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
-        IHyracksJobBuilder builder = new JobBuilder(spec, 
context.getClusterLocations());
+        List<ILogicalOperator> rootOps = new ArrayList<>();
+        JobBuilder builder = new JobBuilder(spec, 
context.getClusterLocations());
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             compileOpRef(opRef, spec, builder, outerPlanSchema);
             rootOps.add(opRef.getValue());
@@ -66,6 +76,9 @@ public class PlanCompiler {
         spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
         // Do not do activity cluster planning because it is slow on large 
clusters
         spec.setUseConnectorPolicyForScheduling(false);
+        if (isNestedPlan) {
+            spec.setMetaOps(builder.getGeneratedMetaOps());
+        }
         return spec;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 5b6285a..d277043 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -67,6 +67,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPO
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
@@ -200,11 +201,11 @@ public class SetAlgebricksPhysicalOperatorsRule 
implements IAlgebraicRewriteRule
                     break;
                 }
                 case INNERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, 
context);
                     break;
                 }
                 case LEFTOUTERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, 
topLevelOp, context);
                     break;
                 }
                 case LIMIT: {
@@ -259,11 +260,19 @@ public class SetAlgebricksPhysicalOperatorsRule 
implements IAlgebraicRewriteRule
                     break;
                 }
                 case UNIONALL: {
-                    op.setPhysicalOperator(new UnionAllPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new UnionAllPOperator());
+                    } else {
+                        op.setPhysicalOperator(new MicroUnionAllPOperator());
+                    }
                     break;
                 }
                 case INTERSECT: {
-                    op.setPhysicalOperator(new IntersectPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new IntersectPOperator());
+                    } else {
+                        throw new IllegalStateException("Micro operator not 
implemented for: " + op.getOperatorTag());
+                    }
                     break;
                 }
                 case UNNEST: {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 9d3b311..3efa46b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -139,6 +139,10 @@ public class PushSubplanIntoGroupByRule implements 
IAlgebraicRewriteRule {
                 while (upperSubplanRootRefIterator.hasNext()) {
                     Mutable<ILogicalOperator> rootOpRef = 
upperSubplanRootRefIterator.next();
 
+                    if (downToNts(rootOpRef) == null) {
+                        continue;
+                    }
+
                     // Collects free variables in the root operator of a 
nested plan and its descent.
                     Set<LogicalVariable> freeVars = new ListSet<>();
                     
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) 
rootOpRef.getValue(),
@@ -154,6 +158,9 @@ public class PushSubplanIntoGroupByRule implements 
IAlgebraicRewriteRule {
                             // Sets the nts for a original subplan.
                             Mutable<ILogicalOperator> originalGbyRootOpRef = 
gbyNestedPlan.getRoots().get(rootIndex);
                             Mutable<ILogicalOperator> originalGbyNtsRef = 
downToNts(originalGbyRootOpRef);
+                            if (originalGbyNtsRef == null) {
+                                continue;
+                            }
                             NestedTupleSourceOperator originalNts =
                                     (NestedTupleSourceOperator) 
originalGbyNtsRef.getValue();
                             originalNts.setDataSourceReference(new 
MutableObject<>(gby));
@@ -265,11 +272,13 @@ public class PushSubplanIntoGroupByRule implements 
IAlgebraicRewriteRule {
     }
 
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> 
opRef) {
-        Mutable<ILogicalOperator> currentOpRef = opRef;
-        while (currentOpRef.getValue().getInputs().size() > 0) {
-            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        List<Mutable<ILogicalOperator>> leafOps = 
OperatorManipulationUtil.findLeafDescendantsOrSelf(opRef);
+        if (leafOps.size() == 1) { // only a plan with exactly one leaf can yield a result
+            Mutable<ILogicalOperator> leafOp = leafOps.get(0);
+            if (leafOp.getValue().getOperatorTag() == 
LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                return leafOp;
+            }
         }
-        return currentOpRef;
+        return null; // leaf count is not 1, or the single leaf is not a NESTEDTUPLESOURCE; callers must null-check
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 6efda52..0bc2a5e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -51,8 +51,11 @@ public class JoinUtils {
     private JoinUtils() {
     }
 
-    public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, 
IOptimizationContext context)
-            throws AlgebricksException {
+    public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean 
topLevelOp,
+            IOptimizationContext context) throws AlgebricksException {
+        if (!topLevelOp) {
+            throw new IllegalStateException("Micro operator not implemented 
for: " + op.getOperatorTag());
+        }
         List<LogicalVariable> sideLeft = new LinkedList<>();
         List<LogicalVariable> sideRight = new LinkedList<>();
         List<LogicalVariable> varsLeft = 
op.getInputs().get(0).getValue().getSchema();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml 
b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index bbea2ab..dafb6ab 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -73,6 +73,10 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
index 379944b..f24d38d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -27,10 +27,15 @@ public class AlgebricksPipeline implements Serializable {
     private static final long serialVersionUID = 1L;
     private final IPushRuntimeFactory[] runtimeFactories;
     private final RecordDescriptor[] recordDescriptors;
+    private final IPushRuntimeFactory[] outputRuntimeFactories;
+    private final int[] outputPositions;
 
-    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, 
RecordDescriptor[] recordDescriptors) {
+    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, 
RecordDescriptor[] recordDescriptors,
+            IPushRuntimeFactory[] outputRuntimeFactories, int[] 
outputPositions) {
         this.runtimeFactories = runtimeFactories;
         this.recordDescriptors = recordDescriptors;
+        this.outputRuntimeFactories = outputRuntimeFactories; // stored by reference, no defensive copy
+        this.outputPositions = outputPositions; // stored by reference, no defensive copy
         // this.projectedColumns = projectedColumns;
     }
 
@@ -46,8 +51,15 @@ public class AlgebricksPipeline implements Serializable {
         return recordDescriptors[recordDescriptors.length - 1].getFieldCount();
     }
 
+    public IPushRuntimeFactory[] getOutputRuntimeFactories() {
+        return outputRuntimeFactories; // returns the internal array itself; JobBuilder.sendsOutput null-checks the result
+    }
+
+    public int[] getOutputPositions() {
+        return outputPositions; // NOTE(review): presumably the input index used at each consumer (JobBuilder outPositions) - confirm
+    }
+
     // public int[] getProjectedColumns() {
     // return projectedColumns;
     // }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index de6cddd..f90de81 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -24,5 +24,5 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IPushRuntimeFactory extends Serializable {
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException;
+    IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException; // now returns an array of runtimes; the nested-plan aggregator call sites in this change use element [0]
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 94af04f..0a578f6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -168,7 +168,7 @@ public class NestedPlansAccumulatingAggregatorFactory 
extends AbstractAccumulati
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
             newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
             start = enforce ? EnforcePushRuntime.enforce(start) : start;
             newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index c261df8..75b2fb2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -149,7 +149,7 @@ public class NestedPlansRunningAggregatorFactory implements 
IAggregatorDescripto
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
             newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
             start = enforce ? EnforceFrameWriter.enforce(start) : start;
             newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);

Reply via email to