[ASTERIXDB-2244][RT] Implement micro union-all operator - user model changes: no - storage format changes: no - interface changes: no
Details: - implement support for binary micro operators in subplans - implement micro union-all operator - fix free variables visitor Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2277 Reviewed-by: Till Westmann <ti...@apache.org> Contrib: Till Westmann <ti...@apache.org> Integration-Tests: Till Westmann <ti...@apache.org> Tested-by: Till Westmann <ti...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/10e5ad1a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/10e5ad1a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/10e5ad1a Branch: refs/heads/master Commit: 10e5ad1a789d436639863273028f565960dface0 Parents: caf4306 Author: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Authored: Mon Jan 29 11:14:45 2018 -0800 Committer: Till Westmann <ti...@apache.org> Committed: Wed Jan 31 16:22:47 2018 -0800 ---------------------------------------------------------------------- .../rules/SetAsterixPhysicalOperatorsRule.java | 18 +- ...ineSubplanInputForNestedTupleSourceRule.java | 79 ++++--- .../rules/subplan/SubplanFlatteningUtil.java | 3 +- .../app/bootstrap/TestNodeController.java | 4 +- .../non_unary_subplan_01.1.ddl.sqlpp | 27 +++ .../non_unary_subplan_01.2.update.sqlpp | 28 +++ .../non_unary_subplan_01.3.query.sqlpp | 33 +++ .../non_unary_subplan_01.1.adm | 2 + .../resources/runtimets/testsuite_sqlpp.xml | 5 + .../main/resources/asx_errormsg/en.properties | 2 +- .../lang/sqlpp/visitor/FreeVariableVisitor.java | 2 +- .../runtime/CommitRuntimeFactory.java | 8 +- .../api/HeuristicCompilerFactoryBuilder.java | 2 +- .../core/algebra/base/PhysicalOperatorTag.java | 1 + .../physical/AbstractPhysicalOperator.java | 60 +++-- .../physical/AbstractUnionAllPOperator.java | 78 +++++++ .../physical/MicroUnionAllPOperator.java | 56 +++++ .../operators/physical/SubplanPOperator.java | 12 +- 
.../operators/physical/UnionAllPOperator.java | 51 +--- .../algebra/util/OperatorManipulationUtil.java | 46 +++- .../algebricks/core/jobgen/impl/JobBuilder.java | 66 +++++- .../core/jobgen/impl/PlanCompiler.java | 19 +- .../SetAlgebricksPhysicalOperatorsRule.java | 17 +- .../subplan/PushSubplanIntoGroupByRule.java | 19 +- .../algebricks/rewriter/util/JoinUtils.java | 7 +- .../algebricks/algebricks-runtime/pom.xml | 4 + .../runtime/base/AlgebricksPipeline.java | 16 +- .../runtime/base/IPushRuntimeFactory.java | 2 +- ...estedPlansAccumulatingAggregatorFactory.java | 2 +- .../NestedPlansRunningAggregatorFactory.java | 2 +- ...AbstractOneInputOneOutputRuntimeFactory.java | 4 +- .../operators/base/SinkRuntimeFactory.java | 7 +- .../meta/AlgebricksMetaOperatorDescriptor.java | 11 +- .../operators/meta/PipelineAssembler.java | 39 +++- .../operators/meta/SubplanRuntimeFactory.java | 232 +++++++++++++------ .../std/EmptyTupleSourceRuntimeFactory.java | 6 +- .../std/NestedTupleSourceRuntimeFactory.java | 4 +- .../operators/std/PrinterRuntimeFactory.java | 4 +- .../operators/std/SinkWriterRuntimeFactory.java | 4 +- .../union/MicroUnionAllRuntimeFactory.java | 112 +++++++++ .../tests/pushruntime/PushRuntimeTest.java | 11 +- .../hyracks/api/job/JobSpecification.java | 10 + 42 files changed, 848 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index 464476b..4afccb0 100644 --- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -80,20 +80,20 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule { return false; } - computeDefaultPhysicalOp(op, context); + computeDefaultPhysicalOp(op, true, context); context.addToDontApplySet(this, op); return true; } - private static void setPhysicalOperators(ILogicalPlan plan, IOptimizationContext context) + private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context) throws AlgebricksException { for (Mutable<ILogicalOperator> root : plan.getRoots()) { - computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), context); + computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context); } } - private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, IOptimizationContext context) - throws AlgebricksException { + private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp, + IOptimizationContext context) throws AlgebricksException { PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig(); if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) { GroupByOperator gby = (GroupByOperator) op; @@ -207,11 +207,11 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule { if (op.getPhysicalOperator() == null) { switch (op.getOperatorTag()) { case INNERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context); break; } case LEFTOUTERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, 
topLevelOp, context); break; } case UNNEST_MAP: @@ -277,11 +277,11 @@ public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule { if (op.hasNestedPlans()) { AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op; for (ILogicalPlan p : nested.getNestedPlans()) { - setPhysicalOperators(p, context); + setPhysicalOperators(p, false, context); } } for (Mutable<ILogicalOperator> opRef : op.getInputs()) { - computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), context); + computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java index a9cd806..3edccec 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java @@ -19,6 +19,7 @@ package org.apache.asterix.optimizer.rules.subplan; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -56,21 +57,21 @@ import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; -import com.google.common.collect.ImmutableSet; - /* -This rule is to 
remove SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called S1, -Let's call its input operator O1. +This rule is to remove SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin. +Given a qualified Subplan operator called S1, let's call its input operator O1. General Cases We have the following rewritings for general cases: R1. Replace all NestedTupleSourceOperators in S1 with deep-copies (with new variables) of the query plan rooted at O1; -R2. Add a LeftOuterOperatorJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input (let's call it SO1), - where O1 is the left branch and SO1 is the right branch; -R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator to SO1. - The join condition of LJ is the equality between the primary key variables in O1 and its deep copied version at SO1; +R2. Add a LeftOuterOperatorJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input + (let's call it SO1), where O1 is the left branch and SO1 is the right branch; +R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator + to SO1. The join condition of LJ is the equality between the primary key variables in O1 and its deep copied + version at SO1; R4. A variable v indicating non-match tuples is assigned to TRUE between LJ and SO1; -R5. On top of the LJ, add a GroupByOperaptor in which the nested plan consists of the S1's root operator, i.e., an aggregate operator. +R5. On top of the LJ, add a GroupByOperator in which the nested plan consists of the S1's root operator, + i.e., an aggregate operator. Below the aggregate, there is a not-null-filter on variable v. The group key is the primary key variables in O1.
This is an abstract example for the rewriting mechanism described above: @@ -102,10 +103,12 @@ After rewriting: ..... --Deepcopy_The_Plan_Rooted_At_InputOp_With_New_Variables(InputOp) -In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp. -"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables -at InputOp except the covering variables v_lc_1, ..., v_lc_n. In the current implementation, we use "covering" variables as primary key variables. In the next version, we -will use the real primary key variables, which will fix ASTERIXDB-1168. +In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their +corresponding variables populated from the deepcopy of InputOp. +"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part +of the added group-by operator are all live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n. +In the current implementation, we use "covering" variables as primary key variables. +In the next version, we will use the real primary key variables, which will fix ASTERIXDB-1168. Here is a concrete example of the general case rewriting (optimizerts/queries/nested_loj4.aql). Before plan: @@ -157,14 +160,19 @@ b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource desc c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1), d. there is no tuple dropping from N1 to J1 -Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are needed: +Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are +needed: R1'. 
Replace N1 by the O1 (no additional deep copy); R2'. All inner joins on the path from N1 to J1 (including J1) become left-outer joins with the same join conditions; -R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, switch the left and right branches of J2; -R4'. For every left join from N1 to J1 transformed from an inner join, a variable vi indicating non-match tuples is assigned to TRUE in its right branch; -R5'. On top of J1, a GroupByOperaptor G1 is added where the group-by key is the primary key of O1 and the nested query plan for aggregation is the nested pipeline - on top of J1 with an added not-null-filter to check all vi are not null. -R6'. All other NestedTupleSourceOperators in the subplan is inlined with deep copies (with new variables) of the query plan rooted at O1. +R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, + switch the left and right branches of J2; +R4'. For every left join from N1 to J1 transformed from an inner join, a variable vi indicating non-match tuples + is assigned to TRUE in its right branch; +R5'. On top of J1, a GroupByOperator G1 is added where the group-by key is the primary key of O1 and + the nested query plan for aggregation is the nested pipeline on top of J1 with an added not-null-filter + to check all vi are not null. +R6'. All other NestedTupleSourceOperators in the subplan are inlined with deep copies (with new variables) + of the query plan rooted at O1. This is an abstract example for the special rewriting mechanism described above: Before rewriting: @@ -197,9 +205,10 @@ After rewriting: â Assign v_new=TRUE â ..... (L1) -In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables -at InputOp except the covering variables v_lc_1, ..., v_lc_n.
In the current implementation, we use "covering" variables as primary key variables. In the next version, -we will use the real primary key variables, which will fix ASTERIXDB-1168. +In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part +of the added group-by operator are all live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n. +In the current implementation, we use "covering" variables as primary key variables. +In the next version, we will use the real primary key variables, which will fix ASTERIXDB-1168. Here is a concrete example (optimizerts/queries/nested_loj2.aql). . Before plan: @@ -343,10 +352,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew private Pair<Boolean, LinkedHashMap<LogicalVariable, LogicalVariable>> applyGeneralFlattening( Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { SubplanOperator subplanOp = (SubplanOperator) opRef.getValue(); - if (!SubplanFlatteningUtil.containsOperators(subplanOp, - ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, LogicalOperatorTag.INNERJOIN, - // We don't have nested runtime for union-all and distinct hence we have to include them here. - LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) { + if (!SubplanFlatteningUtil.containsOperators(subplanOp, EnumSet.of(LogicalOperatorTag.DATASOURCESCAN, + LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { return new Pair<>(false, new LinkedHashMap<>()); } Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0); @@ -402,10 +409,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew context.computeAndSetTypeEnvironmentForOperator(leftOuterJoinOp); // Creates group-by operator. 
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>(); List<ILogicalPlan> nestedPlans = new ArrayList<>(); GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans); @@ -435,8 +440,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew lowestAggregateRefInSubplan.getValue().getInputs().add(currentOpRef); } - // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e., - // subplan input tuples that are filtered out within a subplan. + // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, + // i.e., subplan input tuples that are filtered out within a subplan. Mutable<ILogicalExpression> filterVarExpr = new MutableObject<>(new VariableReferenceExpression(assignVar)); List<Mutable<ILogicalExpression>> args = new ArrayList<>(); args.add(filterVarExpr); @@ -500,10 +505,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second; // Creates a group-by operator. 
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>(); GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, subplanOp.getNestedPlans()); for (LogicalVariable coverVar : primaryKeyVars) { @@ -521,8 +524,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew groupbyOp.getInputs().add(new MutableObject<>(topJoinRef.getValue())); if (!notNullVars.isEmpty()) { - // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e., - // subplan input tuples that are filtered out within a subplan. + // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, + // i.e., subplan input tuples that are filtered out within a subplan. 
List<Mutable<ILogicalExpression>> nullCheckExprRefs = new ArrayList<>(); for (LogicalVariable notNullVar : notNullVars) { Mutable<ILogicalExpression> filterVarExpr = http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java index dba5d47..894097d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.optimizer.rules.subplan; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -208,7 +209,7 @@ class SubplanFlatteningUtil { if (!OperatorManipulationUtil.ancestorOfOperators( subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(), // we don't need to check recursively for this special rewriting. 
- ImmutableSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { + EnumSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { return new Pair<Boolean, ILogicalOperator>(false, null); } SubplanSpecialFlatteningCheckVisitor visitor = new SubplanSpecialFlatteningCheckVisitor(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 69ce2ea..4d26d25 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -235,7 +235,7 @@ public class TestNodeController { } IPushRuntime assignOp = new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true) - .createPushRuntime(ctx); + .createPushRuntime(ctx)[0]; insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc); assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); @@ -270,7 +270,7 @@ public class TestNodeController { NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider) throws HyracksDataException, AlgebricksException { - IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx); + IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0]; JobSpecification spec = new JobSpecification(); 
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp new file mode 100644 index 0000000..c1ce4a3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse test if exists; +create dataverse test; +use test; + +create type TType as open +{ id: bigint }; + +create dataset TData (TType) primary key id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp new file mode 100644 index 0000000..3ce9554 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +insert into TData ( [ +{'id':1, 'x':1, 'f':19}, +{'id':2, 'x':2, 'f':12}, +{'id':3, 'x':1, 'f':10}, +{'id':4, 'x':2, 'f':17}, +{'id':5, 'x':1, 'f':12} +]); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp new file mode 100644 index 0000000..b825157 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +select x, +array_sum(( + select value a.f + from g as p + union all + select value a.f + from g as w +)) s +from TData as a +group by a.x as x group as g +order by x +; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm new file mode 100644 index 0000000..5071183 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm @@ -0,0 +1,2 @@ +{ "x": 1, "s": 82 } +{ "x": 2, "s": 58 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 16fa334..3a535cc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -6430,6 +6430,11 @@ </compilation-unit> </test-case> <test-case FilePath="subquery"> + <compilation-unit name="non_unary_subplan_01"> + <output-dir compare="Text">non_unary_subplan_01</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="subquery"> <compilation-unit name="query-ASTERIXDB-1571"> <output-dir compare="Text">query-ASTERIXDB-1571</output-dir> </compilation-unit> 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 3ad7fd5..8341a33 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -125,7 +125,7 @@ 1060 = Field \"%1$s\" in the with clause must be of type %2$s 1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\" 1062 = Merge policy parameters cannot be of type %1$s -1063 = There is no dataverse with name %1$s +1063 = There is no dataverse with name \"%1$s\" # Feed Errors 3001 = Illegal state. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java index 1a54b92..b754533 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java @@ -410,7 +410,7 @@ public class FreeVariableVisitor extends AbstractSqlppQueryExpressionVisitor<Voi public Void visit(IndexAccessor ia, Collection<VariableExpr> freeVars) throws CompilationException { ia.getExpr().accept(this, freeVars); if (ia.getIndexExpr() != null) { - ia.getIndexExpr(); + ia.getIndexExpr().accept(this, freeVars); } return null; } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 445ad4a..bbfde38 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -51,10 +51,10 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); - return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId, - primaryKeyFields, isWriteTransaction, - datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + return new IPushRuntime[] { new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), + datasetId, primaryKeyFields, isWriteTransaction, + datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) }; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 79b8f38..2d6123e 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -104,7 +104,7 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil clusterLocations); PlanCompiler pc = new PlanCompiler(context); - return pc.compilePlan(plan, null, jobEventListenerFactory); + return pc.compilePlan(plan, jobEventListenerFactory); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index 75b63f1..db9728b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -44,6 +44,7 @@ public enum PhysicalOperatorTag { MATERIALIZE, MICRO_PRE_CLUSTERED_GROUP_BY, MICRO_PRE_SORTED_DISTINCT_BY, + MICRO_UNION_ALL, NESTED_LOOP, NESTED_TUPLE_SOURCE, ONE_TO_ONE_EXCHANGE, 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java index 43cde22..29d6037 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.physical; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; @@ -97,7 +99,7 @@ public abstract class AbstractPhysicalOperator implements IPhysicalOperator { /** * @return labels (0 or 1) for each input and output indicating the dependency between them. - * The edges labeled as 1 must wait for the edges with label 0. + * The edges labeled as 1 must wait for the edges with label 0. 
*/ @Override public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { @@ -118,47 +120,61 @@ public abstract class AbstractPhysicalOperator implements IPhysicalOperator { protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema, AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException { - AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()]; + List<List<AlgebricksPipeline>> subplans = compileSubplansImpl(outerPlanSchema, npOp, opSchema, context); + int n = subplans.size(); + AlgebricksPipeline[] result = new AlgebricksPipeline[n]; + for (int i = 0; i < n; i++) { + List<AlgebricksPipeline> subplanOps = subplans.get(i); + if (subplanOps.size() != 1) { + throw new AlgebricksException("Attempting to construct a nested plan with " + subplanOps.size() + + " operator descriptors. Currently, nested plans can only consist in linear pipelines of " + + "micro operators."); + } + result[i] = subplanOps.get(0); + } + return result; + } + + protected List<List<AlgebricksPipeline>> compileSubplansImpl(IOperatorSchema outerPlanSchema, + AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context) + throws AlgebricksException { + List<List<AlgebricksPipeline>> subplans = new ArrayList<>(npOp.getNestedPlans().size()); PlanCompiler pc = new PlanCompiler(context); - int i = 0; for (ILogicalPlan p : npOp.getNestedPlans()) { - subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc); + subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc)); } return subplans; } - private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema, + private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema, AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws 
AlgebricksException { if (p.getRoots().size() > 1) { throw new NotImplementedException("Nested plans with several roots are not supported."); } - JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null); + JobSpecification nestedJob = pc.compileNestedPlan(p, outerPlanSchema); ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue(); JobGenContext context = pc.getContext(); IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan); opSchema.addAllVariables(topOpInSubplanScm); Map<OperatorDescriptorId, IOperatorDescriptor> opMap = nestedJob.getOperatorMap(); - if (opMap.size() != 1) { - throw new AlgebricksException("Attempting to construct a nested plan with " + opMap.size() - + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators."); + List<? extends IOperatorDescriptor> metaOps = nestedJob.getMetaOps(); + if (opMap.size() != metaOps.size()) { + for (IOperatorDescriptor opd : opMap.values()) { + if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) { + throw new AlgebricksException( + "Can only generate jobs for pipelinable nested plans, not for " + opd.getClass().getName()); + } + } + throw new IllegalStateException("Unexpected nested plan"); } - for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> opEntry : opMap.entrySet()) { - IOperatorDescriptor opd = opEntry.getValue(); - if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) { - throw new AlgebricksException( - "Can only generate Hyracks jobs for pipelinable Asterix nested plans, not for " - + opd.getClass().getName()); - } + List<AlgebricksPipeline> result = new ArrayList<>(metaOps.size()); + for (IOperatorDescriptor opd : metaOps) { AlgebricksMetaOperatorDescriptor amod = (AlgebricksMetaOperatorDescriptor) opd; - - return amod.getPipeline(); - // we suppose that the top operator in the subplan already does the - // projection for us + result.add(amod.getPipeline()); } - - throw new IllegalStateException(); + 
return result; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java new file mode 100644 index 0000000..a4d9576 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; + +public abstract class AbstractUnionAllPOperator extends AbstractPhysicalOperator { + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty(); + this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<>(0)); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { 
+ StructuralPropertiesVector pv0 = + OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); + StructuralPropertiesVector pv1 = + OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); + return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 }, + IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + List<Mutable<ILogicalOperator>> inputs = op.getInputs(); + for (int i = 0; i < inputs.size(); i++) { + ILogicalOperator src = inputs.get(i).getValue(); + builder.contributeGraphEdge(src, 0, op, i); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java new file mode 100644 index 0000000..f5e992e --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.operators.union.MicroUnionAllRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +public class MicroUnionAllPOperator extends AbstractUnionAllPOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.MICRO_UNION_ALL; + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema) + throws AlgebricksException { + RecordDescriptor recordDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + + MicroUnionAllRuntimeFactory runtime = new MicroUnionAllRuntimeFactory(op.getInputs().size()); + builder.contributeMicroOperator(op, runtime, recordDescriptor); + + super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java index d43ddab..95efbac 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java @@ -89,18 +89,18 @@ public class SubplanPOperator extends AbstractPhysicalOperator { if (subplan.getNestedPlans().size() != 1) { throw new NotImplementedException("Subplan currently works only for one nested plan with one root."); } - AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema, context); - assert subplans.length == 1; - AlgebricksPipeline np = subplans[0]; + List<List<AlgebricksPipeline>> subplans = compileSubplansImpl(inputSchemas[0], subplan, opSchema, context); + assert subplans.size() == 1; + List<AlgebricksPipeline> np = subplans.get(0); RecordDescriptor inputRecordDesc = 
JobGenHelper.mkRecordDescriptor( context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context); - IMissingWriterFactory[] missingWriterFactories = new IMissingWriterFactory[np.getOutputWidth()]; + IMissingWriterFactory[] missingWriterFactories = new IMissingWriterFactory[np.get(0).getOutputWidth()]; for (int i = 0; i < missingWriterFactories.length; i++) { missingWriterFactories[i] = context.getMissingWriterFactory(); } - SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, missingWriterFactories, inputRecordDesc, null); - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + SubplanRuntimeFactory runtime = + new SubplanRuntimeFactory(np, missingWriterFactories, inputRecordDesc, recDesc, null); builder.contributeMicroOperator(subplan, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java index a617064..4ccce92 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java @@ -18,29 +18,18 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.physical; -import java.util.ArrayList; - import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor; -public class UnionAllPOperator extends AbstractPhysicalOperator { +public class UnionAllPOperator extends AbstractUnionAllPOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -53,48 +42,16 @@ public class UnionAllPOperator extends AbstractPhysicalOperator { } @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - 
IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty(); - this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<>(0)); - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - StructuralPropertiesVector pv0 = - OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( - new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); - StructuralPropertiesVector pv1 = - OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( - new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); - return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 }, - IPartitioningRequirementsCoordinator.NO_COORDINATION); - } - - @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - - IOperatorDescriptorRegistry spec = builder.getJobSpec(); RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - // at algebricks level, union all only accepts two inputs, although at - // hyracks - // level, there is no restrictions - UnionAllOperatorDescriptor opDesc = new UnionAllOperatorDescriptor(spec, 2, recordDescriptor); + UnionAllOperatorDescriptor opDesc = + new UnionAllOperatorDescriptor(builder.getJobSpec(), op.getInputs().size(), recordDescriptor); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); - ILogicalOperator src1 = op.getInputs().get(0).getValue(); - builder.contributeGraphEdge(src1, 0, op, 0); - ILogicalOperator src2 = op.getInputs().get(1).getValue(); - builder.contributeGraphEdge(src2, 0, op, 1); - } - @Override 
- public boolean expensiveThanMaterialization() { - return false; + super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema); } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index 249e66f..c574cd8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -18,7 +18,10 @@ */ package org.apache.hyracks.algebricks.core.algebra.util; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Set; @@ -202,7 +205,7 @@ public class OperatorManipulationUtil { public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars( ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException { LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = - new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, true); + new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, true); ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root); return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping()); } @@ -327,4 +330,45 @@ public class OperatorManipulationUtil { return false; } + /** + * Returns all descendants of 
an operator that are leaf operators + * + * @param opRef given operator + * @return list containing all leaf descendants + */ + public static List<Mutable<ILogicalOperator>> findLeafDescendantsOrSelf(Mutable<ILogicalOperator> opRef) { + List<Mutable<ILogicalOperator>> result = Collections.emptyList(); + + Deque<Mutable<ILogicalOperator>> queue = new ArrayDeque<>(); + queue.add(opRef); + Mutable<ILogicalOperator> currentOpRef; + while ((currentOpRef = queue.pollLast()) != null) { + List<Mutable<ILogicalOperator>> inputs = currentOpRef.getValue().getInputs(); + if (inputs.isEmpty()) { + if (result.isEmpty()) { + result = new ArrayList<>(); + } + result.add(currentOpRef); + } else { + queue.addAll(inputs); + } + } + return result; + } + + /** + * Find operator in a given list of operator references + * + * @param list list to search in + * @param op operator to find + * @return operator position in the given list or {@code -1} if not found + */ + public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) { + for (int i = 0, ln = list.size(); i < ln; i++) { + if (list.get(i).getValue() == op) { + return i; + } + } + return -1; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index 13eef09..16992e7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -23,6 +23,7 @@ import 
java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -34,6 +35,8 @@ import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; @@ -96,7 +99,7 @@ public class JobBuilder implements IHyracksJobBuilder { @Override public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc, AlgebricksPartitionConstraint pc) { - microOps.put(op, new Pair<IPushRuntimeFactory, RecordDescriptor>(runtime, recDesc)); + microOps.put(op, new Pair<>(runtime, recDesc)); revMicroOpMap.put(runtime, op); if (pc != null) { pcForMicroOps.put(op, pc); @@ -171,6 +174,17 @@ public class JobBuilder implements IHyracksJobBuilder { setAllPartitionConstraints(tgtConstraints); } + public List<IOperatorDescriptor> getGeneratedMetaOps() { + List<IOperatorDescriptor> resultOps = new ArrayList<>(); + for (IOperatorDescriptor opd : jobSpec.getOperatorMap().values()) { + if (opd instanceof AlgebricksMetaOperatorDescriptor) { + resultOps.add(opd); + } + } + resultOps.sort((op1, 
op2) -> sendsOutput(op1, op2) ? 1 : sendsOutput(op2, op1) ? -1 : 0); + return resultOps; + } + private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) { List<OperatorDescriptorId> roots = jobSpec.getRoots(); setSpecifiedPartitionConstraints(); @@ -317,20 +331,30 @@ public class JobBuilder implements IHyracksJobBuilder { int n = opContents.size(); IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n]; RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n]; - int i = 0; - for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) { + for (int i = 0, ln = opContents.size(); i < ln; i++) { + Pair<IPushRuntimeFactory, RecordDescriptor> p = opContents.get(i); runtimeFactories[i] = p.first; internalRecordDescriptors[i] = p.second; - i++; } ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n - 1]); ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp); - int outArity = (outOps == null) ? 0 : outOps.size(); + int outArity = outOps == null ? 0 : outOps.size(); + int[] outPositions = new int[outArity]; + IPushRuntimeFactory[] outRuntimeFactories = new IPushRuntimeFactory[outArity]; + if (outOps != null) { + for (int i = 0, ln = outOps.size(); i < ln; i++) { + ILogicalOperator outOp = outOps.get(i); + outPositions[i] = OperatorManipulationUtil.indexOf(outOp.getInputs(), lastLogicalOp); + Pair<IPushRuntimeFactory, RecordDescriptor> microOpPair = microOps.get(outOp); + outRuntimeFactories[i] = microOpPair != null ? microOpPair.first : null; + } + } + ILogicalOperator firstLogicalOp = revMicroOpMap.get(runtimeFactories[0]); ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp); int inArity = (inOps == null) ? 
0 : inOps.size(); return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, outArity, runtimeFactories, - internalRecordDescriptors); + internalRecordDescriptors, outRuntimeFactories, outPositions); } private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) { @@ -344,7 +368,12 @@ public class JobBuilder implements IHyracksJobBuilder { return; } ILogicalOperator dest = destList.get(0); + int destInputPos = OperatorManipulationUtil.indexOf(dest.getInputs(), aop); Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest); + if (destInputPos != 0) { + return; + } + if (j == null && microOps.get(dest) != null) { algebraicOpBelongingToMetaAsterixOp.put(dest, k); List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k); @@ -362,7 +391,6 @@ public class JobBuilder implements IHyracksJobBuilder { } } } - } private int createNewMetaOpInfo(ILogicalOperator aop) { @@ -387,4 +415,28 @@ public class JobBuilder implements IHyracksJobBuilder { } } + private boolean sendsOutput(IOperatorDescriptor src, IOperatorDescriptor trg) { + AlgebricksPipeline srcPipeline = ((AlgebricksMetaOperatorDescriptor) src).getPipeline(); + IPushRuntimeFactory[] srcOutRts = srcPipeline.getOutputRuntimeFactories(); + if (srcOutRts == null) { + return false; + } + IPushRuntimeFactory[] trgRts = ((AlgebricksMetaOperatorDescriptor) trg).getPipeline().getRuntimeFactories(); + for (IPushRuntimeFactory srcOutRt : srcOutRts) { + if (ArrayUtils.contains(trgRts, srcOutRt)) { + return true; + } + ILogicalOperator srcOutOp = revMicroOpMap.get(srcOutRt); + if (srcOutOp != null) { + Integer k = algebraicOpBelongingToMetaAsterixOp.get(srcOutOp); + if (k != null) { + AlgebricksMetaOperatorDescriptor srcOutMetaOp = metaAsterixOps.get(k); + if (srcOutMetaOp != null && sendsOutput(srcOutMetaOp, trg)) { + return true; + } + } + } + } + return false; + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 7409247..ddda258 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -48,14 +48,24 @@ public class PlanCompiler { return context; } - public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, + public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory) + throws AlgebricksException { + return compilePlanImpl(plan, false, null, jobEventListenerFactory); + } + + public JobSpecification compileNestedPlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + return compilePlanImpl(plan, true, outerPlanSchema, null); + } + + private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean isNestedPlan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException { JobSpecification spec = new JobSpecification(context.getFrameSize()); if (jobEventListenerFactory != null) { spec.setJobletEventListenerFactory(jobEventListenerFactory); } - List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>(); - IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations()); + List<ILogicalOperator> rootOps = new ArrayList<>(); + JobBuilder builder = new 
JobBuilder(spec, context.getClusterLocations()); for (Mutable<ILogicalOperator> opRef : plan.getRoots()) { compileOpRef(opRef, spec, builder, outerPlanSchema); rootOps.add(opRef.getValue()); @@ -66,6 +76,9 @@ public class PlanCompiler { spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); // Do not do activity cluster planning because it is slow on large clusters spec.setUseConnectorPolicyForScheduling(false); + if (isNestedPlan) { + spec.setMetaOps(builder.getGeneratedMetaOps()); + } return spec; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 5b6285a..d277043 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPO import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; @@ -200,11 +201,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule break; } case INNERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context); break; } case LEFTOUTERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context); break; } case LIMIT: { @@ -259,11 +260,19 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule break; } case UNIONALL: { - op.setPhysicalOperator(new UnionAllPOperator()); + if (topLevelOp) { + op.setPhysicalOperator(new UnionAllPOperator()); + } else { + op.setPhysicalOperator(new MicroUnionAllPOperator()); + } break; } case INTERSECT: { - op.setPhysicalOperator(new IntersectPOperator()); + if (topLevelOp) { + op.setPhysicalOperator(new IntersectPOperator()); + } else { + throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag()); + } break; } case UNNEST: { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java index 9d3b311..3efa46b 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java @@ -139,6 +139,10 @@ public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule { while (upperSubplanRootRefIterator.hasNext()) { Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next(); + if (downToNts(rootOpRef) == null) { + continue; + } + // Collects free variables in the root operator of a nested plan and its descent. Set<LogicalVariable> freeVars = new ListSet<>(); OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(), @@ -154,6 +158,9 @@ public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule { // Sets the nts for a original subplan. 
Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex); Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef); + if (originalGbyNtsRef == null) { + continue; + } NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef.getValue(); originalNts.setDataSourceReference(new MutableObject<>(gby)); @@ -265,11 +272,13 @@ public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule { } private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) { - Mutable<ILogicalOperator> currentOpRef = opRef; - while (currentOpRef.getValue().getInputs().size() > 0) { - currentOpRef = currentOpRef.getValue().getInputs().get(0); + List<Mutable<ILogicalOperator>> leafOps = OperatorManipulationUtil.findLeafDescendantsOrSelf(opRef); + if (leafOps.size() == 1) { + Mutable<ILogicalOperator> leafOp = leafOps.get(0); + if (leafOp.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) { + return leafOp; + } } - return currentOpRef; + return null; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java index 6efda52..0bc2a5e 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java @@ -51,8 +51,11 @@ public class JoinUtils { private JoinUtils() { } - public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context) - throws AlgebricksException { + public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean topLevelOp, + IOptimizationContext context) throws AlgebricksException { + if (!topLevelOp) { + throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag()); + } List<LogicalVariable> sideLeft = new LinkedList<>(); List<LogicalVariable> sideRight = new LinkedList<>(); List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml index bbea2ab..dafb6ab 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml @@ -73,6 +73,10 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java index 379944b..f24d38d 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java @@ -27,10 +27,15 @@ public class AlgebricksPipeline implements Serializable { private static final long serialVersionUID = 1L; private final IPushRuntimeFactory[] runtimeFactories; private final RecordDescriptor[] recordDescriptors; + private final IPushRuntimeFactory[] outputRuntimeFactories; + private final int[] outputPositions; - public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) { + public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors, + IPushRuntimeFactory[] outputRuntimeFactories, int[] outputPositions) { this.runtimeFactories = runtimeFactories; this.recordDescriptors = recordDescriptors; + this.outputRuntimeFactories = outputRuntimeFactories; + this.outputPositions = outputPositions; // this.projectedColumns = projectedColumns; } @@ -46,8 +51,15 @@ public class AlgebricksPipeline implements Serializable { return recordDescriptors[recordDescriptors.length - 1].getFieldCount(); } + public IPushRuntimeFactory[] getOutputRuntimeFactories() { + return outputRuntimeFactories; + } + + public int[] getOutputPositions() { + return outputPositions; + } + // public int[] getProjectedColumns() { // return projectedColumns; // } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java index de6cddd..f90de81 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java @@ -24,5 +24,5 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IPushRuntimeFactory extends Serializable { - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException; + IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index 94af04f..0a578f6 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -168,7 +168,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati // should enforce protocol boolean enforce = 
ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); + IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; start = enforce ? EnforcePushRuntime.enforce(start) : start; newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index c261df8..75b2fb2 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -149,7 +149,7 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); + IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; start = enforce ? 
EnforceFrameWriter.enforce(start) : start; newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);