github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3490860648
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -205,8 +233,25 @@ private boolean checkFilter(LogicalFilter<? extends Plan> outerFilter) {
while (outerIterator.hasNext()) {
Expression outerExpr = outerIterator.next();
if (ExpressionIdenticalChecker.INSTANCE.check(innerExpr, outerExpr)) {
+ // Volatile expressions (e.g. volatile UDFs) must never
+ // be matched as inner-filter conjuncts. Two
+ // syntactically identical volatile calls in the outer
+ // and inner filters — like volatile_bool_udf(f.k) and
+ // volatile_bool_udf(f2.k) after slot replacement — are
+ // independent evaluations with distinct VolatileIdentity
+ // from the parser/builder. Treating them as the same
+ // predicate collapses the independent outer-row filter
+ // and inner-aggregate filter into one, producing wrong
+ // results.
+ if (innerExpr.containsVolatileExpression()
Review Comment:
This guard also needs to reject `NoneMovableFunction` matches, not only
volatile expressions. For example:
```text
Filter(f.k = d.k, assert_true(f.v > 0, 'bad'), f.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin(Scan fact f, Scan dim_unique d)
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k, assert_true(f2.v > 0, 'bad'))
Scan fact f2
```
After slot replacement, `ExpressionIdenticalChecker` can match the two
`assert_true` conjuncts because the new guard only checks
`containsVolatileExpression()`. The rewrite then records the outer assertion in
`matchedInnerFilterConjuncts` and emits a single below-window filter,
effectively pruning one `assert_true` occurrence even though `AssertTrue`
implements `NoneMovableFunction` and that marker's contract says these
functions should not be moved or pruned. Please reject matches when either side
contains `NoneMovableFunction`, or otherwise preserve both evaluations instead
of classifying the outer assertion as the inner aggregate filter.
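A minimal sketch of the stricter guard, using a toy expression model rather than the real Nereids classes. `Expr`, `containsUnmatchable()` and `mayTreatAsSamePredicate()` are hypothetical names introduced only for illustration; the `nonMovable` flag stands in for an expression implementing `NoneMovableFunction`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model only: these types are hypothetical stand-ins, not Doris classes. */
final class MatchGuardSketch {

    /** Hypothetical expression node: a name, two marker flags, and children. */
    static final class Expr {
        final String name;
        final boolean isVolatile;  // stands in for a volatile expression
        final boolean nonMovable;  // stands in for a NoneMovableFunction marker
        final List<Expr> children;

        Expr(String name, boolean isVolatile, boolean nonMovable, Expr... children) {
            this.name = name;
            this.isVolatile = isVolatile;
            this.nonMovable = nonMovable;
            this.children = Collections.unmodifiableList(Arrays.asList(children));
        }

        /** True if this node or any descendant is volatile or non-movable. */
        boolean containsUnmatchable() {
            if (isVolatile || nonMovable) {
                return true;
            }
            for (Expr child : children) {
                if (child.containsUnmatchable()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Even when two conjuncts are syntactically identical, they may only be
     * collapsed into one predicate if NEITHER side contains a volatile or
     * non-movable node.
     */
    static boolean mayTreatAsSamePredicate(Expr inner, Expr outer) {
        return !inner.containsUnmatchable() && !outer.containsUnmatchable();
    }
}
```

Under this sketch, the two `assert_true` conjuncts in the tree above would not be matched, so both evaluations survive; a plain comparison such as `f.k = d.k` would still match.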
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +506,247 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
- LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+ // Split uncorrelated conjuncts: predicates that reference ONLY shared
+ // relation slots (tables appearing in both outer and inner plans) must
+ // stay ABOVE the window. Otherwise the window function would see a
+ // different set of rows than the original scalar subquery.
+ //
+ // For example, with fact(f) as shared table and dim(d) as outer-only:
+ // f.v > 6 → shared-only → must stay above the window
+ // d.tag > 0 → outer-only → safe below the window
+ // f.k = d.k → join cond → needed below the window
+ //
+ // We find shared tables by comparing table IDs that appear in both
+ // outer and inner plans, then collect ALL output slots of those
+ // tables (not just columns referenced in the inner query).
+ List<CatalogRelation> outerRels = outerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ List<CatalogRelation> innerRels = innerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ Set<Long> innerTableIds = innerRels.stream()
+ .map(r -> r.getTable().getId())
+ .collect(Collectors.toSet());
+ Set<ExprId> sharedOuterExprIds = outerRels.stream()
+ .filter(r -> innerTableIds.contains(r.getTable().getId()))
+ .flatMap(r -> r.getOutput().stream())
+ .map(Slot::getExprId)
+ .collect(Collectors.toSet());
+ Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+ Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+ Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+ if (uncorrelatedConjuncts != null) {
+ for (Expression conj : uncorrelatedConjuncts) {
+ // Conjuncts that were matched against inner subquery filter
+ // conjuncts (tracked by checkFilter) must stay BELOW the
+ // window. They are semantically part of the inner aggregate's
+ // filter, not extra outer-only predicates. Placing them above
+ // the window would let the window see more rows than the
+ // original scalar subquery, producing wrong aggregate results.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ continue;
+ }
+ // Volatile predicates (e.g. random() > 0.5) must stay ABOVE
+ // the window. Pushing them below would let the window
+ // aggregate over a different set of rows per partition than
+ // the original scalar subquery. PushDownFilterThroughWindow
+ // has the same hazard and explicitly rejects volatile
+ // predicates for this reason.
+ if (conj.containsVolatileExpression()) {
+ aboveWindowConjuncts.add(conj);
+ continue;
+ }
+ // Any predicate that references shared-table slots (tables
+ // appearing in both outer and inner plans) must stay ABOVE
+ // the window. This handles both shared-only predicates
+ // (e.g. f.v > 6) and mixed shared+outer predicates
+ // (e.g. f.v > d.tag). Pushing them below would restrict
+ // the rows seen by the window function, producing a
+ // different aggregate than the original scalar subquery.
+ boolean hasShared = false;
+ for (ExprId id : conj.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
+ hasShared = true;
+ break;
+ }
+ }
+ if (hasShared) {
+ aboveWindowConjuncts.add(conj);
+ } else {
+ // No shared-table references: the predicate involves only
+ // outer-only table columns. Safe to place below the window.
+ belowWindowConjuncts.add(conj);
+ }
+ }
+ }
+
+ // Extract and classify conjuncts from any LogicalFilter nodes nested
+ // inside the outer child (apply.left()). Nested shared-table filters
+ // (e.g. f.v > 6 under a CrossJoin) must be hoisted ABOVE the window;
+ // otherwise the window would aggregate over a filtered subset of rows
+ // while the original scalar subquery sees all rows for the key.
+ //
+ // Nested outer-only filters (e.g. d.tag > 0) are safe below and can
+ // stay in place after the filters are stripped.
+ List<LogicalFilter<Plan>> nestedOuterFilters = apply.left()
+ .collectToList(LogicalFilter.class::isInstance);
+ Set<ExprId> extractedConjunctExprIds = Sets.newHashSet();
+ for (LogicalFilter<Plan> nf : nestedOuterFilters) {
+ for (Expression conj : nf.getConjuncts()) {
+ // Matched inner-filter conjuncts always go below.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+ continue;
+ }
+ // Volatile (e.g. random()) and NoneMovableFunction
+ // (e.g. assert_true()) predicates in nested filters must NOT
+ // be hoisted: moving them across a join or window changes
+ // their evaluation context. Keeping them in place is only
+ // safe when the predicate belongs exclusively to the
+ // outer-only relation. A volatile / side-effecting predicate
+ // on a shared table would restrict the window's input relative
+ // to the original scalar subquery, producing wrong results.
+ // Reject such rewrites.
+ if (conj.containsVolatileExpression()
+ || conj.containsType(NoneMovableFunction.class)) {
+ Set<Long> tablesUnderFilter = nf.collect(
+ CatalogRelation.class::isInstance).stream()
+ .map(r -> ((CatalogRelation) r).getTable().getId())
+ .collect(Collectors.toSet());
+ if (!Sets.intersection(tablesUnderFilter, innerTableIds).isEmpty()) {
+ return filter;
+ }
+ continue;
+ }
+ boolean nfHasShared = false;
+ for (ExprId id : conj.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
+ nfHasShared = true;
+ break;
+ }
+ }
+ if (nfHasShared) {
+ // Shared-table predicate → hoist above the window.
+ aboveWindowConjuncts.add(conj);
+ } else {
+ // Outer-only predicate → safe below, keep it there.
+ belowWindowConjuncts.add(conj);
+ }
+ extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+ }
+ }
+ // Strip all nested LogicalFilter nodes from the outer child so the
+ // window operates on the unfiltered scan/join.
+ Plan strippedOuterChild = stripOuterFilters(apply.left());
+
+ // If a pruning project (e.g. Project(f.k) above Filter(f.v > 6))
+ // pruned columns referenced by extracted conjuncts, expand the
+ // project to carry those columns through. Otherwise the
+ // reinserted predicates would have dangling slot references.
+ strippedOuterChild = ensureProjectOutput(strippedOuterChild,
Review Comment:
This expansion only includes slots referenced by extracted nested filters,
so a plan with no nested filter can still build a window aggregate over a slot
that the outer branch has already pruned. For example, a reduced accepted tree
is:
```text
Filter(sf.k = d.k, sf.k * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin
SubQueryAlias sf
Project(k)
Scan fact(k, v)
Scan dim_unique(k)
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k)
Scan fact f2
```
`checkProject()` accepts `Project(k)` because it is slot-only,
`checkFilter()` can match `f2.k = d.k` after slot replacement, and the
uniqueness check can pass on `dim_unique`. During rewrite,
`ExpressionUtils.replace(function, innerOuterSlotMap)` builds `sum(f.v) OVER
(...)`, but this call receives an empty `extractedConjunctExprIds`, so
`Project(k)` is left unchanged and the window child does not output `v`. The
rewritten plan then references `f.v` above a child that only exposes `sf.k`.
Please include the replaced aggregate/window input slots in the
project-expansion set, or reject the rewrite when those slots are not exposed;
if projects are expanded, restore the original projection shape above the
window.
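A minimal sketch of the first option, using plain string slot names instead of `ExprId`s. `requiredSlots()` and `missingFromProject()` are hypothetical helpers introduced only for illustration, not existing methods in the rule:

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model only: slots are plain strings here, not Doris ExprIds. */
final class ProjectExpansionSketch {

    /**
     * Slots the window child must expose: everything referenced by the
     * extracted nested-filter conjuncts PLUS everything the rewritten
     * window function reads.
     */
    static Set<String> requiredSlots(Set<String> extractedFilterSlots,
                                     Set<String> windowInputSlots) {
        Set<String> required = new LinkedHashSet<>(extractedFilterSlots);
        required.addAll(windowInputSlots);
        return required;
    }

    /**
     * Required slots that a pruning project does not output. A non-empty
     * result means the project must be expanded or the rewrite rejected.
     */
    static Set<String> missingFromProject(Set<String> projectOutput,
                                          Set<String> required) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(projectOutput);
        return missing;
    }
}
```

For the tree above, the extracted set is empty and the window reads `v`, so `Project(k)` is reported as missing `v`. Passing only the extracted set reports nothing missing, which corresponds to the dangling reference described here.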
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -205,8 +233,25 @@ private boolean checkFilter(LogicalFilter<? extends Plan> outerFilter) {
while (outerIterator.hasNext()) {
Expression outerExpr = outerIterator.next();
if (ExpressionIdenticalChecker.INSTANCE.check(innerExpr, outerExpr)) {
Review Comment:
This matcher still ignores deterministic UDF identity. A reduced accepted
shape is:
```text
Filter(f.k = d.k, outer_bool_udf(f.v), f.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin(Scan fact f, Scan dim_unique d)
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k, inner_bool_udf(f2.v))
Scan fact f2
```
After slot replacement, `inner_bool_udf(f2.v)` becomes a
`JavaUdf`/`PythonUdf` with the same child as `outer_bool_udf(f.v)`.
`ExpressionIdenticalChecker` dispatches UDF nodes to the generic visitor, which
only checks runtime class and children; it does not compare the function
name/id/signature/body, even though normal `BoundFunction` equality compares
the name. Since deterministic UDFs are not caught by the volatile guard below,
this removes the inner predicate and records the outer UDF as the matched
aggregate filter, so the window is computed under `outer_bool_udf` instead of
the original `inner_bool_udf`. Please compare function identity for
UDF/bound-function nodes, or reject UDF filter matches when exact semantic
equality cannot be proven.
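A minimal sketch of the difference between the two comparisons, using a toy call node rather than the real `JavaUdf`/`PythonUdf` classes. `Call`, `structurallyEqual()` and `semanticallyEqual()` are hypothetical names introduced only for illustration, and the function name is used here as a stand-in for whatever identity (name, id, signature) the real fix would compare:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model only: hypothetical stand-in for UDF call nodes. */
final class UdfIdentitySketch {

    /** Hypothetical call node: node kind, function name, argument slots. */
    static final class Call {
        final String kind;          // e.g. "JavaUdf"
        final String functionName;  // e.g. "outer_bool_udf"
        final List<String> args;    // argument slots after slot replacement

        Call(String kind, String functionName, String... args) {
            this.kind = kind;
            this.functionName = functionName;
            this.args = Collections.unmodifiableList(Arrays.asList(args));
        }
    }

    /** Compares node kind and children only, as described for the generic visitor. */
    static boolean structurallyEqual(Call a, Call b) {
        return a.kind.equals(b.kind) && a.args.equals(b.args);
    }

    /** Additionally requires the same function identity before matching. */
    static boolean semanticallyEqual(Call a, Call b) {
        return structurallyEqual(a, b) && a.functionName.equals(b.functionName);
    }
}
```

With `outer_bool_udf(f.v)` and `inner_bool_udf(f.v)` as inputs, the structural check accepts the pair and the identity-aware check rejects it.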
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]