geoffreyclaude opened a new pull request, #16685:
URL: https://github.com/apache/datafusion/pull/16685

   ## Which issue does this PR close?
   
   - Draft attempt at #13583.
   
   ## How to review?
   
   - The sqllogictest files in the `datafusion/sqllogictest/test_files/match_recognize` directory cover a wide range of `MATCH_RECOGNIZE` cases, both as `EXPLAIN` output and as actual queries. They have **not** been validated, so the expected results of the actual queries definitely contain critical correctness bugs.
   - The core change is the planning of the `MATCH_RECOGNIZE` AST into a logical plan, which happens in `datafusion/sql/src/relation/mod.rs`.
   - The actual pattern matching happens in `pattern_matcher.rs`, which is almost entirely "vibe-coded", so expect surprising AI-generated oddities.
   - The main question a preliminary review should answer is whether the generated logical and execution plans, which reuse existing DataFusion nodes, make sense, or whether it would be better to build the feature almost from scratch inside a single large node.
   
   ## MATCH_RECOGNIZE in DataFusion  
   _A walk-through of the current draft implementation_
   
   > ⚠️ **WARNING – Experimental Feature**  
   > This implementation is **not production ready**. Large portions of the 
code – especially `pattern_matcher.rs` and 
`functions-window/src/match_recognize.rs` – were vibe-coded with minimal 
review.  
   >  
   > Expect missing edge-case handling, correctness issues, and performance 
problems.
   >  
   > **This document itself is AI-generated** and may contain inaccuracies or 
omissions. Use it only as a starting point; always verify details against the 
source code before relying on them.
   
   This note explains how a `MATCH_RECOGNIZE` statement is translated from SQL 
into DataFusion's logical and physical plans.
   
   The supported features are based on the current [Snowflake documentation](https://docs.snowflake.com/en/sql-reference/constructs/match_recognize) and are limited by the `sqlparser-rs` crate's support for `MATCH_RECOGNIZE`: there is no `FINAL`/`RUNNING` for `MEASURES`, and `EXCLUDE` and `PERMUTE` accept individual symbols only.
   
   > ⚠️ **Symbol predicates inside the DEFINE clause are *not yet supported*.**
   
   ---
   
   ## 1.  High-level flow
   
   1. **SQL parsing** – the `sqlparser-rs` parser recognises the `MATCH_RECOGNIZE (…)` table factor and produces a `TableFactor::MatchRecognize` AST node.
   
   2. **Logical planning** – the SQL planner turns the AST into a hierarchy of 
logical plan nodes:
      * normalisation of `DEFINE`, `PATTERN`, `MEASURES`, `ROWS PER MATCH`, 
`AFTER MATCH`
      * explicit `Projection`, `WindowAgg`, `Filter`
      * a new `LogicalPlan::MatchRecognizePattern` node that carries the 
compiled pattern.
   
   3. **Physical planning** – the core planner detects the new logical node and 
produces a `MatchRecognizePatternExec`.  
      All remaining operators (projection, window, filter, repartition, …) are 
produced exactly the same way as for "ordinary" SQL.
   
   4. **Execution** – `MatchRecognizePatternExec` implements pattern matching 
at runtime, augments every output record batch with five metadata columns and 
yields the augmented stream.  
      The projection, filter and window operators above it consume those virtual columns.
   
   The rest of this document focuses on step 2 – how the planner constructs the 
logical plan.
   
   ---
   
   ## 2.  SQL planner extensions
   
   ### 2.1  New planner context
   
   `PlannerContext` now contains an optional `MatchRecognizeContext`.  
   When the planner descends into a `MATCH_RECOGNIZE` clause it enables the 
context to enforce the special scoping rules for
   
   * **qualified identifiers** – they must be of the form `symbol.column`, e.g. 
`A.price`;
   * **window and aggregate functions** – extra implicit arguments are added 
(see below).
   
   The context also exposes the `PARTITION BY`, `ORDER BY` and `ROWS PER MATCH` 
clauses so that helper functions can derive default window frames or adjust 
partitioning.
   
   ### 2.2  Handling `DEFINE`
   
   For every symbol referenced in the *pattern*, the planner must be able to supply a predicate expression:
   
   ```
   DEFINE
       A AS price < 50,
       B AS price > 60
   ```
   
   * All symbols that appear in the pattern are collected first.
   * If a symbol has an explicit definition, that definition is planned into a 
regular `Expr`.
   * If it does **not** have a definition, the planner synthesises the constant 
expression `TRUE`.
   * All predicates are gathered in `defines: Vec<(Expr, String)>`, each entry pairing the predicate with its symbol name.
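The collection step can be sketched as follows. The sketch makes two assumptions: predicates are kept as SQL strings rather than `Expr` values, and a symbol that appears several times in the pattern is defined only once, in order of first appearance. `collect_defines` is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Builds the `(predicate, symbol)` pairs for every symbol in the pattern.
/// Predicates are plain SQL strings here; the planner works with `Expr`.
pub fn collect_defines(
    pattern_symbols: &[&str],
    explicit: &HashMap<&str, &str>,
) -> Vec<(String, String)> {
    let mut seen: Vec<&str> = Vec::new();
    let mut defines = Vec::new();
    for &symbol in pattern_symbols {
        // A symbol may occur several times in a pattern; define it once.
        if seen.contains(&symbol) {
            continue;
        }
        seen.push(symbol);
        // Symbols without a DEFINE entry always match.
        let predicate = explicit.get(symbol).copied().unwrap_or("TRUE");
        defines.push((predicate.to_string(), symbol.to_string()));
    }
    defines
}
```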
   
   The planner then inserts a **projection** immediately above the input:
   
   ```
   … → Projection {
           // unchanged input columns
           company,
           price_date,
           price,
           // one boolean column per symbol
           predicate_for_A AS __mr_symbol_A,
           predicate_for_B AS __mr_symbol_B,
           …
       }
   ```
   
   These columns serve a single purpose: the pattern matcher consumes them at execution time.
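For the example `DEFINE` above, the projection computes roughly the following. This sketch works on a plain `&[f64]` price column instead of Arrow arrays, and `symbol_columns` is a hypothetical name.

```rust
/// Evaluates the example predicates `A AS price < 50` and `B AS price > 60`
/// over a price column, producing one boolean column per symbol.
pub fn symbol_columns(prices: &[f64]) -> Vec<(&'static str, Vec<bool>)> {
    vec![
        ("__mr_symbol_A", prices.iter().map(|&p| p < 50.0).collect()),
        ("__mr_symbol_B", prices.iter().map(|&p| p > 60.0).collect()),
    ]
}
```

A row can satisfy several predicates or none (like `55.0` here). Deciding which symbol the row is finally classified as is left to the pattern matcher.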
   
   If a `DEFINE` expression itself contains window functions, the planner first inserts a **window node** underneath this projection; after the expressions are rebased, the overall shape becomes:
   
   ```
   Projection(add __mr_symbol_…)
     Window
       Input
   ```
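To illustrate the two-step shape, assume a hypothetical definition `UP AS price > LAG(price) OVER (ORDER BY price_date)`. The window node materialises the `LAG` column, and the projection then evaluates the comparison against it. This sketch uses plain slices for a single ordered partition. `lag` and `up_symbol` are illustrative names.

```rust
/// Window step: `LAG(price)` over one partition already sorted by `price_date`.
pub fn lag(prices: &[f64]) -> Vec<Option<f64>> {
    std::iter::once(None)
        .chain(prices.iter().copied().map(Some))
        .take(prices.len())
        .collect()
}

/// Projection step: `price > LAG(price)`; a comparison with NULL is not TRUE.
pub fn up_symbol(prices: &[f64]) -> Vec<bool> {
    prices
        .iter()
        .zip(lag(prices))
        .map(|(&p, prev)| prev.map_or(false, |q| p > q))
        .collect()
}
```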
   
   ### 2.3  Handling `PATTERN`
   
   `PATTERN` is compiled into a nested value of the enum
   `datafusion_expr::match_recognize::Pattern` (symbol, concatenation, 
alternation, repetition, …).
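A reduced version of such a tree might look like the sketch below. The variant names and the `symbols()` helper are assumptions for illustration only. The sketch omits variants for exclusion and `PERMUTE`.

```rust
/// Illustrative pattern tree; not the PR's actual enum.
#[derive(Debug, Clone, PartialEq)]
pub enum Pattern {
    Symbol(String),
    Concat(Vec<Pattern>),
    Alternation(Vec<Pattern>),
    /// `p{min,max}`; `max == None` means unbounded (`*`, `+`, `{n,}`).
    Repetition { inner: Box<Pattern>, min: usize, max: Option<usize> },
}

impl Pattern {
    /// Distinct symbols in order of first appearance.
    pub fn symbols(&self) -> Vec<String> {
        let mut out = Vec::new();
        self.collect(&mut out);
        out
    }

    fn collect(&self, out: &mut Vec<String>) {
        match self {
            Pattern::Symbol(s) => {
                if !out.contains(s) {
                    out.push(s.clone());
                }
            }
            Pattern::Concat(parts) | Pattern::Alternation(parts) => {
                parts.iter().for_each(|p| p.collect(out));
            }
            Pattern::Repetition { inner, .. } => inner.collect(out),
        }
    }
}
```

For example, `PATTERN (A B+ (A | C))` becomes a `Concat` of `Symbol("A")`, a `Repetition` with `min: 1, max: None`, and an `Alternation`.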
   
   The planner then creates a dedicated logical node
   
   ```
   LogicalPlan::MatchRecognizePattern {
       input:      Projection(with __mr_symbol_…)
       partition_by: […]
       order_by: […]
       pattern:    Pattern     // compiled tree
       after_skip: Option<…>
       rows_per_match: Option<…>
       symbols:    Vec<String> // ["A","B",…] in declaration order
   }
   ```
   
   The node itself is *purely declarative* – it only describes the pattern; the 
projection added earlier already made all predicates available.
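The `after_skip` field only takes effect at execution time, when the matcher decides where to resume after a match. Below is a minimal sketch of that decision using the Snowflake options (`PAST LAST ROW`, `TO NEXT ROW`, `TO FIRST`/`TO LAST <symbol>`). `AfterMatchSkip` and `next_search_start` are hypothetical names. The fallback for a missing symbol, or for a jump back to the match start, is an assumption; a real implementation may raise an error instead.

```rust
/// Options of `AFTER MATCH SKIP`, following the Snowflake grammar.
#[derive(Debug, Clone, PartialEq)]
pub enum AfterMatchSkip {
    PastLastRow,
    ToNextRow,
    ToFirst(String),
    ToLast(String),
}

/// Row at which the next match attempt starts, for a match covering the
/// partition rows `start..=end`; `classifiers[i]` labels row `start + i`.
pub fn next_search_start(
    skip: &AfterMatchSkip,
    start: usize,
    end: usize,
    classifiers: &[&str],
) -> usize {
    let first = |sym: &str| classifiers.iter().position(|c| *c == sym).map(|i| start + i);
    let last = |sym: &str| classifiers.iter().rposition(|c| *c == sym).map(|i| start + i);
    let target = match skip {
        AfterMatchSkip::PastLastRow => Some(end + 1),
        AfterMatchSkip::ToNextRow => Some(start + 1),
        AfterMatchSkip::ToFirst(sym) => first(sym.as_str()),
        AfterMatchSkip::ToLast(sym) => last(sym.as_str()),
    };
    // Assumption: a missing symbol, or a jump back to `start` (which would
    // loop forever), falls back to the next row instead of raising an error.
    match target {
        Some(t) if t > start => t,
        _ => start + 1,
    }
}
```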
   
   ### 2.4  Handling `MEASURES`
   
   `MEASURES` is conceptually just another projection applied **after** pattern 
detection.
   
   1. Each measure expression is individually planned through
      `sql_to_expr_with_match_recognize_measures_context`.
      That function
      * enables the special context so that `A.price` is valid,
      * **implicitly** appends hidden columns expected by specialised functions (`FIRST`, `LAST`, `PREV`, `NEXT`, `CLASSIFIER`, …), and
      * asks every registered `ExprPlanner` to post-process the expression.  
        The default planners turn symbol predicates into the dedicated
        window UDF calls (`mr_first`, `mr_prev`, `classifier`, …) and rewrite
        aggregate functions such as
   
        ```
        COUNT(A.*)  ->  COUNT( CASE WHEN __mr_classifier = 'A' THEN 1 END )
        SUM(A.price) -> SUM( CASE WHEN __mr_classifier = 'A' THEN A.price END )
        ```
   
   2. If at least one measure contains a window function, another **window 
node** is pushed below the final projection (including a sort & repartition 
identical to ordinary SQL).
   
   3. Finally, the planner calls the `rows_filter` and `rows_projection` helpers to apply the semantics of `ROWS PER MATCH`. The filter depends on the mode:
   
      * default (`ONE ROW`)   →  filter on `__mr_is_last_match_row`
      * `ALL ROWS SHOW`       →  filter on `__mr_is_included_row`
      * `ALL ROWS OMIT EMPTY` →  filter on `__mr_is_included_row` **and** classifier ≠ `'(empty)'`
      * `WITH UNMATCHED`      →  no additional filter
   
      The mode also determines the projection list (last row only, or all input columns).
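The aggregate rewrite in step 1 relies on `COUNT` and `SUM` ignoring the `NULL`s that the `CASE` produces for rows of other symbols. The sketch below works over the rows of a single match; the windowing by match number is assumed and not shown. `MatchedRow`, `count_symbol` and `sum_symbol` are hypothetical.

```rust
/// One row of a single match, already labelled by the pattern matcher.
pub struct MatchedRow<'a> {
    pub classifier: &'a str,
    pub price: i64,
}

/// `COUNT(CASE WHEN __mr_classifier = <symbol> THEN 1 END)`: COUNT skips
/// the NULLs that the CASE yields for rows of other symbols.
pub fn count_symbol(rows: &[MatchedRow<'_>], symbol: &str) -> usize {
    rows.iter()
        .filter_map(|r| (r.classifier == symbol).then_some(1))
        .count()
}

/// `SUM(CASE WHEN __mr_classifier = <symbol> THEN price END)`: NULL (`None`)
/// when no row of the match is classified as `symbol`.
pub fn sum_symbol(rows: &[MatchedRow<'_>], symbol: &str) -> Option<i64> {
    rows.iter()
        .filter_map(|r| (r.classifier == symbol).then_some(r.price))
        .fold(None::<i64>, |acc, p| Some(acc.unwrap_or(0) + p))
}
```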
   
   The complete logical plan therefore has the following skeleton (greatly 
simplified):
   
   ```
   01)Filter <- ROWS PER MATCH filter
   02)--Projection <- MEASURES projections
   03)----WindowAggr <- MEASURES Window functions
   04)------MatchRecognizePattern <- Runs PATTERN on symbols, emits metadata virtual columns
   05)--------Projection <- DEFINE symbols projected to virtual columns __mr_symbol_<…>
   06)----------WindowAggr <- DEFINE Window functions
   07)------------TableScan <- Input
   ```
   
   ### 2.5  Virtual columns
   
   The pattern executor generates five metadata columns:
   
   | Name | Type | Meaning |
   |------|------|---------|
   | `__mr_classifier`            | `Utf8`    | symbol the current row matched (or `'(empty)'`) |
   | `__mr_match_number`          | `UInt64`  | running match counter, starts at 1 |
   | `__mr_match_sequence_number` | `UInt64`  | position inside the current match, starts at 1 |
   | `__mr_is_last_match_row`     | `Boolean` | true on the final row of every match |
   | `__mr_is_included_row`       | `Boolean` | true if the row is *not* excluded |
   
   They are appended to the schema in `pattern_schema()` and used directly
   by filters, partitioning and measures.
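The `ROWS PER MATCH` filter described in section 2.4 can be written directly over these columns. This sketch uses hypothetical `MatchMeta` and `RowsPerMatch` types and a hypothetical `keep_row` helper.

```rust
/// The five metadata columns for one row.
#[derive(Debug, Clone, PartialEq)]
pub struct MatchMeta {
    pub classifier: String,
    pub match_number: u64,
    pub match_sequence_number: u64,
    pub is_last_match_row: bool,
    pub is_included_row: bool,
}

/// `ROWS PER MATCH` modes, using the shorthand names of section 2.4.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowsPerMatch {
    OneRow,
    AllRowsShowEmpty,
    AllRowsOmitEmpty,
    AllRowsWithUnmatched,
}

/// Whether a row survives the `ROWS PER MATCH` filter.
pub fn keep_row(mode: RowsPerMatch, m: &MatchMeta) -> bool {
    match mode {
        RowsPerMatch::OneRow => m.is_last_match_row,
        RowsPerMatch::AllRowsShowEmpty => m.is_included_row,
        RowsPerMatch::AllRowsOmitEmpty => m.is_included_row && m.classifier != "(empty)",
        RowsPerMatch::AllRowsWithUnmatched => true,
    }
}
```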
   
   ---
   
   ## 3.  Physical planning and execution
   
   * The core planner recognises `LogicalPlan::MatchRecognizePattern` and
     instantiates `MatchRecognizePatternExec`.
   
   * `MatchRecognizePatternExec`
     * receives the compiled `Pattern`, `partition_by` and `order_by`
     * exposes ordering/partitioning requirements identical to `WindowAggExec`
     * implements `execute()` by
       1. materialising a completely ordered partition,
       2. running the pattern matcher (`PatternMatcher`) which scans the 
partition once, emits matches and populates the metadata columns,
       3. honouring `AFTER MATCH SKIP` and `ROWS PER MATCH`.
   
   * All projections / window aggregates / filters produced earlier continue to 
behave exactly as they do for ordinary queries.
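To make the matching step concrete, here is a self-contained sketch of a matcher for one ordered partition. It **swaps in naive backtracking** (exponential in the worst case) for the NFA scan the PR describes. It also assumes `AFTER MATCH SKIP PAST LAST ROW` and ignores empty matches. All names are hypothetical and not taken from `pattern_matcher.rs`. Symbol predicates are passed as a closure `is_sym(row, symbol)`, standing in for the `__mr_symbol_*` columns.

```rust
/// Reduced pattern tree: symbols, concatenation, alternation, repetition.
#[derive(Debug, Clone)]
pub enum Pattern {
    Symbol(String),
    Concat(Vec<Pattern>),
    Alternation(Vec<Pattern>),
    Repetition { inner: Box<Pattern>, min: usize, max: Option<usize> },
}

/// Position after the last consumed row, plus one symbol label per row.
pub type Candidate = (usize, Vec<String>);
/// `is_sym(row, symbol)`: does `row` satisfy the DEFINE predicate of `symbol`?
pub type IsSym<'a> = &'a dyn Fn(usize, &str) -> bool;

/// All ways `p` can match at `pos`, most preferred first.
fn candidates(p: &Pattern, pos: usize, n: usize, is_sym: IsSym<'_>) -> Vec<Candidate> {
    match p {
        Pattern::Symbol(s) if pos < n && is_sym(pos, s.as_str()) => vec![(pos + 1, vec![s.clone()])],
        Pattern::Symbol(_) => Vec::new(),
        // Left-most alternative is preferred.
        Pattern::Alternation(alts) => alts.iter().flat_map(|a| candidates(a, pos, n, is_sym)).collect(),
        Pattern::Concat(parts) => {
            let mut acc: Vec<Candidate> = vec![(pos, Vec::new())];
            for part in parts {
                acc = acc.into_iter().flat_map(|(p0, labels)| {
                    candidates(part, p0, n, is_sym).into_iter().map(move |(p1, more)| {
                        let mut all = labels.clone();
                        all.extend(more);
                        (p1, all)
                    })
                }).collect();
            }
            acc
        }
        Pattern::Repetition { inner, min, max } => repeat(inner, *min, *max, pos, n, is_sym),
    }
}

/// Greedy `inner{min,max}`: one more iteration is preferred over stopping.
fn repeat(inner: &Pattern, min: usize, max: Option<usize>, pos: usize, n: usize, is_sym: IsSym<'_>) -> Vec<Candidate> {
    let mut out = Vec::new();
    if max != Some(0) {
        for (p1, labels) in candidates(inner, pos, n, is_sym) {
            if p1 == pos {
                continue; // an empty iteration would recurse forever
            }
            for (p2, more) in repeat(inner, min.saturating_sub(1), max.map(|m| m - 1), p1, n, is_sym) {
                let mut all = labels.clone();
                all.extend(more);
                out.push((p2, all));
            }
        }
    }
    if min == 0 {
        out.push((pos, Vec::new())); // stopping here is the least preferred option
    }
    out
}

/// Per-row output, mirroring four of the metadata columns.
#[derive(Debug, Clone, PartialEq)]
pub struct RowMatch {
    pub classifier: String,
    pub match_number: u64,
    pub match_sequence_number: u64,
    pub is_last_match_row: bool,
}

/// Scans one ordered partition of `n` rows; unmatched rows get `None`.
pub fn scan_partition(pattern: &Pattern, n: usize, is_sym: IsSym<'_>) -> Vec<Option<RowMatch>> {
    let mut out = vec![None; n];
    let (mut pos, mut match_number) = (0, 0u64);
    while pos < n {
        // Most preferred non-empty match starting at `pos`, if any.
        let best = candidates(pattern, pos, n, is_sym).into_iter().find(|(end, _)| *end > pos);
        match best {
            Some((end, labels)) => {
                match_number += 1;
                for (i, classifier) in labels.into_iter().enumerate() {
                    out[pos + i] = Some(RowMatch {
                        classifier,
                        match_number,
                        match_sequence_number: i as u64 + 1,
                        is_last_match_row: pos + i + 1 == end,
                    });
                }
                pos = end; // AFTER MATCH SKIP PAST LAST ROW
            }
            None => pos += 1, // no match here: try the next row
        }
    }
    out
}
```

Consider prices `[70, 40, 65, 80, 30, 45]` with `A AS price < 50`, `B AS price > 60` and `PATTERN (A B+)`. Rows 1 to 3 form match 1, labelled `A, B, B`, and row 3 is the last match row. The other rows stay unmatched.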
   
   ---
   
   ## 4.  Summary
   
   1. `MATCH_RECOGNIZE` is implemented as an ordinary combination of projections, filters and window aggregates, plus one dedicated pattern-matching node.
   
   2. `DEFINE` ⇒ boolean columns (`__mr_symbol_*`)  
      `PATTERN` ⇒ `MatchRecognizePattern` node  
      `MEASURES` ⇒ projection of window / aggregate functions over metadata
   
   3. Every node except the pattern node reuses DataFusion's existing machinery; physical execution differs only in the single custom executor that performs the row-wise NFA scan.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

