github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3502887539
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +513,245 @@ private Plan rewrite(LogicalFilter<? extends Plan>
filter, LogicalApply<Plan, Pl
windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
- LogicalFilter<Plan> newFilter =
filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+ // Split uncorrelated conjuncts: predicates that reference ONLY shared
+ // relation slots (tables appearing in both outer and inner plans) must
+ // stay ABOVE the window. Otherwise the window function would see a
+ // different set of rows than the original scalar subquery.
+ //
+ // For example, with fact(f) as shared table and dim(d) as outer-only:
+ // f.v > 6 → shared-only → must stay above the window
+ // d.tag > 0 → outer-only → safe below the window
+ // f.k = d.k → join cond → needed below the window
+ //
+ // We find shared tables by comparing table IDs that appear in both
+ // outer and inner plans, then collect ALL output slots of those
+ // tables (not just columns referenced in the inner query).
+ List<CatalogRelation> outerRels = outerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ List<CatalogRelation> innerRels = innerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ Set<Long> innerTableIds = innerRels.stream()
+ .map(r -> r.getTable().getId())
+ .collect(Collectors.toSet());
+ Set<ExprId> sharedOuterExprIds = outerRels.stream()
+ .filter(r -> innerTableIds.contains(r.getTable().getId()))
+ .flatMap(r -> r.getOutput().stream())
+ .map(Slot::getExprId)
+ .collect(Collectors.toSet());
+ Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+ Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+ Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+ if (uncorrelatedConjuncts != null) {
+ for (Expression conj : uncorrelatedConjuncts) {
+ // Conjuncts that were matched against inner subquery filter
+ // conjuncts (tracked by checkFilter) must stay BELOW the
+ // window. They are semantically part of the inner aggregate's
+ // filter, not extra outer-only predicates. Placing them above
+ // the window would let the window see more rows than the
+ // original scalar subquery, producing wrong aggregate results.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ continue;
+ }
+ // Volatile and NoneMovableFunction predicates must stay
+ // ABOVE the window — pushing them below would change
+ // evaluation frequency or move a side-effecting call.
+ if (isVolatileOrNoneMovable(conj)) {
+ aboveWindowConjuncts.add(conj);
+ continue;
+ }
+ // Predicates referencing shared-table columns must stay
+ // ABOVE the window; otherwise the window sees fewer rows
+ // than the original scalar subquery.
+ if (referencesSharedTable(conj, sharedOuterExprIds)) {
+ aboveWindowConjuncts.add(conj);
+ } else {
+ belowWindowConjuncts.add(conj);
+ }
+ }
+ }
+
+ // Extract and classify conjuncts from any LogicalFilter nodes nested
+ // inside the outer child (apply.left()). Nested shared-table filters
+ // (e.g. f.v > 6 under a CrossJoin) must be hoisted ABOVE the window;
+ // otherwise the window would aggregate over a filtered subset of rows
+ // while the original scalar subquery sees all rows for the key.
+ //
+ // Nested outer-only filters (e.g. d.tag > 0) are safe below and can
+ // stay in place after the filters are stripped.
+ List<LogicalFilter<Plan>> nestedOuterFilters = apply.left()
+ .collectToList(LogicalFilter.class::isInstance);
+ Set<ExprId> extractedConjunctExprIds = Sets.newHashSet();
+ for (LogicalFilter<Plan> nf : nestedOuterFilters) {
+ for (Expression conj : nf.getConjuncts()) {
+ // Matched inner-filter conjuncts always go below.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+
extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+ continue;
+ }
+ // Volatile / NoneMovable on a shared table would restrict
+ // the window's input — reject the rewrite.
+ if (isVolatileOrNoneMovable(conj)) {
+ if (nf.collect(CatalogRelation.class::isInstance).stream()
+ .map(r -> ((CatalogRelation) r).getTable().getId())
+ .anyMatch(innerTableIds::contains)) {
+ return filter;
+ }
+ continue;
+ }
+ if (referencesSharedTable(conj, sharedOuterExprIds)) {
+ aboveWindowConjuncts.add(conj);
+ } else {
+ belowWindowConjuncts.add(conj);
+ }
+ extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+ }
+ }
+ // Strip all nested LogicalFilter nodes from the outer child so the
+ // window operates on the unfiltered scan/join.
+ Plan strippedOuterChild = stripOuterFilters(apply.left());
+
+ // The window function's aggregate references shared-table slots
+ // (e.g. f.v for SUM(f.v) after slot replacement). A pruning
+ // project inside apply.left() may have dropped those slots even
+ // when there are no nested filters to extract. Collect the
+ // replaced aggregate's input slot ExprIds and include them in
+ // the project-expansion set so ensureProjectOutput carries them
+ // through.
+ Set<ExprId> allNeededExprIds =
Sets.newHashSet(extractedConjunctExprIds);
+ allNeededExprIds.addAll(ExpressionUtils.replace(function,
innerOuterSlotMap)
+ .getInputSlotExprIds());
+ strippedOuterChild = ensureProjectOutput(strippedOuterChild,
+ allNeededExprIds);
+
+ LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(
+ belowWindowConjuncts, strippedOuterChild);
LogicalWindow<Plan> newWindow = new
LogicalWindow<>(ImmutableList.of(windowFunctionAlias), newFilter);
- LogicalFilter<Plan> windowFilter = new
LogicalFilter<>(ImmutableSet.of(windowFilterConjunct), newWindow);
+
+ // Combine shared-table predicates with the window comparison
predicate above the window
+ Set<Expression> topConjuncts = Sets.newHashSet(windowFilterConjunct);
+ topConjuncts.addAll(aboveWindowConjuncts);
+ LogicalFilter<Plan> windowFilter = new
LogicalFilter<>(ImmutableSet.copyOf(topConjuncts), newWindow);
return windowFilter;
}
+ /**
+ * Ensure that every LogicalProject in the plan tree outputs all slots
+ * referenced by extracted conjuncts (ExprIds in {@code neededExprIds}).
+ * If a project prunes away a needed slot, the reinserted filter
+ * predicates would have dangling references.
+ *
+ * <p>Recursively walks the tree depth-first; expands each project to
+ * include any missing slots as simple identity projections (SlotReference
→
+ * SlotReference). Recursing into the child before computing the current
+ * project's {@code childOutput} ensures that stacked pruning projects
+ * (e.g. {@code Project(k) → Project(k) → Scan(k, v)}) are expanded
+ * bottom-up — the inner project is expanded first, and then the outer
+ * project can see the expanded slots in its child output.
+ */
+ private Plan ensureProjectOutput(Plan plan, Set<ExprId> neededExprIds) {
+ if (neededExprIds.isEmpty()) {
+ return plan;
+ }
+ if (plan instanceof LogicalProject) {
+ LogicalProject<Plan> project = (LogicalProject<Plan>) plan;
+ // Recurse into the child first so stacked pruning projects
+ // are expanded bottom-up. The child's output after expansion
+ // determines which slots the current project can pull through.
+ Plan newChild = ensureProjectOutput(project.child(),
neededExprIds);
+ Set<ExprId> projectOutputExprIds = project.getOutputExprIdSet();
+ Set<Slot> childOutput = newChild.getOutputSet();
+ List<NamedExpression> newProjects =
Lists.newArrayList(project.getProjects());
+ boolean expanded = false;
+ for (ExprId id : neededExprIds) {
+ if (!projectOutputExprIds.contains(id)) {
+ for (Slot slot : childOutput) {
+ if (slot.getExprId().equals(id)) {
+ newProjects.add(slot);
+ expanded = true;
+ break;
+ }
+ }
+ }
+ }
+ if (expanded) {
+ return project.withProjectsAndChild(newProjects, newChild);
+ }
+ if (newChild != project.child()) {
+ return project.withChildren(ImmutableList.of(newChild));
+ }
+ return plan;
+ }
+ if (plan.children().isEmpty()) {
+ return plan;
+ }
+ List<Plan> newChildren = plan.children().stream()
+ .map(c -> ensureProjectOutput(c, neededExprIds))
+ .collect(Collectors.toList());
+ return plan.withChildren(newChildren);
+ }
+
/**
 * Wraps {@code function} in a window expression partitioned by all of the
 * correlated slots.
 *
 * @param correlatedSlots slots used as the PARTITION BY keys, in order
 * @param function the aggregate to evaluate per partition
 * @return the window expression
 */
private WindowExpression createWindowFunction(List<Slot> correlatedSlots, AggregateFunction function) {
    // Every correlated slot becomes a partition key. The third argument is
    // left empty (presumably the ORDER BY keys — confirm against
    // WindowExpression's constructor).
    return new WindowExpression(
            function,
            ImmutableList.copyOf(correlatedSlots),
            Collections.emptyList());
}
+ /** Recursively strip deterministic, movable LogicalFilter nodes from the
+ * plan tree. Filters containing volatile predicates (e.g. random()) or
+ * NoneMovableFunction predicates (e.g. assert_true()) are left in place —
+ * moving such predicates across a join/window changes their evaluation
+ * context and can alter query results. */
+ private Plan stripOuterFilters(Plan plan) {
+ if (plan instanceof LogicalFilter) {
+ LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+ // Separate unsafe-to-move conjuncts from safe ones.
+ // Volatile and NoneMovableFunction predicates stay at their
+ // original position; deterministic movable predicates have been
+ // extracted and can be stripped.
+ Set<Expression> keepConjuncts = filter.getConjuncts().stream()
+
.filter(AggScalarSubQueryToWindowFunction::isVolatileOrNoneMovable)
+ .collect(Collectors.toSet());
+ if (keepConjuncts.isEmpty()) {
+ // All conjuncts are safe to strip.
+ return stripOuterFilters(filter.child(0));
+ }
+ // Keep only unsafe-to-move conjuncts at this position.
+ Plan strippedChild = stripOuterFilters(filter.child(0));
+ return new LogicalFilter<>(ImmutableSet.copyOf(keepConjuncts),
strippedChild);
+ }
+ if (plan.children().isEmpty()) {
+ return plan;
+ }
+ return plan.withChildren(
+
plan.children().stream().map(this::stripOuterFilters).collect(Collectors.toList()));
+ }
+
+ // ---- helper methods ----------------------------------------------------
+
+ /** An expression that must not be moved or pruned by plan rewrites. */
+ private static boolean isVolatileOrNoneMovable(Expression expr) {
+ return expr.containsVolatileExpression()
+ || expr.containsType(NoneMovableFunction.class);
+ }
+
+ /** True when {@code expr} references any column of a shared table
+ * (a table appearing in both outer and inner plans). */
+ private static boolean referencesSharedTable(Expression expr, Set<ExprId>
sharedOuterExprIds) {
+ for (ExprId id : expr.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
Review Comment:
This matcher still treats `MATCH_*` predicates with different `USING
ANALYZER` clauses as identical. `Match.equals()` includes the normalized
analyzer, and `ExpressionTranslator.visitMatch()` uses that analyzer to select
the inverted index and to build the `MatchPredicate`, but this visitor falls
through to class-plus-children comparison for
`MatchAny`/`MatchAll`/`MatchPhrase`/related nodes.
A reduced plan shape that the rule currently accepts (and rewrites incorrectly) is:
```text
Filter(f.k = d.k,
f.txt MATCH_ANY 'foo' USING ANALYZER english,
f.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin(Scan fact f, Scan dim_unique d)
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k,
f2.txt MATCH_ANY 'foo' USING ANALYZER chinese)
Scan fact f2
```
After slot replacement, the inner `MATCH_ANY` has the same class and
children as the outer predicate, so this check records the outer expression as
the matched inner filter even though the analyzer remains different. The
rewrite then computes the window aggregate under the outer analyzer's row set
instead of the scalar subquery's inner analyzer row set. Please make `Match`
comparison use semantic equality, or add a `visitMatch()` override that
requires analyzer equality before classifying the predicates as the same inner
filter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]