hongkunxu opened a new pull request, #18529:
URL: https://github.com/apache/pinot/pull/18529

   # Part 2: Materialized View Query Rewrite
   
   > **Depends on:** _<link to Part 1 PR>_, which provides the metadata model, consistency manager, analyzer, and Minion task framework. This PR does not compile or run without it.
   
   ## Summary
   
   This PR adds the broker-side query rewrite half of Apache Pinot's Materialized View (MV) feature for the Single-Stage Query Engine (SSE). Once an MV has been built and is kept fresh by the Part 1 framework, the broker can **transparently rewrite** a user's query against the base table into an equivalent query against the MV, with no change to the user's SQL.
   
   **Key capabilities:**
   - Transparent query rewrite at the broker layer, with no SQL changes required from users
   - Per-MV rewrite flag (`rewriteEnabled`, default `true`) and per-MV staleness SLO (`stalenessThresholdMs`), both stored on the MV definition (this PR extends Part 1's `MaterializedViewDefinitionMetadata`)
   - Pluggable subsumption strategies for exact match, projection subset, and aggregation rollup
   - Hybrid execution mode that splits a query across the MV (historical data, `ts < watermarkMs`) and the base table (real-time data, `ts >= watermarkMs`) and merges the results automatically
   - Master broker switch (`pinot.broker.query.enable.materialized.view.rewrite`, `false` by default), so upgrades are safe: behavior does not change until an operator opts in
   - New `materializedViewQueried` field on `BrokerResponseNative` that shows which MV served each query
   
   ## Design Doc
   
https://docs.google.com/document/d/1ToJfN42IMNySEY8YODb99Beis9YpLa8A8OWLcPcvG0M/edit?usp=sharing
   
   ## Architecture (rewrite path)
   
   The diagram in the Companion PR (Part 1) shows the full end-to-end system. 
This PR delivers the broker-side pieces: `MaterializedViewQueryRewriteEngine`, 
`MaterializedViewMetadataCache`, the eligibility gate, the strategy chain, and 
the FULL / SPLIT execution-mode branching.
   
   ## Key Components
   
   | Component | Module | Description |
   |---|---|---|
   | `MaterializedViewHandler` (interface) + `DefaultMaterializedViewHandler` | `pinot-materialized-view` | Pluggable broker-side rewrite SPI, loaded reflectively by `loadHandler(...)` |
   | `MaterializedViewQueryRewriteEngine` | `pinot-materialized-view` | Runs the eligibility gate (`rewriteEnabled`, `watermarkMs > 0`, staleness SLO), walks the subsumption strategies, and picks the lowest-cost plan |
   | `MaterializedViewMetadataCache` | `pinot-materialized-view` | Broker-side reverse index from raw base-table name to MV cache entries. It pre-computes `viewProjectionMap` and the flattened AND-filter conjuncts, and uses two ZooKeeper listeners: one on the definition path, which changes infrequently, and one on the runtime path, which is updated on every task run |
   | `ExactSubsumptionStrategy` | `pinot-materialized-view` | Matches queries identical to the MV definition (cost ≈ 1) |
   | `ScanSubsumptionStrategy` | `pinot-materialized-view` | Matches scan queries whose projection is a subset of the MV's (cost ≈ 2) |
   | `AggregationSubsumptionStrategy` | `pinot-materialized-view` | Matches aggregation queries by re-aggregating MV columns (cost ≈ 3) |
   | `AggregationEquivalenceRegistry` | `pinot-materialized-view` | Maps base-table aggregations (SUM, MIN, MAX, COUNT, HLL/HLL+/Theta) to their MV-side re-aggregations. This PR also extends the Part 1 analyzer to reject MVs whose aggregations are not registered here |
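
The engine's strategy chain can be sketched as follows. It tries every strategy against every MV registered for the base table and keeps the cheapest match. This is a minimal illustration under stated assumptions, not the PR's code. `RewritePlan`, `SubsumptionStrategySketch`, and `StrategyChainSketch` are hypothetical names, and the query is reduced to a SQL string.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical plan produced by a strategy: the MV to query, the rewritten SQL,
// and a relative cost (lower is better), mirroring the cost column above.
record RewritePlan(String mvTable, String rewrittenSql, int cost) {}

// Hypothetical strategy contract; the PR's real strategy interface may differ.
interface SubsumptionStrategySketch {
  Optional<RewritePlan> tryMatch(String baseSql, String mvTable);
}

final class StrategyChainSketch {
  private StrategyChainSketch() {}

  // Tries every (candidate MV, strategy) pair and keeps the cheapest plan.
  // An empty result means the query is served from the base table unchanged.
  static Optional<RewritePlan> pickCheapest(String baseSql, List<String> candidateMvs,
      List<SubsumptionStrategySketch> strategies) {
    return candidateMvs.stream()
        .flatMap(mv -> strategies.stream().map(s -> s.tryMatch(baseSql, mv)))
        .flatMap(Optional::stream)
        .min(Comparator.comparingInt(RewritePlan::cost));
  }
}
```

A strategy returns a plan only when it matches, so adding a strategy to the list leaves the selection logic unchanged.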
   
   ## Configuration Reference
   
   ### Broker config
   
   | Key | Default | Description |
   |---|---|---|
   | `pinot.broker.query.enable.materialized.view.rewrite` | `false` | Master switch for MV rewrite. Required to enable broker-side rewrites; default-off makes upgrades safe. |
   | `pinot.broker.materialized.view.handler.class` | `DefaultMaterializedViewHandler` | Pluggable handler class loaded via `Class.forName`. |
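
Below is a minimal sketch of how a broker could read these two keys and load the handler reflectively. The key strings come from the table above. `MvHandlerSketch`, `DefaultMvHandlerSketch`, and `BrokerMvConfigSketch` are assumptions for illustration, as is the plain `Map<String, String>` standing in for the broker's real configuration object.

```java
import java.util.Map;

// Hypothetical stand-in for the broker-side handler SPI.
interface MvHandlerSketch {
  String name();
}

// Hypothetical default implementation; needs a public no-arg constructor for reflection.
final class DefaultMvHandlerSketch implements MvHandlerSketch {
  public DefaultMvHandlerSketch() {}

  @Override
  public String name() {
    return "default";
  }
}

final class BrokerMvConfigSketch {
  // Key names are taken from the table above.
  static final String ENABLE_KEY = "pinot.broker.query.enable.materialized.view.rewrite";
  static final String HANDLER_KEY = "pinot.broker.materialized.view.handler.class";

  private BrokerMvConfigSketch() {}

  // A missing key parses as false, so an upgraded broker keeps its old behavior.
  static boolean isRewriteEnabled(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault(ENABLE_KEY, "false"));
  }

  // Loads the configured handler class by name, falling back to the default.
  static MvHandlerSketch load(Map<String, String> conf) {
    String className = conf.get(HANDLER_KEY);
    if (className == null) {
      return new DefaultMvHandlerSketch();
    }
    try {
      Class<?> clazz = Class.forName(className);
      if (!MvHandlerSketch.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " does not implement MvHandlerSketch");
      }
      return (MvHandlerSketch) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate MV handler " + className, e);
    }
  }
}
```

Checking `isAssignableFrom` before instantiating gives a clear error for a misconfigured class name at load time, rather than a `ClassCastException` later.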
   
   ### Per-MV opt-in (no per-query option)
   
   The V2 design replaces the per-query `useMaterializedView` option with a per-MV flag on the MV's `materializedViewConfig`. This PR adds the following two fields to the existing `MaterializedViewDefinitionMetadata` from Part 1:
   
   | Field | Default | Description |
   |---|---|---|
   | `rewriteEnabled` | `true` | Operator kill switch: set to `false` to keep ingestion running while temporarily routing all queries to the base table (e.g. during MV migration). |
   | `stalenessThresholdMs` | `0` (no SLO) | The broker excludes the MV from rewrite when `now − watermarkMs > stalenessThresholdMs`. Set this to bound the maximum age of MV-served data. |
   
   ## Execution Modes
   
   - **`FULL_REWRITE`**: the entire query can be answered from the MV, so the broker routes it to the MV table only.
   - **`SPLIT_REWRITE`**: the query's time window crosses `watermarkMs`, so the broker issues two sub-queries in parallel:
     - MV side: `ts < watermarkMs`
     - Base side: `ts >= watermarkMs`
     - The broker merges the results (the MV branch of `BrokerReduceService`).
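
The SPLIT branch can be sketched as two complementary time predicates plus a merge of partial results. The sketch makes several assumptions:

- the query window is expressed as `[startMs, endMs)` in the same unit as `watermarkMs`;
- the time column is named `ts`;
- group-by SUM results are keyed by group.

The mode rule, including a `NO_REWRITE` outcome when the window starts at or after the watermark, is an assumption, and all names are hypothetical. None of this is taken from `BrokerReduceService`.

```java
import java.util.HashMap;
import java.util.Map;

enum RewriteModeSketch { FULL_REWRITE, SPLIT_REWRITE, NO_REWRITE }

final class SplitRewriteSketch {
  private SplitRewriteSketch() {}

  // Query window is [startMs, endMs). Assumed rule: FULL when the whole window is
  // below the watermark, base table only when it starts at or after the watermark.
  static RewriteModeSketch choose(long startMs, long endMs, long watermarkMs) {
    if (endMs <= watermarkMs) {
      return RewriteModeSketch.FULL_REWRITE;
    }
    if (startMs >= watermarkMs) {
      return RewriteModeSketch.NO_REWRITE;
    }
    return RewriteModeSketch.SPLIT_REWRITE;
  }

  // Extra conjuncts for the two sub-queries; together they cover every row exactly once.
  static String mvSidePredicate(String timeColumn, long watermarkMs) {
    return timeColumn + " < " + watermarkMs;
  }

  static String baseSidePredicate(String timeColumn, long watermarkMs) {
    return timeColumn + " >= " + watermarkMs;
  }

  // Merges per-group SUM (or COUNT) partials; because the sides are disjoint, adding is exact.
  static Map<String, Long> mergeSums(Map<String, Long> mvSide, Map<String, Long> baseSide) {
    Map<String, Long> merged = new HashMap<>(mvSide);
    baseSide.forEach((group, partial) -> merged.merge(group, partial, Long::sum));
    return merged;
  }
}
```

A merge like this should apply ORDER BY and LIMIT only after combining both sides. A per-side top-N can drop a group whose combined total would have qualified.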
   
   V1 uses a single split point (`watermarkMs`). Per-bucket N-way routing, which would use the full `partitions` map for fine-grained MV/base interleaving, is deferred to V2; the persistent state shape from Part 1 is already forward-compatible with it.
   
   ## Eligibility Gate
   
   For each query against a base table that has a registered MV, the broker 
evaluates in order:
   
   1. Master broker switch enabled
   2. MV's `rewriteEnabled = true`
   3. MV's `watermarkMs > 0` (at least one VALID partition exists)
   4. `now − watermarkMs <= stalenessThresholdMs` (only when `stalenessThresholdMs > 0`)
   5. Strategy chain finds a matching subsumption rule
   
   If any check fails, the query is served from the base table unchanged.
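
The ordering above can be expressed as a short-circuiting check that reports the first failure. This sketch assumes the strategy chain's outcome is passed in as a boolean. `MvGateInput`, `EligibilityGateSketch`, and the reason strings are hypothetical.

```java
import java.util.Optional;

// Hypothetical gate inputs; field names mirror the config described above.
record MvGateInput(boolean brokerSwitchEnabled, boolean rewriteEnabled, long watermarkMs,
    long stalenessThresholdMs, boolean strategyMatched) {}

final class EligibilityGateSketch {
  private EligibilityGateSketch() {}

  // Returns the first failing check in the documented order, or empty if the MV may serve the query.
  static Optional<String> firstFailure(MvGateInput in, long nowMs) {
    if (!in.brokerSwitchEnabled()) {
      return Optional.of("broker switch off");
    }
    if (!in.rewriteEnabled()) {
      return Optional.of("rewriteEnabled=false");
    }
    if (in.watermarkMs() <= 0) {
      return Optional.of("no VALID partition yet");
    }
    // A threshold of 0 means "no SLO", so the age check is skipped entirely.
    if (in.stalenessThresholdMs() > 0 && nowMs - in.watermarkMs() > in.stalenessThresholdMs()) {
      return Optional.of("stale");
    }
    if (!in.strategyMatched()) {
      return Optional.of("no subsumption match");
    }
    return Optional.empty();
  }
}
```

Returning a reason instead of a bare boolean would make it easy to log or count why a query fell back to the base table.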
   
   ## Quick Start
   
   ```bash
   bin/pinot-admin.sh QuickStart -type MATERIALIZED_VIEW
   ```
   
   This is the same quickstart as Part 1, with broker-side rewrite now active. The following query targets `airlineStats`, but the broker transparently rewrites it to `airlineStatsMv`:
   
   ```sql
   SELECT Carrier, SUM(ArrDelay) AS total_delay, COUNT(*) AS flights
   FROM airlineStats GROUP BY Carrier ORDER BY total_delay DESC LIMIT 10;
   ```
   
   The response includes `"materializedViewQueried": "airlineStatsMv_OFFLINE"`, which identifies the MV that served the query.
   
   ## Query Rewrite Examples
   
   ### 1. ExactSubsumption — scan projection
   
   Hits the scan MV. Query exactly matches the scan MV definition.
   
       SELECT DaysSinceEpoch, Carrier, Origin, Dest, DestCityName
       FROM airlineStats
   
   ### 2. ScanSubsumption — projection subset with residual filter
   
   Hits the scan MV. User projection ⊂ MV projection; `WHERE Dest = 'IAH'` 
becomes a residual filter on the MV.
   
       SELECT DaysSinceEpoch, Origin, Dest, DestCityName
       FROM airlineStats
       WHERE Dest = 'IAH'
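
One way to implement this check uses the cache's pre-computed flattened-AND conjuncts, in three steps:

1. Every projected query column must exist in the MV.
2. Every MV conjunct must also appear in the query; otherwise the MV has dropped rows the query needs.
3. The remaining query conjuncts become the residual filter.

The sketch matches conjuncts by exact normalized text, which is a simplification. `Conjunct` and `ScanSubsumptionSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical flattened-AND conjunct: the column it reads plus its normalized SQL text.
record Conjunct(String column, String sql) {}

final class ScanSubsumptionSketch {
  private ScanSubsumptionSketch() {}

  // Returns the residual conjuncts to apply on the MV, or empty if the MV cannot answer.
  static Optional<List<Conjunct>> residualFilter(Set<String> queryColumns, List<Conjunct> queryFilter,
      Set<String> mvColumns, List<Conjunct> mvFilter) {
    // 1. Every column the query projects must be stored in the MV.
    if (!mvColumns.containsAll(queryColumns)) {
      return Optional.empty();
    }
    // 2. Each MV conjunct must also be required by the query, or the MV lacks rows.
    if (!queryFilter.containsAll(mvFilter)) {
      return Optional.empty();
    }
    // 3. Conjuncts the MV did not apply become the residual; their columns must exist in the MV.
    List<Conjunct> residual = new ArrayList<>();
    for (Conjunct c : queryFilter) {
      if (mvFilter.contains(c)) {
        continue;
      }
      if (!mvColumns.contains(c.column())) {
        return Optional.empty();
      }
      residual.add(c);
    }
    return Optional.of(residual);
  }
}
```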
   
   ### 3. ExactSubsumption — aggregation query
   
   Hits the aggregation MV. Query exactly matches MV definition (same GROUP BY 
+ same aggregations).
   
       SELECT DaysSinceEpoch, Carrier, Origin, Dest, DestCityName,
              SUM(ArrDelayMinutes) AS ArrDelayMinutes_sum,
              SUM(Cancelled)       AS Cancelled_sum
       FROM airlineStats
       GROUP BY DaysSinceEpoch, Carrier, Origin, Dest, DestCityName
   
   ### 4. AggSubsumption — re-aggregation with fewer GROUP BY keys
   
   Hits the aggregation MV. User GROUP BY ⊂ MV GROUP BY (drops `DestCityName`); 
rewritten to `SUM(ArrDelayMinutes_sum)` over MV.
   
       SELECT DaysSinceEpoch, Carrier, Origin, Dest,
              SUM(ArrDelayMinutes) AS ArrDelayMinutes_sum
       FROM airlineStats
       GROUP BY DaysSinceEpoch, Origin, Carrier, Dest
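
The rollup in this example re-aggregates the MV's stored partial aggregates over a smaller set of keys. The sketch below generates the rewritten SQL. It assumes a mapping from each base aggregate (such as `SUM(ArrDelayMinutes)`) to the MV column that stores it. The rule table, `BaseAgg`, and `RollupRewriteSketch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical aggregation in the user's query, e.g. SUM(ArrDelayMinutes) AS ArrDelayMinutes_sum.
record BaseAgg(String function, String column, String alias) {}

final class RollupRewriteSketch {
  private RollupRewriteSketch() {}

  // Assumed re-aggregation rules: partial COUNTs roll up by summing them.
  private static final Map<String, String> REAGG =
      Map.of("SUM", "SUM", "MIN", "MIN", "MAX", "MAX", "COUNT", "SUM");

  // mvAggColumns maps "FUNCTION(column)" to the MV column that stores that aggregate.
  static Optional<String> rewrite(List<String> groupBy, List<BaseAgg> aggs, Set<String> mvGroupBy,
      Map<String, String> mvAggColumns, String mvTable) {
    // Rollup can drop MV grouping keys but never add new ones.
    if (!mvGroupBy.containsAll(groupBy)) {
      return Optional.empty();
    }
    List<String> selectItems = new ArrayList<>(groupBy);
    for (BaseAgg agg : aggs) {
      String reagg = REAGG.get(agg.function());
      String mvColumn = mvAggColumns.get(agg.function() + "(" + agg.column() + ")");
      if (reagg == null || mvColumn == null) {
        return Optional.empty(); // no rule, or the MV does not store this aggregate
      }
      selectItems.add(reagg + "(" + mvColumn + ") AS " + agg.alias());
    }
    String sql = "SELECT " + String.join(", ", selectItems) + " FROM " + mvTable;
    if (!groupBy.isEmpty()) {
      sql += " GROUP BY " + String.join(", ", groupBy);
    }
    return Optional.of(sql);
  }
}
```

Assume the MV's GROUP BY keys and its `ArrDelayMinutes_sum` column come from example 3, and use a placeholder MV table name `aggMv`. For the query above, `rewrite(...)` then yields `SELECT DaysSinceEpoch, Origin, Carrier, Dest, SUM(ArrDelayMinutes_sum) AS ArrDelayMinutes_sum FROM aggMv GROUP BY DaysSinceEpoch, Origin, Carrier, Dest`.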
   
   ### 5. AggSubsumption + SketchMergeEquivalence — HLL rewrite
   
   Hits the HLL MV. `DISTINCTCOUNTHLL(TailNum)` on the base table is rewritten to a sketch merge over the MV's `DISTINCTCOUNTRAWHLL` column `hll_tailnum`.
   
       SELECT Origin, Dest,
              DISTINCTCOUNTHLL(TailNum) AS hll_tailnum
       FROM airlineStats
       GROUP BY Origin, Dest
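
Why does the MV store raw sketches rather than final distinct counts? Consider rolling up per-day rows for a single Origin/Dest pair. In this sketch, exact `Set`s stand in for HLL sketches. A real HLL merge instead takes the register-wise maximum and yields an approximate count. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SketchMergeSketch {
  private SketchMergeSketch() {}

  // Wrong: adding per-group distinct counts double-counts values seen in several groups.
  static int sumOfDistinctCounts(List<Set<String>> perGroup) {
    int total = 0;
    for (Set<String> s : perGroup) {
      total += s.size();
    }
    return total;
  }

  // Right: merge the per-group summaries first, then count. The set union stands in
  // for an HLL merge, which likewise combines summaries rather than final numbers.
  static int distinctCountOfUnion(List<Set<String>> perGroup) {
    Set<String> union = new HashSet<>();
    perGroup.forEach(union::addAll);
    return union.size();
  }
}
```

With two days whose tail numbers are `{N1, N2}` and `{N2, N3}`, summing the counts gives 4, while merging first gives the correct 3.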
   
   ## Common Errors
   
   | Error message (excerpt) | When | How to fix |
   |---|---|---|
   | `aggregation '<X>' for which no re-aggregation rule is registered` | At MV create time: the MV uses an aggregation that the broker would have no way to re-aggregate at query time | Use SUM/MIN/MAX/COUNT or one of the registered sketch families (HLL, HLL+, Theta). AVG, PERCENTILE, etc. are not supported. |
   
   (For ingestion-time errors such as `bucketTimePeriod`, LIMIT, OFFSET, nested SELECT, or a mutable source, see the companion Part 1 PR.)
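
The create-time check can be sketched as a lookup against the set of aggregations that have a re-aggregation rule. The set's contents, including the raw-sketch function names, are assumptions, and `AggregationEligibilitySketch` is a hypothetical name. The message text reuses the excerpt from the table.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class AggregationEligibilitySketch {
  private AggregationEligibilitySketch() {}

  // Assumed contents; the PR's AggregationEquivalenceRegistry defines the real set.
  private static final Set<String> REGISTERED = Set.of("SUM", "MIN", "MAX", "COUNT",
      "DISTINCTCOUNTRAWHLL", "DISTINCTCOUNTRAWHLLPLUS", "DISTINCTCOUNTRAWTHETASKETCH");

  // Fails fast at MV creation if any aggregation could not be re-aggregated at query time.
  static void validate(List<String> mvAggregations) {
    for (String fn : mvAggregations) {
      if (!REGISTERED.contains(fn.toUpperCase(Locale.ROOT))) {
        throw new IllegalArgumentException(
            "aggregation '" + fn + "' for which no re-aggregation rule is registered");
      }
    }
  }
}
```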
   
   ## Operational Notes
   
   - **Upgrade order (full feature):** controller → minion (Part 1) → broker 
(this PR). The broker-side rewrite defaults OFF 
(`pinot.broker.query.enable.materialized.view.rewrite=false`); existing 
clusters see no behavior change after upgrade until an operator opts in.
   - **Wire-format additions** (backward-compatible — old readers ignore 
unknown fields):
     - New fields on `MaterializedViewDefinitionMetadata`: `rewriteEnabled`, 
`stalenessThresholdMs`.
     - New `BrokerResponseNative` field `materializedViewQueried`, annotated with Jackson `@JsonInclude(NON_NULL)` so that it is omitted from the response when no MV served the query.
   
   ## Testing
   
   - **Unit tests**: subsumption strategies (`ExactSubsumptionStrategyTest`, 
`ScanSubsumptionStrategyTest`, `AggregationSubsumptionStrategyTest`), query 
rewrite engine (`MaterializedViewQueryRewriteEngineTest`), broker metadata 
cache (`MaterializedViewMetadataCacheTest`), broker reduce merge for SPLIT 
(`BrokerReduceServiceTest`), aggregation equivalence registry 
(`AggregationEquivalenceRegistryTest`), MV-analyzer extension (aggregation 
eligibility cases added to `MaterializedViewAnalyzerTest`).
   - **Integration test**: rewrite half of 
`MaterializedViewClusterIntegrationTest` — end-to-end coverage of query rewrite 
(FULL and SPLIT modes), `rewriteEnabled` kill switch, staleness SLO eviction, 
and the `materializedViewQueried` response field.
   
   ## Modules Affected
   
   - `pinot-broker` — Broker factories for MV-aware single-stage / gRPC handlers
   - `pinot-common` — `BrokerResponseNative.materializedViewQueried` field
   - `pinot-core` — Broker reduce branch for MV-rewritten queries
   - `pinot-materialized-view` — Query rewrite engine, handler SPI, subsumption 
strategies, aggregation equivalence registry; extends Part 1's 
`MaterializedViewDefinitionMetadata` with `rewriteEnabled` / 
`stalenessThresholdMs`; extends `MaterializedViewAnalyzer` with the 
aggregation-eligibility check
   - `pinot-spi` — Config-key constants 
(`pinot.broker.query.enable.materialized.view.rewrite`, 
`pinot.broker.materialized.view.handler.*`)
   
   ## Limitations and V2 Deferrals
   
   - Applies only to the SSE; rewrite for the Multi-Stage Engine (MSE) is out of scope.
   - N-way per-bucket broker routing, which would use the full `partitions` map for fine-grained MV/base interleaving instead of the single `watermarkMs` split point, is deferred to V2 until V1 has production telemetry. The persistent state shape is already forward-compatible.
   - AVG, PERCENTILE, and other non-distributive aggregations are not supported. The analyzer rejects MVs that use them; only SUM/MIN/MAX/COUNT and HLL/HLL+/Theta sketches have re-aggregation rules.
   - There is no per-query opt-in or override; the feature is governed entirely by the broker master switch and the per-MV `rewriteEnabled` flag.

