alamb commented on code in PR #21815:
URL: https://github.com/apache/datafusion/pull/21815#discussion_r3227677884
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -563,6 +569,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
Ok(Arc::new(Statistics::new_unknown(&self.schema())))
}
+ /// Returns statistics for a specific partition of this `ExecutionPlan` node.
Review Comment:
If we are going to add (yet another) statistics API, I suggest we add one
that will be easier to extend in the future -- specifically, one that can
accept new parameters without a major API change.
For example, what do you think about something like:
```rust
trait ExecutionPlan {
    // ...
    fn statistics_with_args(
        &self,
        args: StatisticsArgs<'_>,
    ) -> Result<Arc<Statistics>>; // ...
    // ...
}

/// Arguments passed to [`ExecutionPlan::statistics_with_args`] call
struct StatisticsArgs<'a> {
    partition: Option<usize>,
    // A borrowed field needs an explicit lifetime on the struct.
    ctx: &'a StatisticsContext,
}
```
That way we can add new parameters without major downstream churn again.
This is similar to what we have done with other APIs like `call_with_args`
https://github.com/apache/datafusion/blob/bb1c8e658942e8b8c4bd6d7636dd9eb52c395d54/datafusion/catalog/src/table.rs#L526-L531
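To make the extensibility point concrete, here is a rough, self-contained sketch of the args-struct pattern. Everything in it is a stand-in and not DataFusion's actual API: `Statistics` is reduced to a row count, `Plan` is a simplified placeholder for `ExecutionPlan`, the error type is a plain `String`, and the builder method `with_partition` and the toy `HalvingFilter` operator are hypothetical names. The idea is that fields stay private and are set through builder methods, so a new field with a default should not require touching the trait signature or existing call sites.

```rust
use std::sync::Arc;

/// Stand-in for DataFusion's `Statistics`; only a row count is modeled.
#[derive(Debug, Clone, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// Stand-in for the PR's `StatisticsContext`.
#[derive(Debug, Default)]
pub struct StatisticsContext {
    pub child_stats: Vec<Arc<Statistics>>,
}

/// Hypothetical argument struct. Fields are private and set via builder
/// methods, so a field added later (with a default) does not break callers.
#[derive(Debug)]
pub struct StatisticsArgs<'a> {
    partition: Option<usize>,
    ctx: &'a StatisticsContext,
}

impl<'a> StatisticsArgs<'a> {
    /// Defaults to overall (`None`) statistics.
    pub fn new(ctx: &'a StatisticsContext) -> Self {
        Self { partition: None, ctx }
    }

    /// Hypothetical builder method selecting a specific partition.
    pub fn with_partition(mut self, partition: Option<usize>) -> Self {
        self.partition = partition;
        self
    }

    pub fn partition(&self) -> Option<usize> {
        self.partition
    }

    pub fn ctx(&self) -> &'a StatisticsContext {
        self.ctx
    }
}

/// Simplified placeholder for `ExecutionPlan`.
pub trait Plan {
    fn statistics_with_args(
        &self,
        args: StatisticsArgs<'_>,
    ) -> Result<Arc<Statistics>, String>;
}

/// Toy operator that halves the row count of its first child.
pub struct HalvingFilter;

impl Plan for HalvingFilter {
    fn statistics_with_args(
        &self,
        args: StatisticsArgs<'_>,
    ) -> Result<Arc<Statistics>, String> {
        let child = args
            .ctx()
            .child_stats
            .first()
            .ok_or("missing child statistics")?;
        Ok(Arc::new(Statistics {
            num_rows: child.num_rows.map(|n| n / 2),
        }))
    }
}
```

A caller would write something like `StatisticsArgs::new(&ctx).with_partition(Some(0))`; that call site should keep compiling if another defaulted field is added to the struct later.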
##########
datafusion/physical-plan/src/statistics_context.rs:
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Context for computing statistics in physical plans.
+//!
+//! [`StatisticsContext`] provides external context to
+//! [`ExecutionPlan::partition_statistics_with_context`], enabling operators
+//! to receive pre-computed child statistics and additional context for
+//! statistics computation.
+
+use crate::ExecutionPlan;
+use datafusion_common::Result;
+use datafusion_common::Statistics;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+
+/// Per-call memoization cache for [`compute_statistics`].
+///
+/// Keyed by `(plan node pointer address, partition)`. Created once per
+/// top-level [`compute_statistics`] call and shared across all recursive
+/// and operator-internal calls via [`StatisticsContext`].
+///
+/// The pointer-based key is safe within a single synchronous
+/// `compute_statistics` call: all `Arc<dyn ExecutionPlan>` nodes are held
+/// by the plan tree for the duration of the walk, so addresses cannot be
+/// reused.
+#[derive(Debug, Default)]
+struct StatsCache(HashMap<(usize, Option<usize>), Arc<Statistics>>);
+
+impl StatsCache {
+ fn get(
+ &self,
+ plan: &dyn ExecutionPlan,
+ partition: Option<usize>,
+ ) -> Option<&Arc<Statistics>> {
+ let key = (
+ plan as *const dyn ExecutionPlan as *const () as usize,
+ partition,
+ );
+ self.0.get(&key)
+ }
+
+ fn insert(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ partition: Option<usize>,
+ stats: Arc<Statistics>,
+ ) {
+ let key = (
+ plan as *const dyn ExecutionPlan as *const () as usize,
+ partition,
+ );
+ self.0.insert(key, stats);
+ }
+}
+
+/// Context passed to [`ExecutionPlan::partition_statistics_with_context`]
+/// carrying external information that operators can use when computing
+/// their statistics.
+#[derive(Debug)]
+pub struct StatisticsContext {
+ /// Pre-computed statistics for each child of the current node,
+ /// in the same order as [`ExecutionPlan::children`].
+ child_stats: Vec<Arc<Statistics>>,
Review Comment:
If we have a `StatsCache`, maybe we could always use that (rather than *also*
having a `Vec` of statistics) 🤔
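As a rough illustration of the cache-only shape, the sketch below drops the separate `Vec` and routes every lookup through the shared cache. It is a toy under stated assumptions, not the PR's code: `Node` stands in for an `ExecutionPlan` node, a node's statistics are just its own row count plus its children's, and `statistics_for` and the `misses` counter are hypothetical names added for demonstration. The key is `(node address, partition)`, mirroring the `StatsCache` in the diff.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;

/// Stand-in for DataFusion's `Statistics`; only a row count is modeled.
#[derive(Debug, Clone, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// Stand-in for a plan node (not the real `ExecutionPlan` trait).
pub struct Node {
    pub rows: usize,
    pub children: Vec<Arc<Node>>,
}

/// Cache key: (node address, partition), as in the PR's `StatsCache`.
type Key = (usize, Option<usize>);

fn key(node: &Node, partition: Option<usize>) -> Key {
    (node as *const Node as usize, partition)
}

#[derive(Default)]
pub struct StatsCache {
    map: HashMap<Key, Arc<Statistics>>,
    /// Number of times statistics had to be computed (demo only).
    pub misses: usize,
}

/// Context that holds only the shared cache, with no `Vec` of child stats.
#[derive(Clone, Default)]
pub struct StatisticsContext {
    cache: Rc<RefCell<StatsCache>>,
}

impl StatisticsContext {
    /// Returns cached statistics for `node`, computing them on a miss.
    pub fn statistics_for(&self, node: &Node, partition: Option<usize>) -> Arc<Statistics> {
        let k = key(node, partition);
        // The `Ref` borrow ends with this statement, before any recursion.
        let cached = self.cache.borrow().map.get(&k).cloned();
        if let Some(hit) = cached {
            return hit;
        }
        let stats = Arc::new(compute(node, self));
        let mut cache = self.cache.borrow_mut();
        cache.misses += 1;
        cache.map.insert(k, Arc::clone(&stats));
        stats
    }

    pub fn misses(&self) -> usize {
        self.cache.borrow().misses
    }
}

/// Toy rule: a node's rows are its own rows plus the rows of its children.
fn compute(node: &Node, ctx: &StatisticsContext) -> Statistics {
    let child_rows: usize = node
        .children
        .iter()
        .map(|c| ctx.statistics_for(c, None).num_rows.unwrap_or(0))
        .sum();
    Statistics {
        num_rows: Some(node.rows + child_rows),
    }
}
```

With this shape an operator asks the context for whichever child and partition it needs, and a node reachable through two parents is computed once per walk. The pointer key carries the same caveat the diff's doc comment states: it is only sound while the plan tree keeps every node alive.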
##########
datafusion/physical-plan/src/statistics_context.rs:
##########
@@ -0,0 +1,235 @@
+/// Context passed to [`ExecutionPlan::partition_statistics_with_context`]
+/// carrying external information that operators can use when computing
+/// their statistics.
+#[derive(Debug)]
+pub struct StatisticsContext {
+ /// Pre-computed statistics for each child of the current node,
+ /// in the same order as [`ExecutionPlan::children`].
+ child_stats: Vec<Arc<Statistics>>,
+ /// Shared memoization cache for the current `compute_statistics` walk
+ cache: Option<Rc<RefCell<StatsCache>>>,
+}
+
+impl StatisticsContext {
+ /// Creates a new context with pre-computed child statistics.
+ pub fn new(child_stats: Vec<Arc<Statistics>>) -> Self {
+ Self {
+ child_stats,
+ cache: None,
+ }
+ }
+
+ /// Creates an empty context (for leaf nodes or when child stats
+ /// are not available).
+ pub fn empty() -> Self {
+ Self {
+ child_stats: Vec::new(),
+ cache: None,
+ }
+ }
+
+ /// Returns the pre-computed overall (`None`) statistics for each child node.
+ /// For per-partition stats, use [`Self::compute_child_statistics`].
+ pub fn child_stats(&self) -> &[Arc<Statistics>] {
+ &self.child_stats
+ }
+
+ /// Computes statistics for a child plan, using the shared cache
+ /// from the current [`compute_statistics`] walk.
+ ///
+ /// Use this when [`Self::child_stats`] does not provide the right
+ /// granularity: partition-preserving operators needing per-partition
+ /// child stats (via `Some(partition)`), or partition-merging operators
+ /// needing overall stats (via `None`).
+ pub fn compute_child_statistics(
+ &self,
+ plan: &dyn ExecutionPlan,
+ partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
+ match &self.cache {
+ Some(cache) => compute_statistics_inner(plan, partition, cache),
+ None => compute_statistics(plan, partition),
+ }
+ }
+}
+
+impl Clone for StatisticsContext {
+ fn clone(&self) -> Self {
+ Self {
+ child_stats: self.child_stats.clone(),
+ cache: self.cache.clone(),
+ }
+ }
+}
+
+impl Default for StatisticsContext {
+ fn default() -> Self {
+ Self::empty()
+ }
+}
+
+/// Computes statistics for a plan node by first recursively computing
+/// overall (`None`) statistics for all children, then calling
+/// [`ExecutionPlan::partition_statistics_with_context`] with the pre-computed
+/// child statistics.
+///
+/// Results are memoized within a single call: operators that internally
Review Comment:
I think putting this caching mechanism into `StatisticsArgs` could
potentially make the API cleaner and more encapsulated
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -576,9 +577,17 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
- fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
- let input_stats =
- Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
+ fn partition_statistics_with_context(
+ &self,
+ partition: Option<usize>,
+ ctx: &StatisticsContext,
+ ) -> Result<Arc<Statistics>> {
+ let input_stats = match partition {
+ Some(_) => Arc::unwrap_or_clone(
+ ctx.compute_child_statistics(self.input.as_ref(), partition)?,
+ ),
+ None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0])),
+ };
Review Comment:
If we added a `StatisticsArgs` structure as I proposed above, we could
perhaps have this as a method on `StatisticsArgs`
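A sketch of what such a method might look like, so that operators stop repeating the `match partition` block. All names here are hypothetical and the types are stand-ins: `Statistics` is reduced to a row count, errors are plain `String`s, the `per_partition` callback plays the role of `compute_child_statistics`, and `filter_statistics` is a toy filter with a fixed selectivity rather than `FilterExec`'s real estimation.

```rust
use std::sync::Arc;

/// Stand-in for DataFusion's `Statistics`; only a row count is modeled.
#[derive(Debug, Clone, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// Hypothetical args struct that hides how child statistics are obtained.
pub struct StatisticsArgs<'a> {
    partition: Option<usize>,
    /// Overall (`None`) statistics per child, like the PR's `child_stats`.
    overall_child_stats: &'a [Arc<Statistics>],
    /// Hypothetical callback `(child index, partition)` standing in for
    /// the PR's `compute_child_statistics`.
    per_partition: &'a dyn Fn(usize, usize) -> Result<Arc<Statistics>, String>,
}

impl<'a> StatisticsArgs<'a> {
    pub fn new(
        partition: Option<usize>,
        overall_child_stats: &'a [Arc<Statistics>],
        per_partition: &'a dyn Fn(usize, usize) -> Result<Arc<Statistics>, String>,
    ) -> Self {
        Self {
            partition,
            overall_child_stats,
            per_partition,
        }
    }

    /// Statistics of child `idx` at the granularity this call asked for:
    /// per-partition when a partition is set, overall otherwise.
    pub fn child_statistics(&self, idx: usize) -> Result<Arc<Statistics>, String> {
        match self.partition {
            Some(p) => (self.per_partition)(idx, p),
            None => self
                .overall_child_stats
                .get(idx)
                .cloned()
                .ok_or_else(|| format!("no statistics for child {idx}")),
        }
    }
}

/// Toy filter: scales the input row count by a fixed selectivity.
pub fn filter_statistics(
    args: &StatisticsArgs<'_>,
    selectivity: f64,
) -> Result<Statistics, String> {
    // One call replaces the `match partition { .. }` block in the operator.
    let input = Arc::unwrap_or_clone(args.child_statistics(0)?);
    Ok(Statistics {
        num_rows: input.num_rows.map(|n| (n as f64 * selectivity) as usize),
    })
}
```

The operator body then reduces to a single `args.child_statistics(0)?` call, and the choice between pre-computed and per-partition statistics lives in one place.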