seawinde commented on code in PR #62607:
URL: https://github.com/apache/doris/pull/62607#discussion_r3263216045
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewWindowRule.java:
##########
@@ -104,20 +117,141 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
StructInfo queryStructInf
viewToQuerySlotMapping,
ImmutableMap.of(), cascadesContext
);
- // Can not rewrite, bail out
+ // If generic rewrite fails, try roll up from query expressions.
if (expressionsRewritten.isEmpty()) {
- materializationContext.recordFailReason(queryStructInfo,
- "Rewrite expressions by view in window scan fail",
- () -> String.format("expressionToRewritten is %s,\n
mvExprToMvScanExprMapping is %s,\n"
- + "targetToSourceMapping = %s",
queryStructInfo.getExpressions(),
-
materializationContext.getShuttledExprToScanExprMapping(),
- viewToQuerySlotMapping));
- return null;
+ expressionsRewritten =
rollupWindowAggregateFunctions(queryStructInfo.getExpressions(),
+ queryStructInfo.getTopPlan(),
mvExprToMvScanExprQueryBased, true, false);
+ if (expressionsRewritten.isEmpty()) {
+ materializationContext.recordFailReason(queryStructInfo,
+ "Rewrite expressions by view in window scan fail",
+ () -> String.format("expressionToRewritten is %s,\n
mvExprToMvScanExprMapping is %s,\n"
+ + "targetToSourceMapping = %s",
queryStructInfo.getExpressions(),
+
materializationContext.getShuttledExprToScanExprMapping(),
+ viewToQuerySlotMapping));
+ return null;
+ }
}
return new LogicalProject<>(
expressionsRewritten.stream()
.map(expression -> expression instanceof
NamedExpression ? expression : new Alias(expression))
.map(NamedExpression.class::cast)
.collect(Collectors.toList()), tempRewrittenPlan);
}
+
+ private static List<Expression> rollupWindowAggregateFunctions(List<?
extends Expression> expressions,
+ Plan queryTopPlan, Map<Expression, Expression>
mvExprToMvScanExprQueryBased,
+ boolean needShuttle, boolean strictSlotRewrite) {
+ WindowAggregateRollupContext context = new
WindowAggregateRollupContext(queryTopPlan,
+ mvExprToMvScanExprQueryBased, strictSlotRewrite);
+ List<? extends Expression> inputExpressions = needShuttle
+ ? ExpressionUtils.shuttleExpressionWithLineage(expressions,
queryTopPlan)
+ : expressions;
+ List<Expression> rewrittenExpressions = inputExpressions.stream()
+ .map(expression ->
expression.accept(WindowAggregateRollupRewriter.INSTANCE, context))
+ .collect(Collectors.toList());
+ return context.isValid() ? rewrittenExpressions : ImmutableList.of();
+ }
+
+ private static Function rollupWindowAggregateFunction(AggregateFunction
queryAggregateFunction,
+ Expression queryAggregateFunctionShuttled, Map<Expression,
Expression> mvExprToMvScanExprQueryBased) {
+ for (Map.Entry<Expression, Expression> expressionEntry :
mvExprToMvScanExprQueryBased.entrySet()) {
+ Expression viewExpression = expressionEntry.getKey();
+ // Window mapping keys may be full WindowExpression while rollup
handlers match aggregate functions.
+ if (viewExpression instanceof WindowExpression) {
+ viewExpression = ((WindowExpression)
viewExpression).getFunction();
+ }
+ Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair =
Pair.of(viewExpression,
+ expressionEntry.getValue());
+ for (AggFunctionRollUpHandler rollUpHandler :
AbstractMaterializedViewAggregateRule.ROLL_UP_HANDLERS) {
+ if (!rollUpHandler.canRollup(queryAggregateFunction,
queryAggregateFunctionShuttled,
+ mvExprToMvScanExprQueryBasedPair,
mvExprToMvScanExprQueryBased)) {
+ continue;
+ }
+ Function rollupFunction =
rollUpHandler.doRollup(queryAggregateFunction,
+ queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair,
+ mvExprToMvScanExprQueryBased);
+ if (rollupFunction != null) {
+ return rollupFunction;
+ }
+ }
+ }
+ return null;
+ }
+
+ private static class WindowAggregateRollupRewriter
+ extends DefaultExpressionRewriter<WindowAggregateRollupContext> {
+
+ private static final WindowAggregateRollupRewriter INSTANCE = new
WindowAggregateRollupRewriter();
+
+ @Override
+ public Expression visitWindow(WindowExpression windowExpression,
WindowAggregateRollupContext context) {
+ if (!context.isValid()) {
+ return windowExpression;
+ }
+ Expression rewrittenWindowExpr =
context.getMvExprToMvScanExprQueryBased().get(windowExpression);
+ if (rewrittenWindowExpr != null) {
+ return rewrittenWindowExpr;
+ }
+ Expression function = windowExpression.getFunction();
+ if (!(function instanceof AggregateFunction)) {
+ return super.visitWindow(windowExpression, context);
+ }
+ Expression queryFunctionShuttled =
ExpressionUtils.shuttleExpressionWithLineage(function,
+ context.getQueryTopPlan());
+ Function rewrittenFunction =
rollupWindowAggregateFunction((AggregateFunction) function,
+ queryFunctionShuttled,
context.getMvExprToMvScanExprQueryBased());
+ if (rewrittenFunction == null) {
+ context.setValid(false);
+ return windowExpression;
+ }
+ return
super.visitWindow(windowExpression.withFunction(rewrittenFunction), context);
Review Comment:
This appears to keep the expression as a `WindowExpression` after rollup. The
PR description says the rewrite result should be `xx_merge(mv_col)`, but this
code builds `xx_merge(mv_col) OVER (...)`. That can compute a new window over
the already-windowed MV column instead of using the precomputed MV result
directly, which may double-aggregate the state values. Should this return the
rolled-up scalar function instead of `windowExpression.withFunction(...)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]