adriangb commented on code in PR #21996:
URL: https://github.com/apache/datafusion/pull/21996#discussion_r3178368227


##########
datafusion/optimizer/src/request_statistics.rs:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`RequestStatistics`] walks the optimized logical plan once and
+//! attaches a `Vec<StatisticsRequest>` to each `TableScan` describing
+//! what stats the surrounding plan shape would benefit from. The
+//! physical planner reads these and threads them into
+//! `ScanArgs::with_statistics_requests`, where the `TableProvider`
+//! decides what it can answer cheaply.
+//!
+//! This rule is meant to run **last** in the optimizer pipeline, after
+//! every other rule has finished rewriting — that way the requests
+//! reflect the plan shape the physical planner is actually going to
+//! plan, not an intermediate one that a later rule reshaped.
+//!
+//! The rule itself never changes plan structure; it only annotates
+//! `TableScan` nodes. Idempotent: running it twice yields the same
+//! result.
+
+use std::collections::{BTreeSet, HashMap, HashSet};
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, Result, TableReference};
+use datafusion_expr::LogicalPlan;
+use datafusion_expr_common::statistics::StatisticsRequest;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Optimizer rule that annotates each `TableScan` with the statistics
+/// the optimizer / physical planner / table provider could benefit
+/// from, derived from the surrounding plan shape.
+///
+/// Heuristics (one entry per relevant node):
+/// - `Sort`   → `Min` / `Max` / `NullCount` on each sort key.
+/// - `Filter` → `Min` / `Max` / `NullCount` / `DistinctCount` on every
+///   column referenced in the predicate.
+/// - `Join`   → `DistinctCount` / `NullCount` on join keys (both sides).
+/// - Always   → `RowCount` per scan.
+///
+/// Columns are attributed back to a source `TableScan` by walking each
+/// `TableScan`'s output schema; an unqualified column with a unique
+/// name in the plan resolves to its source. Ambiguous names (same
+/// column name in multiple TableScans) and renames through projections
+/// are accepted as a known POC limitation — the worst case is "we
+/// over-request" or "we miss a column", never incorrectness.
+#[derive(Default, Debug)]
+pub struct RequestStatistics;
+
+impl RequestStatistics {
+    /// Create a new `RequestStatistics` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for RequestStatistics {
+    fn name(&self) -> &str {
+        "request_statistics"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        // We need the whole plan to derive per-table requests, so we
+        // run our own walk inside `rewrite` instead of letting the
+        // framework descend.
+        None
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    /// Annotate every `TableScan` with the statistics requests derived
+    /// from the surrounding plan shape. Never changes plan structure;
+    /// running it twice yields the same result (idempotent).
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let requests = derive_requests(&plan);
+        if requests.is_empty() {
+            // Nothing to request anywhere: leave the plan untouched.
+            return Ok(Transformed::no(plan));
+        }
+
+        plan.transform_down(|node| {
+            if let LogicalPlan::TableScan(mut scan) = node {
+                let new_requests =
+                    requests.get(&scan.table_name).cloned().unwrap_or_default();
+                // Report `no` when the requests are already attached so
+                // the rule is a fixed point on its own output.
+                if new_requests == scan.statistics_requests {
+                    return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
+                }
+                scan.statistics_requests = new_requests;
+                Ok(Transformed::yes(LogicalPlan::TableScan(scan)))
+            } else {
+                Ok(Transformed::no(node))
+            }
+        })
+    }
+}
+
+/// Walk the plan and build a `(TableReference -> Vec<StatisticsRequest>)`
+/// map. The result is sorted/de-duplicated for stability.
+fn derive_requests(
+    plan: &LogicalPlan,
+) -> HashMap<TableReference, Vec<StatisticsRequest>> {
+    // Per-table accumulators. We use a BTreeSet so requests come out in
+    // a stable, deterministic order regardless of plan-walk order.
+    let mut acc: HashMap<TableReference, BTreeSet<RequestKey>> = HashMap::new();
+
+    // Map column-name -> originating TableReference (last writer wins
+    // when names collide; we accept that imprecision as a POC).

Review Comment:
   This needs to be sorted out — "last writer wins" on colliding column names means an ambiguous name silently resolves to an arbitrary TableScan, so requests can be attributed to the wrong table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to