alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112252563


##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.

Review Comment:
   as a follow on PR, it would be great to avoid this Vec<Scalar> and instead 
use a `Vec<ArrayRef>` which is what the eventual APIs need and are much more 
efficient than ScalarCalue



##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<Vec<ScalarValue>>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in 
[`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_schema`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Self {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![vec![]; partition_schema.fields().len()];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_values_by_column[i].push(value);
+            }
+        }
+        Self {
+            partition_values: partition_values_by_column,
+            num_containers,
+            partition_schema,
+        }
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        match ScalarValue::iter_to_array(partition_values.iter().cloned()) {

Review Comment:
   It is always sad to me that this API requires a clone of ScalarValue 😢  
(nothing you did in this PR)



##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<Vec<ScalarValue>>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in 
[`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_schema`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Self {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![vec![]; partition_schema.fields().len()];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_values_by_column[i].push(value);
+            }
+        }
+        Self {
+            partition_values: partition_values_by_column,
+            num_containers,
+            partition_schema,
+        }
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        match ScalarValue::iter_to_array(partition_values.iter().cloned()) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        let array = BooleanArray::from(
+            partition_values
+                .iter()
+                .map(|pv| Some(values.contains(pv)))
+                .collect::<Vec<_>>(),
+        );
+        Some(array)
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).
+#[derive(Clone)]
+pub struct PrunableStatistics {
+    /// Statistics for each container.
+    /// These are taken as a reference since they may be rather large / 
expensive to clone
+    /// and we often won't return all of them as ArrayRefs (we only return the 
columns the predicate requests).
+    statistics: Vec<Arc<Statistics>>,
+    /// The schema of the file these statistics are for.
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition 
of files).
+    /// The `schema` is the schema of the data in the containers and should 
apply to all files.
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+
+    fn get_exact_column_statistics(
+        &self,
+        column: &Column,
+        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
+    ) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut has_value = false;
+        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
+            s.column_statistics
+                .get(index)
+                .and_then(|stat| {
+                    if let Precision::Exact(min) = get_stat(stat) {
+                        has_value = true;
+                        Some(min.clone())
+                    } else {
+                        None
+                    }
+                })
+                .unwrap_or(ScalarValue::Null)
+        })) {
+            // If there is any non-null value and no errors, return the array
+            Ok(array) => has_value.then_some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.get_exact_column_statistics(column, |stat| &stat.min_value)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.get_exact_column_statistics(column, |stat| &stat.max_value)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.statistics.len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        if self.statistics.iter().any(|s| {
+            s.column_statistics
+                .get(index)
+                .is_some_and(|stat| 
stat.null_count.is_exact().unwrap_or(false))
+        }) {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        s.column_statistics.get(index).and_then(|stat| {
+                            if let Precision::Exact(null_count) = 
&stat.null_count {
+                                u64::try_from(*null_count).ok()
+                            } else {
+                                None
+                            }
+                        })
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        // If the column does not exist in the schema, return None
+        if self.schema.index_of(column.name()).is_err() {
+            return None;
+        }
+        if self
+            .statistics
+            .iter()
+            .any(|s| s.num_rows.is_exact().unwrap_or(false))
+        {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        if let Precision::Exact(row_count) = &s.num_rows {
+                            u64::try_from(*row_count).ok()
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn contained(
+        &self,
+        _column: &Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
+}
+
+/// Combine multiple [`PruningStatistics`] into a single
+/// [`CompositePruningStatistics`].
+/// This can be used to combine statistics from different sources,
+/// for example partition values and file statistics.
+/// This allows pruning with filters that depend on multiple sources of 
statistics,
+/// such as `WHERE partition_col = data_col`.
+/// This is done by iterating over the statistics and returning the first
+/// one that has information for the requested column.
+/// If multiple statistics have information for the same column,
+/// the first one is returned without any regard for completeness or accuracy.
+/// That is: if the first statistics has information for a column, even if it 
is incomplete,
+/// that is returned even if a later statistics has more complete information.
+pub struct CompositePruningStatistics {
+    pub statistics: Vec<Box<dyn PruningStatistics>>,
+}
+
+impl CompositePruningStatistics {
+    /// Create a new instance of [`CompositePruningStatistics`] from
+    /// a vector of [`PruningStatistics`].
+    pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
+        assert!(!statistics.is_empty());
+        // Check that all statistics have the same number of containers
+        let num_containers = statistics[0].num_containers();
+        for stats in &statistics {
+            assert_eq!(num_containers, stats.num_containers());
+        }
+        Self { statistics }
+    }
+}
+
+impl PruningStatistics for CompositePruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        for stats in &self.statistics {
+            if let Some(array) = stats.min_values(column) {
+                return Some(array);
+            }
+        }
+        None
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        for stats in &self.statistics {
+            if let Some(array) = stats.max_values(column) {
+                return Some(array);
+            }
+        }
+        None
+    }
+
+    fn num_containers(&self) -> usize {
+        self.statistics[0].num_containers()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        for stats in &self.statistics {
+            if let Some(array) = stats.null_counts(column) {
+                return Some(array);
+            }
+        }
+        None
+    }
+
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        for stats in &self.statistics {
+            if let Some(array) = stats.row_counts(column) {
+                return Some(array);
+            }
+        }
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        for stats in &self.statistics {
+            if let Some(array) = stats.contained(column, values) {
+                return Some(array);
+            }
+        }
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        cast::{as_int32_array, as_uint64_array},
+        ColumnStatistics,
+    };
+
+    use super::*;
+    use arrow::datatypes::{DataType, Field};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_partition_pruning_statistics() {
+        let partition_values = vec![
+            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))],
+            vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(4))],
+        ];
+        let partition_fields = vec![
+            Arc::new(Field::new("a", DataType::Int32, false)),
+            Arc::new(Field::new("b", DataType::Int32, false)),
+        ];
+        let partition_stats =
+            PartitionPruningStatistics::new(partition_values, 
partition_fields);
+
+        let column_a = Column::new_unqualified("a");
+        let column_b = Column::new_unqualified("b");
+
+        // Partition values don't know anything about nulls or row counts
+        assert!(partition_stats.null_counts(&column_a).is_none());
+        assert!(partition_stats.row_counts(&column_a).is_none());
+        assert!(partition_stats.null_counts(&column_b).is_none());
+        assert!(partition_stats.row_counts(&column_b).is_none());
+
+        // Min/max values are the same as the partition values
+        let min_values_a =
+            as_int32_array(&partition_stats.min_values(&column_a).unwrap())
+                .unwrap()
+                .into_iter()
+                .collect::<Vec<_>>();
+        let expected_values_a = vec![Some(1), Some(3)];
+        assert_eq!(min_values_a, expected_values_a);
+        let max_values_a =
+            as_int32_array(&partition_stats.max_values(&column_a).unwrap())
+                .unwrap()
+                .into_iter()
+                .collect::<Vec<_>>();
+        let expected_values_a = vec![Some(1), Some(3)];
+        assert_eq!(max_values_a, expected_values_a);
+
+        let min_values_b =
+            as_int32_array(&partition_stats.min_values(&column_b).unwrap())
+                .unwrap()
+                .into_iter()
+                .collect::<Vec<_>>();
+        let expected_values_b = vec![Some(2), Some(4)];
+        assert_eq!(min_values_b, expected_values_b);
+        let max_values_b =
+            as_int32_array(&partition_stats.max_values(&column_b).unwrap())
+                .unwrap()
+                .into_iter()
+                .collect::<Vec<_>>();
+        let expected_values_b = vec![Some(2), Some(4)];
+        assert_eq!(max_values_b, expected_values_b);
+
+        // Contained values are only true for the partition values
+        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let contained_a = partition_stats.contained(&column_a, 
&values).unwrap();
+        let expected_contained_a = BooleanArray::from(vec![true, false]);
+        assert_eq!(contained_a, expected_contained_a);
+        let contained_b = partition_stats.contained(&column_b, 
&values).unwrap();
+        let expected_contained_b = BooleanArray::from(vec![false, false]);
+        assert_eq!(contained_b, expected_contained_b);
+
+        // The number of containers is the length of the partition values
+        assert_eq!(partition_stats.num_containers(), 2);
+    }
+
+    #[test]
+    fn test_partition_pruning_statistics_empty() {
+        let partition_values = vec![];
+        let partition_fields = vec![
+            Arc::new(Field::new("a", DataType::Int32, false)),
+            Arc::new(Field::new("b", DataType::Int32, false)),
+        ];
+        let partition_stats =
+            PartitionPruningStatistics::new(partition_values, 
partition_fields);
+
+        let column_a = Column::new_unqualified("a");
+        let column_b = Column::new_unqualified("b");
+
+        // Partition values don't know anything about nulls or row counts
+        assert!(partition_stats.null_counts(&column_a).is_none());
+        assert!(partition_stats.row_counts(&column_a).is_none());
+        assert!(partition_stats.null_counts(&column_b).is_none());
+        assert!(partition_stats.row_counts(&column_b).is_none());
+
+        // Min/max values are all missing
+        assert!(partition_stats.min_values(&column_a).is_none());
+        assert!(partition_stats.max_values(&column_a).is_none());
+        assert!(partition_stats.min_values(&column_b).is_none());
+        assert!(partition_stats.max_values(&column_b).is_none());
+
+        // Contained values are all empty
+        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let contained_a = partition_stats.contained(&column_a, &values);
+        let expected_contained_a = 
BooleanArray::from(Vec::<Option<bool>>::new());
+        assert_eq!(contained_a, Some(expected_contained_a));
+    }
+
+    #[test]
+    fn test_statistics_pruning_statistics() {
+        let statistics = vec![
+            Arc::new(
+                Statistics::default()
+                    .add_column_statistics(
+                        ColumnStatistics::new_unknown()
+                            
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))

Review Comment:
   a very minor thing that might make this easier to read is to use the 
`ScalarValue::from`
   
   ```suggestion
                               
.with_min_value(Precision::Exact(ScalarValue::from(0i32))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to