XnY-wei commented on code in PR #62607:
URL: https://github.com/apache/doris/pull/62607#discussion_r3308382785


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewWindowRule.java:
##########
@@ -104,20 +117,141 @@ protected Plan rewriteQueryByView(MatchMode matchMode, 
StructInfo queryStructInf
                 viewToQuerySlotMapping,
                 ImmutableMap.of(), cascadesContext
         );
-        // Can not rewrite, bail out
+        // If generic rewrite fails, try roll up from query expressions.
         if (expressionsRewritten.isEmpty()) {
-            materializationContext.recordFailReason(queryStructInfo,
-                    "Rewrite expressions by view in window scan fail",
-                    () -> String.format("expressionToRewritten is %s,\n 
mvExprToMvScanExprMapping is %s,\n"
-                                    + "targetToSourceMapping = %s", 
queryStructInfo.getExpressions(),
-                            
materializationContext.getShuttledExprToScanExprMapping(),
-                            viewToQuerySlotMapping));
-            return null;
+            expressionsRewritten = 
rollupWindowAggregateFunctions(queryStructInfo.getExpressions(),
+                    queryStructInfo.getTopPlan(), 
mvExprToMvScanExprQueryBased, true, false);
+            if (expressionsRewritten.isEmpty()) {
+                materializationContext.recordFailReason(queryStructInfo,
+                        "Rewrite expressions by view in window scan fail",
+                        () -> String.format("expressionToRewritten is %s,\n 
mvExprToMvScanExprMapping is %s,\n"
+                                        + "targetToSourceMapping = %s", 
queryStructInfo.getExpressions(),
+                                
materializationContext.getShuttledExprToScanExprMapping(),
+                                viewToQuerySlotMapping));
+                return null;
+            }
         }
         return new LogicalProject<>(
                 expressionsRewritten.stream()
                         .map(expression -> expression instanceof 
NamedExpression ? expression : new Alias(expression))
                         .map(NamedExpression.class::cast)
                         .collect(Collectors.toList()), tempRewrittenPlan);
     }
+
+    private static List<Expression> rollupWindowAggregateFunctions(List<? 
extends Expression> expressions,
+            Plan queryTopPlan, Map<Expression, Expression> 
mvExprToMvScanExprQueryBased,
+            boolean needShuttle, boolean strictSlotRewrite) {
+        WindowAggregateRollupContext context = new 
WindowAggregateRollupContext(queryTopPlan,
+                mvExprToMvScanExprQueryBased, strictSlotRewrite);
+        List<? extends Expression> inputExpressions = needShuttle
+                ? ExpressionUtils.shuttleExpressionWithLineage(expressions, 
queryTopPlan)
+                : expressions;
+        List<Expression> rewrittenExpressions = inputExpressions.stream()
+                .map(expression -> 
expression.accept(WindowAggregateRollupRewriter.INSTANCE, context))
+                .collect(Collectors.toList());
+        return context.isValid() ? rewrittenExpressions : ImmutableList.of();
+    }
+
+    private static Function rollupWindowAggregateFunction(AggregateFunction 
queryAggregateFunction,
+            Expression queryAggregateFunctionShuttled, Map<Expression, 
Expression> mvExprToMvScanExprQueryBased) {
+        for (Map.Entry<Expression, Expression> expressionEntry : 
mvExprToMvScanExprQueryBased.entrySet()) {
+            Expression viewExpression = expressionEntry.getKey();
+            // Window mapping keys may be full WindowExpression while rollup 
handlers match aggregate functions.
+            if (viewExpression instanceof WindowExpression) {
+                viewExpression = ((WindowExpression) 
viewExpression).getFunction();
+            }
+            Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair = 
Pair.of(viewExpression,
+                    expressionEntry.getValue());
+            for (AggFunctionRollUpHandler rollUpHandler : 
AbstractMaterializedViewAggregateRule.ROLL_UP_HANDLERS) {
+                if (!rollUpHandler.canRollup(queryAggregateFunction, 
queryAggregateFunctionShuttled,
+                        mvExprToMvScanExprQueryBasedPair, 
mvExprToMvScanExprQueryBased)) {
+                    continue;
+                }
+                Function rollupFunction = 
rollUpHandler.doRollup(queryAggregateFunction,
+                        queryAggregateFunctionShuttled, 
mvExprToMvScanExprQueryBasedPair,
+                        mvExprToMvScanExprQueryBased);
+                if (rollupFunction != null) {
+                    return rollupFunction;
+                }
+            }
+        }
+        return null;
+    }
+
+    private static class WindowAggregateRollupRewriter
+            extends DefaultExpressionRewriter<WindowAggregateRollupContext> {
+
+        private static final WindowAggregateRollupRewriter INSTANCE = new 
WindowAggregateRollupRewriter();
+
+        @Override
+        public Expression visitWindow(WindowExpression windowExpression, 
WindowAggregateRollupContext context) {
+            if (!context.isValid()) {
+                return windowExpression;
+            }
+            Expression rewrittenWindowExpr = 
context.getMvExprToMvScanExprQueryBased().get(windowExpression);
+            if (rewrittenWindowExpr != null) {
+                return rewrittenWindowExpr;
+            }
+            Expression function = windowExpression.getFunction();
+            if (!(function instanceof AggregateFunction)) {
+                return super.visitWindow(windowExpression, context);
+            }
+            Expression queryFunctionShuttled = 
ExpressionUtils.shuttleExpressionWithLineage(function,
+                    context.getQueryTopPlan());
+            Function rewrittenFunction = 
rollupWindowAggregateFunction((AggregateFunction) function,
+                    queryFunctionShuttled, 
context.getMvExprToMvScanExprQueryBased());
+            if (rewrittenFunction == null) {
+                context.setValid(false);
+                return windowExpression;
+            }
+            return 
super.visitWindow(windowExpression.withFunction(rewrittenFunction), context);

Review Comment:
   `visitWindow` now returns the rolled-up scalar expression directly (e.g.
   `ndv_merge(cum_uv_state)`) instead of a `WindowExpression`. The MV column is
   treated as the precomputed window output, so the rewritten plan is a
   `LogicalProject` over the MV scan, with no second `LogicalWindow` / VANALYTIC
   node on that column.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewWindowRule.java:
##########
@@ -104,20 +117,141 @@ protected Plan rewriteQueryByView(MatchMode matchMode, 
StructInfo queryStructInf
                 viewToQuerySlotMapping,
                 ImmutableMap.of(), cascadesContext
         );
-        // Can not rewrite, bail out
+        // If generic rewrite fails, try roll up from query expressions.
         if (expressionsRewritten.isEmpty()) {
-            materializationContext.recordFailReason(queryStructInfo,
-                    "Rewrite expressions by view in window scan fail",
-                    () -> String.format("expressionToRewritten is %s,\n 
mvExprToMvScanExprMapping is %s,\n"
-                                    + "targetToSourceMapping = %s", 
queryStructInfo.getExpressions(),
-                            
materializationContext.getShuttledExprToScanExprMapping(),
-                            viewToQuerySlotMapping));
-            return null;
+            expressionsRewritten = 
rollupWindowAggregateFunctions(queryStructInfo.getExpressions(),
+                    queryStructInfo.getTopPlan(), 
mvExprToMvScanExprQueryBased, true, false);
+            if (expressionsRewritten.isEmpty()) {
+                materializationContext.recordFailReason(queryStructInfo,
+                        "Rewrite expressions by view in window scan fail",
+                        () -> String.format("expressionToRewritten is %s,\n 
mvExprToMvScanExprMapping is %s,\n"
+                                        + "targetToSourceMapping = %s", 
queryStructInfo.getExpressions(),
+                                
materializationContext.getShuttledExprToScanExprMapping(),
+                                viewToQuerySlotMapping));
+                return null;
+            }
         }
         return new LogicalProject<>(
                 expressionsRewritten.stream()
                         .map(expression -> expression instanceof 
NamedExpression ? expression : new Alias(expression))
                         .map(NamedExpression.class::cast)
                         .collect(Collectors.toList()), tempRewrittenPlan);
     }
+
+    private static List<Expression> rollupWindowAggregateFunctions(List<? 
extends Expression> expressions,
+            Plan queryTopPlan, Map<Expression, Expression> 
mvExprToMvScanExprQueryBased,
+            boolean needShuttle, boolean strictSlotRewrite) {
+        WindowAggregateRollupContext context = new 
WindowAggregateRollupContext(queryTopPlan,
+                mvExprToMvScanExprQueryBased, strictSlotRewrite);
+        List<? extends Expression> inputExpressions = needShuttle
+                ? ExpressionUtils.shuttleExpressionWithLineage(expressions, 
queryTopPlan)
+                : expressions;
+        List<Expression> rewrittenExpressions = inputExpressions.stream()
+                .map(expression -> 
expression.accept(WindowAggregateRollupRewriter.INSTANCE, context))
+                .collect(Collectors.toList());
+        return context.isValid() ? rewrittenExpressions : ImmutableList.of();
+    }
+
+    private static Function rollupWindowAggregateFunction(AggregateFunction 
queryAggregateFunction,
+            Expression queryAggregateFunctionShuttled, Map<Expression, 
Expression> mvExprToMvScanExprQueryBased) {
+        for (Map.Entry<Expression, Expression> expressionEntry : 
mvExprToMvScanExprQueryBased.entrySet()) {
+            Expression viewExpression = expressionEntry.getKey();
+            // Window mapping keys may be full WindowExpression while rollup 
handlers match aggregate functions.
+            if (viewExpression instanceof WindowExpression) {

Review Comment:
   Added `windowSpecEquals()` to `rollupWindowAggregateFunction()`. Before using
   an MV window mapping entry, we now require the MV key to be a full
   `WindowExpression` and compare the partition keys, order keys, and window
   frame of the query against those of the view. Entries whose specs do not
   match are skipped, so rollup never uses that MV column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to