Dmitry Lychagin has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/3151
Change subject: [NO ISSUE][RT] Window operator runtime optimization ...................................................................... [NO ISSUE][RT] Window operator runtime optimization - user model changes: no - storage format changes: no - interface changes: no Details: - Runtime optimization for window operators with accumulating frames (unbounded preceding to current row or n following) - Refactor window function properties into generic builtin function properties Change-Id: I8d1574defc73076ad960c4067432da29ead160a5 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java 32 files changed, 928 insertions(+), 233 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/51/3151/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index d1ce865..6e40291 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.optimizer.rules.am.AccessMethodUtils; @@ -37,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; public class AnalysisUtil { /* @@ -131,6 +134,25 @@ } /** + * Checks whether a window operator has a function call where the function has given property + */ + public static boolean hasFunctionWithProperty(WindowOperator winOp, + BuiltinFunctions.WindowFunctionProperty property) throws CompilationException { + for (Mutable<ILogicalExpression> exprRef : winOp.getExpressions()) { + ILogicalExpression expr = exprRef.getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(), + expr.getExpressionTag()); + } + AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr; + if (BuiltinFunctions.builtinFunctionHasProperty(callExpr.getFunctionIdentifier(), property)) { + return true; + } + } + return false; + } + + /** * Checks whether frame boundary expression is a monotonically non-decreasing function over a frame value variable */ public static boolean isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>> frameBoundaryExprList, diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index ce9fd03..368393d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import 
org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator; import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator; @@ -67,6 +68,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.rewriter.util.JoinUtils; @@ -365,24 +367,21 @@ LogicalVariable var = ((VariableReferenceExpression) orderExpr).getVariableReference(); orderColumns.add(new OrderColumn(var, p.first.getKind())); } - boolean partitionMaterialization = winOp.hasNestedPlans(); - if (!partitionMaterialization) { - for (Mutable<ILogicalExpression> exprRef : winOp.getExpressions()) { - ILogicalExpression expr = exprRef.getValue(); - if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { - throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, winOp.getSourceLocation(), - expr.getExpressionTag()); - } - AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr; - if (BuiltinFunctions.windowFunctionHasProperty(callExpr.getFunctionIdentifier(), - BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) { - partitionMaterialization = true; - break; - } - } - } + + boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp, + BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION); boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions()); - return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, 
frameStartIsMonotonic); + boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(), + winOp.getFrameValueExpressions()); + + boolean nestedAggregates = false; + if (winOp.hasNestedPlans()) { + Set<LogicalOperatorTag> roots = OperatorPropertiesUtil.getRootOperatorTags(winOp.getNestedPlans()); + nestedAggregates = roots.size() == 1 && roots.contains(LogicalOperatorTag.AGGREGATE); + } + + return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic, + frameEndIsMonotonic, nestedAggregates); } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java index ce857ea..81f054a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java @@ -1040,11 +1040,11 @@ fs.getName()); } boolean isWin = BuiltinFunctions.isWindowFunction(fi); - boolean isWinAgg = isWin - && BuiltinFunctions.windowFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG); - boolean prohibitOrderClause = isWin && BuiltinFunctions.windowFunctionHasProperty(fi, + boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, + BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG); + boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE); - boolean prohibitFrameClause = isWin && BuiltinFunctions.windowFunctionHasProperty(fi, + boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE); Mutable<ILogicalOperator> currentOpRef = tupSource; @@ -1313,7 +1313,7 @@ if 
(fcallExpr.getKind() != AbstractFunctionCallExpression.FunctionKind.STATEFUL) { throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, fcallExpr.getKind()); } - if (BuiltinFunctions.windowFunctionHasProperty(fi, + if (BuiltinFunctions.builtinFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS)) { for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprListOut) { fcallExpr.getArguments().add(new MutableObject<>(p.second.getValue().cloneExpression())); @@ -1575,7 +1575,7 @@ } AbstractFunctionCallExpression valueExpr = BuiltinFunctions.makeWindowFunctionExpression(fid, new ArrayList<>()); - if (BuiltinFunctions.windowFunctionHasProperty(valueExpr.getFunctionIdentifier(), + if (BuiltinFunctions.builtinFunctionHasProperty(valueExpr.getFunctionIdentifier(), BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS)) { for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprList) { valueExpr.getArguments().add(new MutableObject<>(p.second.getValue().cloneExpression())); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp index 47f2ce9..31228c5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp @@ -37,3 +37,17 @@ min(result_delta) min_delta, max(result_delta) max_delta }; + +create function q2_max_unbounded_preceding_n_following(N) { + let + DBL_N = N * 2 + from + range(1, DBL_N) x + let + result_expected = case when x > N then DBL_N else x + N end, + result_actual = max(x) over (order by x range between unbounded preceding and N following), + result_delta = result_expected - result_actual + select + min(result_delta) min_delta, + 
max(result_delta) max_delta +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp new file mode 100644 index 0000000..fab9e42 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.7.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with accumulating frame + * : that always covers the whole partition + * Expected Res : SUCCESS + */ + +from range(1, 10) t +select t, sum(t) over(order by t range between unbounded preceding and 1000 following) as `sum` +order by t \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp new file mode 100644 index 0000000..bf21dc3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.8.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with accumulating frame + * : and window function with row limit + * Expected Res : SUCCESS + */ + +from range(1, 10) t +select t, first_value(t) over(order by t) as first +order by t diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp new file mode 100644 index 0000000..ad6913c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with accumulating frame + * : that spans several physical frames + * Expected Res : SUCCESS + */ + +use test; + +q2_max_unbounded_preceding_n_following(5000); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm new file mode 100644 index 0000000..5f3b610 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.7.adm @@ -0,0 +1,10 @@ +{ "t": 1, "sum": 55 } +{ "t": 2, "sum": 55 } +{ "t": 3, "sum": 55 } +{ "t": 4, "sum": 55 } +{ "t": 5, "sum": 55 } +{ "t": 6, "sum": 55 } +{ "t": 7, "sum": 55 } +{ "t": 8, "sum": 55 } +{ "t": 9, "sum": 55 } +{ "t": 10, "sum": 55 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm new file mode 100644 index 0000000..d6b51d1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.8.adm @@ -0,0 +1,10 @@ +{ "t": 1, "first": 1 } +{ "t": 2, "first": 1 } +{ "t": 3, "first": 1 } +{ "t": 4, "first": 1 } +{ "t": 5, "first": 1 } +{ "t": 6, "first": 1 } +{ "t": 7, "first": 1 } +{ "t": 8, "first": 1 } +{ "t": 9, "first": 1 } +{ "t": 10, "first": 1 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm new file mode 100644 index 0000000..6115ead --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.9.adm @@ -0,0 +1 @@ +{ "min_delta": 0, "max_delta": 0 } \ No newline at end of file diff --git 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java index a060d57..777f6db 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowAggregationSugarVisitor.java @@ -88,7 +88,7 @@ if (winfi != null) { winExpr.setFunctionSignature(new FunctionSignature(winfi)); rewriteSpecificWindowFunctions(winfi, winExpr); - if (BuiltinFunctions.windowFunctionHasProperty(winfi, + if (BuiltinFunctions.builtinFunctionHasProperty(winfi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) { wrapAggregationArguments(winExpr, 1); } diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java index 3c5032a..42a4282 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppWindowRewriteVisitor.java @@ -64,7 +64,7 @@ FunctionSignature signature = winExpr.getFunctionSignature(); FunctionIdentifier winfi = FunctionMapUtil.getInternalWindowFunction(signature); if (winfi != null) { - if (BuiltinFunctions.windowFunctionHasProperty(winfi, + if (BuiltinFunctions.builtinFunctionHasProperty(winfi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) { List<Expression> newExprList = extractExpressions(winExpr.getExprList(), 1); if (newExprList == null) { diff --git 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java index 2eddfb5..50b13e2 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java @@ -219,7 +219,7 @@ FunctionSignature fs = winExpr.getFunctionSignature(); FunctionIdentifier winfi = FunctionMapUtil.getInternalWindowFunction(fs); if (winfi != null) { - if (BuiltinFunctions.windowFunctionHasProperty(winfi, + if (BuiltinFunctions.builtinFunctionHasProperty(winfi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG)) { visitWindowExpressionExcludingExprList(winExpr, arg); List<Expression> exprList = winExpr.getExprList(); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index 20fbf63..623cf08 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -157,6 +157,8 @@ private static final Map<IFunctionInfo, IFunctionInfo> builtinPublicFunctionsSet = new HashMap<>(); private static final Map<IFunctionInfo, IFunctionInfo> builtinPrivateFunctionsSet = new HashMap<>(); private static final Map<IFunctionInfo, IResultTypeComputer> funTypeComputer = new HashMap<>(); + private static final Map<IFunctionInfo, Set<? 
extends BuiltinFunctionProperty>> builtinFunctionProperties = + new HashMap<>(); private static final Set<IFunctionInfo> builtinAggregateFunctions = new HashSet<>(); private static final Map<IFunctionInfo, IFunctionToDataSourceRewriter> datasourceFunctions = new HashMap<>(); private static final Set<IFunctionInfo> similarityFunctions = new HashSet<>(); @@ -170,7 +172,7 @@ private static final Map<IFunctionInfo, IFunctionInfo> distinctToRegularScalarAggregateFunctionMap = new HashMap<>(); private static final Map<IFunctionInfo, IFunctionInfo> sqlToWindowFunctions = new HashMap<>(); - private static final Map<IFunctionInfo, Set<WindowFunctionProperty>> windowFunctions = new HashMap<>(); + private static final Set<IFunctionInfo> windowFunctions = new HashSet<>(); private static final Map<IFunctionInfo, SpatialFilterKind> spatialFilterFunctions = new HashMap<>(); @@ -2629,7 +2631,7 @@ addIntermediateAgg(SERIAL_GLOBAL_SQL_SUM, SERIAL_INTERMEDIATE_SQL_SUM); addGlobalAgg(SERIAL_SQL_SUM, SERIAL_GLOBAL_SQL_SUM); - // SQL SUM Distinct + // SQL SUM DISTINCT addDistinctAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM); addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT); @@ -2641,7 +2643,10 @@ addGlobalAgg(ST_UNION_AGG, ST_UNION_AGG); } - public enum WindowFunctionProperty { + interface BuiltinFunctionProperty { + } + + public enum WindowFunctionProperty implements BuiltinFunctionProperty { /** Whether the order clause is prohibited */ NO_ORDER_CLAUSE, /** Whether the frame clause is prohibited */ @@ -2818,6 +2823,21 @@ registeredFunctions.put(fi, functionInfo); } + private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(IFunctionInfo finfo, + Class<T> propertyClass, T[] properties) { + if (properties == null) { + return; + } + Set<T> propertySet = EnumSet.noneOf(propertyClass); + Collections.addAll(propertySet, properties); + builtinFunctionProperties.put(finfo, propertySet); + } + + public static boolean 
builtinFunctionHasProperty(FunctionIdentifier fi, BuiltinFunctionProperty property) { + Set<? extends BuiltinFunctionProperty> propertySet = builtinFunctionProperties.get(getAsterixFunctionInfo(fi)); + return propertySet != null && propertySet.contains(property); + } + public static void addAgg(FunctionIdentifier fi) { builtinAggregateFunctions.add(getAsterixFunctionInfo(fi)); } @@ -2856,10 +2876,9 @@ WindowFunctionProperty... properties) { IFunctionInfo sqlinfo = getAsterixFunctionInfo(sqlfi); IFunctionInfo wininfo = getAsterixFunctionInfo(winfi); - Set<WindowFunctionProperty> propertiesSet = EnumSet.noneOf(WindowFunctionProperty.class); - Collections.addAll(propertiesSet, properties); sqlToWindowFunctions.put(sqlinfo, wininfo); - windowFunctions.put(wininfo, propertiesSet); + windowFunctions.add(wininfo); + registerFunctionProperties(wininfo, WindowFunctionProperty.class, properties); } public static FunctionIdentifier getWindowFunction(FunctionIdentifier sqlfi) { @@ -2868,12 +2887,7 @@ } public static boolean isWindowFunction(FunctionIdentifier winfi) { - return windowFunctions.containsKey(getAsterixFunctionInfo(winfi)); - } - - public static boolean windowFunctionHasProperty(FunctionIdentifier winfi, WindowFunctionProperty property) { - Set<WindowFunctionProperty> propertySet = windowFunctions.get(getAsterixFunctionInfo(winfi)); - return propertySet != null && propertySet.contains(property); + return windowFunctions.contains(getAsterixFunctionInfo(winfi)); } public static AbstractFunctionCallExpression makeWindowFunctionExpression(FunctionIdentifier winfi, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java index bdfdac8..71b5239 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java @@ -327,6 +327,7 @@ * Use {@link #acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)} * to visit only non-requiring expressions. */ + @Override public boolean requiresVariableReferenceExpressions() { return false; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java index 2a8658d..b65feb3 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -58,6 +58,7 @@ import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory; @@ -77,12 +78,19 @@ private final boolean frameStartIsMonotonic; + private final boolean frameEndIsMonotonic; + + private final boolean nestedAggregates; + public WindowPOperator(List<LogicalVariable> 
partitionColumns, boolean partitionMaterialization, - List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) { + List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, + boolean nestedAggregates) { this.partitionColumns = partitionColumns; this.partitionMaterialization = partitionMaterialization; this.orderColumns = orderColumns; this.frameStartIsMonotonic = frameStartIsMonotonic; + this.frameEndIsMonotonic = frameEndIsMonotonic; + this.nestedAggregates = nestedAggregates; } @Override @@ -202,7 +210,7 @@ inputSchemas, context); } - AbstractWindowRuntimeFactory runtime; + AbstractWindowRuntimeFactory runtime = null; if (winOp.hasNestedPlans()) { int opSchemaSizePreSubplans = opSchema.getSize(); AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context); @@ -210,20 +218,34 @@ WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans); nestedAggFactory.setSourceLocation(winOp.getSourceLocation()); - boolean useUnboundedRuntime = frameStartExprList.isEmpty() && frameEndExprList.isEmpty() - && frameExcludeExprList.isEmpty() && frameOffsetExprEval == null; - if (useUnboundedRuntime) { - runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, - partitionComparatorFactories, orderComparatorFactories, winOp.getFrameMaxObjects(), - projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, - aggregatorOutputSchemaSize, nestedAggFactory); - } else { + int frameMaxObjects = winOp.getFrameMaxObjects(); + + // special cases + if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) { + if (frameEndExprList.isEmpty()) { + // special case #1: frame == whole partition, no exclusions, no offset + runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, + partitionComparatorFactories, orderComparatorFactories, frameMaxObjects, + projectionColumnsExcludingSubplans, 
runningAggOutColumns, runningAggFactories, + aggregatorOutputSchemaSize, nestedAggFactory); + } else if (frameEndIsMonotonic && nestedAggregates) { + // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset + nestedAggFactory.setPartialOutputEnabled(true); + runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, + partitionComparatorFactories, orderComparatorFactories, + frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second, + frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans, + runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory); + } + } + // default case + if (runtime == null) { runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic, frameEndExprEvals, frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second, - frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(), + frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects, projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java index 8713106..ebe024f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java @@ -19,7 +19,9 @@ package org.apache.hyracks.algebricks.core.algebra.util; import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; @@ -367,4 +369,17 @@ && rightChild.getExecutionMode().equals(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); return unPartitioned ? StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR : partitionedPropertiesVector; } + + /** + * Returns a set of all root operator tags in given plans + */ + public static Set<LogicalOperatorTag> getRootOperatorTags(List<ILogicalPlan> plans) { + Set<LogicalOperatorTag> opTags = EnumSet.noneOf(LogicalOperatorTag.class); + for (ILogicalPlan plan : plans) { + for (Mutable<ILogicalOperator> root : plan.getRoots()) { + opTags.add(root.getValue().getOperatorTag()); + } + } + return opTags; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java new file mode 100644 index 0000000..c909782 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregatePushRuntime.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.aggreg; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; + +/** + * Aggregate operator runtime + */ +public class AggregatePushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + + private final IAggregateEvaluatorFactory[] aggFactories; + + private final IHyracksTaskContext ctx; + + private IAggregateEvaluator[] aggEvals; + + private IPointable result; + + private ArrayTupleBuilder tupleBuilder; + + private boolean first; + + AggregatePushRuntime(IAggregateEvaluatorFactory[] aggFactories, IHyracksTaskContext ctx) { + this.aggFactories = aggFactories; + this.ctx = ctx; + aggEvals = new IAggregateEvaluator[aggFactories.length]; + result = VoidPointable.FACTORY.createPointable(); + tupleBuilder = new ArrayTupleBuilder(aggEvals.length); + first = true; + } + + @Override + public void open() throws HyracksDataException { + if (first) { + first = false; + 
initAccessAppendRef(ctx); + for (int i = 0; i < aggFactories.length; i++) { + aggEvals[i] = aggFactories[i].createAggregateEvaluator(ctx); + } + } + for (int i = 0; i < aggFactories.length; i++) { + aggEvals[i].init(); + } + super.open(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); + processTuple(tRef); + } + } + + @Override + public void close() throws HyracksDataException { + if (isOpen) { + try { + finishAggregates(false); + } finally { + super.close(); + } + } + } + + public void finishAggregates(boolean flushFrame) throws HyracksDataException { + tupleBuilder.reset(); + for (IAggregateEvaluator aggEval : aggEvals) { + aggEval.finish(result); + tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength()); + } + appendToFrameFromTupleBuilder(tupleBuilder, flushFrame); + } + + private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException { + for (IAggregateEvaluator aggEval : aggEvals) { + aggEval.step(tupleRef); + } + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java index 1f9cb91..5a2be67 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java @@ -18,107 +18,38 @@ */ package org.apache.hyracks.algebricks.runtime.operators.aggreg; -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; 
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { private static final long serialVersionUID = 1L; - // private int[] outColumns; - private IAggregateEvaluatorFactory[] aggregFactories; + private IAggregateEvaluatorFactory[] aggFactories; - public AggregateRuntimeFactory(IAggregateEvaluatorFactory[] aggregFactories) { + public AggregateRuntimeFactory(IAggregateEvaluatorFactory[] aggFactories) { super(null); - // this.outColumns = outColumns; - this.aggregFactories = aggregFactories; + this.aggFactories = aggFactories; + } + + @Override + public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { + return new AggregatePushRuntime(aggFactories, ctx); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("assign ["); - for (int i = 0; i < aggregFactories.length; i++) { + for (int i = 0; i < aggFactories.length; i++) { if (i > 0) { sb.append(", "); } - sb.append(aggregFactories[i]); + sb.append(aggFactories[i]); } sb.append("]"); return sb.toString(); - } - - @Override - public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) - throws HyracksDataException { - return new AbstractOneInputOneOutputOneFramePushRuntime() { - private 
IAggregateEvaluator[] aggregs = new IAggregateEvaluator[aggregFactories.length]; - private IPointable result = VoidPointable.FACTORY.createPointable(); - private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length); - - private boolean first = true; - - @Override - public void open() throws HyracksDataException { - if (first) { - first = false; - initAccessAppendRef(ctx); - for (int i = 0; i < aggregFactories.length; i++) { - aggregs[i] = aggregFactories[i].createAggregateEvaluator(ctx); - } - } - for (int i = 0; i < aggregFactories.length; i++) { - aggregs[i].init(); - } - super.open(); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - processTuple(tRef); - } - - } - - @Override - public void close() throws HyracksDataException { - if (isOpen) { - try { - computeAggregate(); - appendToFrameFromTupleBuilder(tupleBuilder); - } finally { - super.close(); - } - } - } - - private void computeAggregate() throws HyracksDataException { - tupleBuilder.reset(); - for (int f = 0; f < aggregs.length; f++) { - aggregs[f].finish(result); - tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength()); - } - } - - private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException { - for (int f = 0; f < aggregs.length; f++) { - aggregs[f].step(tupleRef); - } - } - }; } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index 0ee7c1d..5fbea5a 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -58,7 +58,7 @@ final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { pipelines[i] = - (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx); + (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx, null); } return new IAggregatorDescriptor() { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index d14f5c1..417ad9e 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -68,8 +68,8 @@ IFrameWriter enforcedWriter = enforce ? 
EnforceFrameWriter.enforce(outputWriter) : outputWriter; final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = - (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, ctx); + pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, + ctx, null); } final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 376a7a1..f5477ec 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -96,7 +96,7 @@ //TODO: refactoring is needed public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, - IHyracksTaskContext ctx) throws HyracksDataException { + IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException { // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators @@ -104,9 +104,10 @@ IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; - newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; start = enforce ? 
EnforceFrameWriter.enforce(start) : start; + IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; + IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); + IPushRuntime newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntimes[0]) : newRuntimes[0]; newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); if (i > 0) { newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); @@ -114,6 +115,9 @@ // the nts has the same input and output rec. desc. newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]); } + if (outRuntimeMap != null) { + outRuntimeMap.put(runtimeFactory, newRuntimes[0]); + } start = newRuntime; } return start; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java index 832cb22..059f946 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java @@ -61,7 +61,7 @@ @Override public void flush() throws HyracksDataException { - writer.flush(); + appender.flush(writer); } } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java new file mode 100644 index 0000000..9adeb4d --- /dev/null +++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference; +import 
org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; + +/** + * Base class for window runtime implementations that compute nested aggregates + */ +abstract class AbstractWindowNestedPlansPushRuntime extends WindowMaterializingPushRuntime { + + final int nestedAggOutSchemaSize; + + private final WindowAggregatorDescriptorFactory nestedAggFactory; + + private IAggregatorDescriptor nestedAgg; + + AbstractWindowNestedPlansPushRuntime(int[] partitionColumns, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, + runningAggOutColumns, runningAggFactories, ctx); + this.nestedAggFactory = nestedAggFactory; + this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; + } + + @Override + protected void init() throws HyracksDataException { + super.init(); + nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1); + } + + @Override + public void close() throws HyracksDataException { + super.close(); + nestedAgg.close(); + } + + @Override + protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) { + return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize); + } + + /** + * Aggregator created by + * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor, RecordDescriptor, int[], int[], long) + * WindowAggregatorDescriptorFactory.createAggregator(...)} + * does not process argument tuple in init() + */ + void nestedAggInit() throws HyracksDataException { + nestedAgg.init(null, null, -1, null); + } + + void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException { + 
nestedAgg.aggregate(tAccess, tIndex, null, -1, null); + } + + void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException { + nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null); + } + + void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException { + nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null); + } + + static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx) + throws HyracksDataException { + IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length]; + for (int i = 0; i < evalFactories.length; i++) { + evals[i] = evalFactories[i].createScalarEvaluator(ctx); + } + return evals; + } + + static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple, PointableTupleReference outTuple) + throws HyracksDataException { + for (int i = 0; i < evals.length; i++) { + evals[i].evaluate(inTuple, outTuple.getField(i)); + } + } + + static PointableTupleReference createPointables(int ln) { + IPointable[] pointables = new IPointable[ln]; + for (int i = 0; i < ln; i++) { + pointables[i] = VoidPointable.FACTORY.createPointable(); + } + return new PointableTupleReference(pointables); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java new file mode 100644 index 0000000..53857ac --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +/** + * Base class for window runtime factories that compute nested aggregates + */ +abstract class AbstractWindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory { + + private static final long serialVersionUID = 1L; + + final int nestedAggOutSchemaSize; + + final WindowAggregatorDescriptorFactory nestedAggFactory; + + AbstractWindowNestedPlansRuntimeFactory(int[] partitionColumns, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans, + int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, + int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + this.nestedAggFactory = nestedAggFactory; + this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; + } +} diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java index 6177723..4cf5ec5 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java @@ -19,7 +19,13 @@ package org.apache.hyracks.algebricks.runtime.operators.win; +import java.util.HashMap; +import java.util.Map; + import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.aggreg.AggregatePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; @@ -40,10 +46,16 @@ private static final long serialVersionUID = 1L; - private AlgebricksPipeline[] subplans; + private final AlgebricksPipeline[] subplans; + + private boolean partialOutputEnabled; public WindowAggregatorDescriptorFactory(AlgebricksPipeline[] subplans) { this.subplans = subplans; + } + + public void setPartialOutputEnabled(boolean value) { + partialOutputEnabled = value; } @Override @@ -53,9 +65,26 @@ NestedPlansAccumulatingAggregatorFactory.AggregatorOutput outputWriter = new NestedPlansAccumulatingAggregatorFactory.AggregatorOutput(subplans, 0); NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length]; + + Map<IPushRuntimeFactory, IPushRuntime> pipelineRuntimeMap = partialOutputEnabled ? new HashMap<>() : null; + AggregatePushRuntime[] aggs = partialOutputEnabled ? new AggregatePushRuntime[subplans.length] : null; + for (int i = 0; i < subplans.length; i++) { - pipelines[i] = - (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx); + AlgebricksPipeline subplan = subplans[i]; + if (pipelineRuntimeMap != null) { + pipelineRuntimeMap.clear(); + } + pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplan, outputWriter, ctx, + pipelineRuntimeMap); + if (pipelineRuntimeMap != null) { + IPushRuntimeFactory[] subplanFactories = subplan.getRuntimeFactories(); + IPushRuntimeFactory aggFactory = subplanFactories[subplanFactories.length - 1]; + AggregatePushRuntime agg = (AggregatePushRuntime) pipelineRuntimeMap.get(aggFactory); + if (agg == null) { + throw new IllegalStateException(); + } + aggs[i] = agg; + } } return new IAggregatorDescriptor() { @@ -64,7 +93,6 @@ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state) throws HyracksDataException { outputWriter.getTupleBuilder().reset(); - for (NestedTupleSourceRuntime pipeline : pipelines) { pipeline.open(); } @@ -91,6 +119,30 @@ return true; } + /** + * This method is called when evaluating accumulating frames. + * It emits current result of the aggregates but does not close pipelines, so aggregation can continue. + * This method may be called several times. 
+ * {@link #outputFinalResult(ArrayTupleBuilder, IFrameTupleAccessor, int, AggregateState)} + * should be called at the end to emit the last value and close all pipelines + */ + @Override + public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, + AggregateState state) throws HyracksDataException { + if (aggs == null) { + throw new UnsupportedOperationException(); + } + for (int i = 0; i < pipelines.length; i++) { + outputWriter.setInputIdx(i); + pipelines[i].flush(); + aggs[i].finishAggregates(true); + } + memoryUsageCheck(); + TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder); + outputWriter.getTupleBuilder().reset(); + return true; + } + @Override public AggregateState createAggregateStates() { return null; @@ -98,12 +150,6 @@ @Override public void reset() { - } - - @Override - public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, - AggregateState state) { - throw new UnsupportedOperationException(); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java index 565cbe6..cb4f534 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java @@ -35,21 +35,18 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.DataUtils; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference; import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; -import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; import org.apache.hyracks.storage.common.MultiComparator; /** * Runtime for window operators that performs partition materialization and can evaluate running aggregates * as well as regular aggregates (in nested plans) over window frames. */ -public class WindowNestedPlansPushRuntime extends WindowMaterializingPushRuntime { +class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime { private final boolean frameValueExists; @@ -109,12 +106,6 @@ private final int frameMaxObjects; - private final int nestedAggOutSchemaSize; - - private final WindowAggregatorDescriptorFactory nestedAggFactory; - - private IAggregatorDescriptor nestedAgg; - private IFrame copyFrame2; private IFrame runFrame; @@ -145,7 +136,7 @@ int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, - runningAggOutColumns, runningAggFactories, ctx); + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx); this.frameValueEvalFactories = frameValueEvalFactories; this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0; this.frameStartEvalFactories = frameStartEvalFactories; @@ -162,8 +153,6 @@ this.frameOffsetEvalFactory = frameOffsetEvalFactory; this.binaryIntegerInspectorFactory = 
binaryIntegerInspectorFactory; this.frameMaxObjects = frameMaxObjects; - this.nestedAggFactory = nestedAggFactory; - this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; } @Override @@ -194,8 +183,6 @@ frameOffsetPointable = VoidPointable.FACTORY.createPointable(); bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx); } - - nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1); runFrame = new VSizeFrame(ctx); copyFrame2 = new VSizeFrame(ctx); @@ -256,8 +243,7 @@ } int toWrite = frameMaxObjects; - // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init() - nestedAgg.init(null, null, -1, null); + nestedAggInit(); int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0; int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1; @@ -334,7 +320,7 @@ } if (toWrite != 0) { - nestedAgg.aggregate(tAccess2, tIdxInner, null, -1, null); + nestedAggAggregate(tAccess2, tIdxInner); } if (toWrite > 0) { toWrite--; @@ -345,10 +331,11 @@ } } - nestedAgg.outputFinalResult(tupleBuilder, null, -1, null); + nestedAggOutputFinalResult(tupleBuilder); appendToFrameFromTupleBuilder(tupleBuilder); if (frameStartIsMonotonic) { + frameStartForward = true; if (chunkIdxFrameStartLocal >= 0) { chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal; tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal; @@ -380,34 +367,5 @@ } } return true; - } - - @Override - protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) { - return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize); - } - - private static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx) - throws HyracksDataException { - IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length]; - for (int i = 0; i < evalFactories.length; i++) { - evals[i] = evalFactories[i].createScalarEvaluator(ctx); - } - return evals; - } - - 
private static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple, - PointableTupleReference outTuple) throws HyracksDataException { - for (int i = 0; i < evals.length; i++) { - evals[i].evaluate(inTuple, outTuple.getField(i)); - } - } - - private static PointableTupleReference createPointables(int ln) { - IPointable[] pointables = new IPointable[ln]; - for (int i = 0; i < ln; i++) { - pointables[i] = VoidPointable.FACTORY.createPointable(); - } - return new PointableTupleReference(pointables); } } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java new file mode 100644 index 0000000..e550d65 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +import org.apache.hyracks.storage.common.MultiComparator; + +/** + * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates + * as well as regular aggregates (in nested plans) over accumulating window frames + * (unbounded preceding to current row or N following). 
+ */ +class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime { + + private final IScalarEvaluatorFactory[] frameValueEvalFactories; + + private IScalarEvaluator[] frameValueEvals; + + private PointableTupleReference frameValuePointables; + + private final IBinaryComparatorFactory[] frameValueComparatorFactories; + + private MultiComparator frameValueComparators; + + private final IScalarEvaluatorFactory[] frameEndEvalFactories; + + private IScalarEvaluator[] frameEndEvals; + + private PointableTupleReference frameEndPointables; + + private final int frameMaxObjects; + + private IFrame copyFrame2; + + private IFrame runFrame; + + private int runFrameChunkId; + + private long runFrameSize; + + private FrameTupleAccessor tAccess2; + + private FrameTupleReference tRef2; + + private int chunkIdxFrameEndGlobal; + + private int tBeginIdxFrameEndGlobal; + + private long readerPosFrameEndGlobal; + + private int toWrite; + + WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories, + int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx); + this.frameValueEvalFactories = frameValueEvalFactories; + this.frameEndEvalFactories = frameEndEvalFactories; + this.frameValueComparatorFactories = frameValueComparatorFactories; + this.frameMaxObjects = frameMaxObjects; + } + + @Override + protected void init() 
throws HyracksDataException { + super.init(); + + frameValueEvals = createEvaluators(frameValueEvalFactories, ctx); + frameValueComparators = MultiComparator.create(frameValueComparatorFactories); + frameValuePointables = createPointables(frameValueEvalFactories.length); + frameEndEvals = createEvaluators(frameEndEvalFactories, ctx); + frameEndPointables = createPointables(frameEndEvalFactories.length); + + runFrame = new VSizeFrame(ctx); + copyFrame2 = new VSizeFrame(ctx); + tAccess2 = new FrameTupleAccessor(inputRecordDesc); + tRef2 = new FrameTupleReference(); + } + + @Override + protected void beginPartitionImpl() throws HyracksDataException { + super.beginPartitionImpl(); + nestedAggInit(); + chunkIdxFrameEndGlobal = 0; + tBeginIdxFrameEndGlobal = -1; + readerPosFrameEndGlobal = 0; + runFrameChunkId = -1; + toWrite = frameMaxObjects; + } + + @Override + protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException { + long readerPos = -1; + int nChunks = getPartitionChunkCount(); + if (nChunks > 1) { + readerPos = reader.position(); + if (chunkIdx == 0) { + ByteBuffer curFrameBuffer = curFrame.getBuffer(); + int pos = curFrameBuffer.position(); + copyFrame2.ensureFrameSize(curFrameBuffer.capacity()); + FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer()); + curFrameBuffer.position(pos); + } + } + + boolean isLastChunk = chunkIdx == nChunks - 1; + + tAccess.reset(curFrame.getBuffer()); + int tBeginIdx = getTupleBeginIdx(chunkIdx); + int tEndIdx = getTupleEndIdx(chunkIdx); + for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) { + tRef.reset(tAccess, tIdx); + + // running aggregates + produceTuple(tupleBuilder, tAccess, tIdx, tRef); + + // frame boundaries + evaluate(frameEndEvals, tRef, frameEndPointables); + + int chunkIdxInnerStart = chunkIdxFrameEndGlobal; + int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal; + if (nChunks > 1) { + reader.seek(readerPosFrameEndGlobal); + } + + int chunkIdxFrameEndLocal 
= -1, tBeginIdxFrameEndLocal = -1; + long readerPosFrameEndLocal = -1; + + frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) { + long readerPosFrameInner; + IFrame frameInner; + if (chunkIdxInner == 0) { + // first chunk's frame is always in memory + frameInner = chunkIdx == 0 ? curFrame : copyFrame2; + readerPosFrameInner = 0; + } else { + readerPosFrameInner = reader.position(); + if (runFrameChunkId == chunkIdxInner) { + // runFrame has this chunk, so just advance the reader + reader.seek(readerPosFrameInner + runFrameSize); + } else { + reader.nextFrame(runFrame); + runFrameSize = reader.position() - readerPosFrameInner; + runFrameChunkId = chunkIdxInner; + } + frameInner = runFrame; + } + tAccess2.reset(frameInner.getBuffer()); + + int tBeginIdxInner; + if (tBeginIdxInnerStart < 0) { + tBeginIdxInner = getTupleBeginIdx(chunkIdxInner); + } else { + tBeginIdxInner = tBeginIdxInnerStart; + tBeginIdxInnerStart = -1; + } + int tEndIdxInner = getTupleEndIdx(chunkIdxInner); + + for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner && toWrite != 0; tIdxInner++) { + tRef2.reset(tAccess2, tIdxInner); + + evaluate(frameValueEvals, tRef2, frameValuePointables); + if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) { + // save position of the tuple that matches the frame end. 
+ // we'll continue from it in the next outer iteration + chunkIdxFrameEndLocal = chunkIdxInner; + tBeginIdxFrameEndLocal = tIdxInner; + readerPosFrameEndLocal = readerPosFrameInner; + + // skip and exit if value > end + break frame_loop; + } + + nestedAggAggregate(tAccess2, tIdxInner); + + if (toWrite > 0) { + toWrite--; + } + } + } + + boolean isLastTuple = isLastChunk && tIdx == tEndIdx; + if (isLastTuple) { + nestedAggOutputFinalResult(tupleBuilder); + } else { + nestedAggOutputPartialResult(tupleBuilder); + } + appendToFrameFromTupleBuilder(tupleBuilder); + + if (chunkIdxFrameEndLocal >= 0) { + chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal; + tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal; + readerPosFrameEndGlobal = readerPosFrameEndLocal; + } else { + // could not find the end, set beyond the last chunk + chunkIdxFrameEndGlobal = nChunks; + tBeginIdxFrameEndGlobal = 0; + readerPosFrameEndGlobal = 0; + } + } + + if (nChunks > 1) { + reader.seek(readerPos); + } + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java new file mode 100644 index 0000000..ddeaf2b --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.util.Arrays; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +/** + * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates + * as well as regular aggregates (in nested plans) over accumulating window frames + * (unbounded preceding to current row or N following). 
+ */ +public class WindowNestedPlansRunningRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final IScalarEvaluatorFactory[] frameValueEvalFactories; + + private final IBinaryComparatorFactory[] frameValueComparatorFactories; + + private final IScalarEvaluatorFactory[] frameEndEvalFactories; + + private final int frameMaxObjects; + + public WindowNestedPlansRunningRuntimeFactory(int[] partitionColumns, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories, + int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, + nestedAggFactory); + this.frameValueEvalFactories = frameValueEvalFactories; + this.frameValueComparatorFactories = frameValueComparatorFactories; + this.frameEndEvalFactories = frameEndEvalFactories; + this.frameMaxObjects = frameMaxObjects; + } + + @Override + public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { + return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories, + orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories, + frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, + nestedAggFactory, ctx); + } + + @Override + public String toString() { + return "window [nested-running] (" + 
Arrays.toString(partitionColumns) + ") " + + Arrays.toString(runningAggOutColumns) + " := " + Arrays.toString(runningAggFactories); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java index 16591d5..f754b91 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java @@ -32,19 +32,19 @@ * Runtime factory for window operators that performs partition materialization and can evaluate running aggregates * as well as regular aggregates (in nested plans) over window frames. 
*/ -public class WindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory { +public class WindowNestedPlansRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory { private static final long serialVersionUID = 1L; private final IScalarEvaluatorFactory[] frameValueEvalFactories; + + private final IBinaryComparatorFactory[] frameValueComparatorFactories; private final IScalarEvaluatorFactory[] frameStartEvalFactories; private final boolean frameStartIsMonotonic; private final IScalarEvaluatorFactory[] frameEndEvalFactories; - - private final IBinaryComparatorFactory[] frameValueComparatorFactories; private final IScalarEvaluatorFactory[] frameExcludeEvalFactories; @@ -58,10 +58,6 @@ private final int frameMaxObjects; - private final int nestedAggOutSchemaSize; - - private final WindowAggregatorDescriptorFactory nestedAggFactory; - public WindowNestedPlansRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, @@ -74,20 +70,19 @@ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) { super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, - projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, + nestedAggFactory); this.frameValueEvalFactories = frameValueEvalFactories; + this.frameValueComparatorFactories = frameValueComparatorFactories; this.frameStartEvalFactories = frameStartEvalFactories; this.frameStartIsMonotonic = frameStartIsMonotonic; this.frameEndEvalFactories = frameEndEvalFactories; - this.frameValueComparatorFactories = frameValueComparatorFactories; this.frameExcludeEvalFactories = frameExcludeEvalFactories; this.frameExcludeComparatorFactories = 
frameExcludeComparatorFactories; this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx; this.frameOffsetEvalFactory = frameOffsetEvalFactory; this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory; this.frameMaxObjects = frameMaxObjects; - this.nestedAggFactory = nestedAggFactory; - this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java index 4ceda1e..b25a36c 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java @@ -29,7 +29,6 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.utils.TupleUtils; -import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; /** * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates @@ -44,13 +43,7 @@ * <li>no frame offset</li> * </ul> */ -public class WindowNestedPlansUnboundedPushRuntime extends WindowMaterializingPushRuntime { - - private final int nestedAggOutSchemaSize; - - private final WindowAggregatorDescriptorFactory nestedAggFactory; - - private IAggregatorDescriptor nestedAgg; +class WindowNestedPlansUnboundedPushRuntime extends AbstractWindowNestedPlansPushRuntime { private ArrayTupleBuilder nestedAggOutputBuilder; @@ -64,24 +57,20 @@ int[] runningAggOutColumns, 
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, - runningAggOutColumns, runningAggFactories, ctx); + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx); this.frameMaxObjects = frameMaxObjects; - this.nestedAggFactory = nestedAggFactory; - this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; } @Override protected void init() throws HyracksDataException { super.init(); - nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1); nestedAggOutputBuilder = new ArrayTupleBuilder(nestedAggOutSchemaSize); } @Override protected void beginPartitionImpl() throws HyracksDataException { super.beginPartitionImpl(); - // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init() - nestedAgg.init(null, null, -1, null); + nestedAggInit(); nestedAggOutputBuilder.reset(); toWrite = frameMaxObjects; } @@ -92,7 +81,7 @@ super.partitionChunkImpl(frameId, frameBuffer, tBeginIdx, tEndIdx); tAccess.reset(frameBuffer); for (int t = tBeginIdx; t <= tEndIdx && toWrite != 0; t++) { - nestedAgg.aggregate(tAccess, t, null, -1, null); + nestedAggAggregate(tAccess, t); if (toWrite > 0) { toWrite--; } @@ -101,7 +90,7 @@ @Override protected void endPartitionImpl() throws HyracksDataException { - nestedAgg.outputFinalResult(nestedAggOutputBuilder, null, -1, null); + nestedAggOutputFinalResult(nestedAggOutputBuilder); super.endPartitionImpl(); } @@ -110,10 +99,5 @@ FrameTupleReference tupleRef) throws HyracksDataException { super.produceTuple(tb, accessor, tIndex, tupleRef); TupleUtils.addFields(nestedAggOutputBuilder, tb); - } - - @Override - protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) { - return new ArrayTupleBuilder(projectionList.length + 
nestedAggOutSchemaSize); } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java index f89a8e5..0f7d9cf 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java @@ -30,15 +30,11 @@ * Optimized runtime for window operators that performs partition materialization and can evaluate running aggregates * as well as regular aggregates (in nested plans) over <b>unbounded</b> window frames. */ -public final class WindowNestedPlansUnboundedRuntimeFactory extends AbstractWindowRuntimeFactory { +public final class WindowNestedPlansUnboundedRuntimeFactory extends AbstractWindowNestedPlansRuntimeFactory { private static final long serialVersionUID = 1L; private final int frameMaxObjects; - - private final int nestedAggOutSchemaSize; - - private final WindowAggregatorDescriptorFactory nestedAggFactory; public WindowNestedPlansUnboundedRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, @@ -47,10 +43,9 @@ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) { super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, - projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, + nestedAggFactory); this.frameMaxObjects = frameMaxObjects; - 
this.nestedAggFactory = nestedAggFactory; - this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/3151 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I8d1574defc73076ad960c4067432da29ead160a5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Dmitry Lychagin <dmitry.lycha...@couchbase.com>