Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
xudong963 commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2115539220
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec<ArrayRef>,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec<Vec<ScalarValue>>,
+partition_fields: Vec<FieldRef>,
+) -> Result<Self> {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::<Result<Vec<_>, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet<ScalarValue>,
+) -> Option<BooleanArray> {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb merged PR #16139: URL: https://github.com/apache/datafusion/pull/16139 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2115649463
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec>,
+partition_fields: Vec,
+) -> Result {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet,
+) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2921985245 Thanks @adriangb and @xudong963 -- this looks good to me 🚀 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2115644544
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec>,
+partition_fields: Vec,
+) -> Result {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet,
+) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2114104284
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec>,
+partition_fields: Vec,
+) -> Result {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet,
+) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2114102421
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec>,
+partition_fields: Vec,
+) -> Result {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet,
+) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
xudong963 commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2113890230
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn try_new(
+partition_values: Vec>,
+partition_fields: Vec,
+) -> Result {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![
+Vec::with_capacity(partition_values.len());
+partition_schema.fields().len()
+];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Ok(Self {
+partition_values: partition_values_by_column
+.into_iter()
+.map(|v| {
+if v.is_empty() {
+Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+} else {
+ScalarValue::iter_to_array(v)
+}
+})
+.collect::, _>>()?,
+num_containers,
+partition_schema,
+})
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+self.partition_values.get(index).and_then(|v| {
+if v.is_empty() || v.null_count() == v.len() {
+// If the array is empty or all nulls, return None
+None
+} else {
+// Otherwise, return the array as is
+Some(Arc::clone(v))
+}
+})
+}
+
+fn max_values(&self, column: &Column) -> Option {
+self.min_values(column)
+}
+
+fn num_containers(&self) -> usize {
+self.num_containers
+}
+
+fn null_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn row_counts(&self, _column: &Column) -> Option {
+None
+}
+
+fn contained(
+&self,
+column: &Column,
+values: &HashSet,
+) -> Option {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let array = self.partition_values.get(index)?;
+let boolean_array = values.iter().try_fold(None, |acc, v| {
+let arrow_value = v.to_scalar().ok()?;
+let eq_result = arrow::compute::kernels::cmp::eq(array,
&arrow_value).ok()?;
+match acc {
+None => Some(Some(eq_result)),
+Some(acc_array) => {
+arrow::compute::kernels::boolean::and(&acc_array,
&eq_result)
+.map(Some)
+.ok()
+
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112613924
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
I went with
https://github.com/apache/datafusion/commit/0def286886df92f16e57e6cf091df32b9113cdf4.
I'll do another round to fix clippy and remove a collect()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112606619
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
I think `BooleanArray::from_iter_values(vec[true, false])` might do the trick
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112477825
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
I tried in
[23e3425](https://github.com/apache/datafusion/pull/16139/commits/23e342589b077b60cb5c05c367e77836d0a657fd)
but it's obviously wrong
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112474979
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet,
) -> Option;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
@alamb given an `array: Arc` and `values: HashSet`
what is the best way to do the comparison to get back a boolean array of length
`array` where each items is true if the value from the array is in `values` and
false if not? I'm struggling to find the right kernels, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2917083133 > I'm not able to request reviews. I think only commiters can do that and I'm not a commiter (yet). I think you will need to do the gitbox thing with your apache account (when it is activated) and then you will be an official committer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2916950224 > Sorry for late, I'll check tomorrow (feel free to directly invite me to review by the button, then I'll notice more) I'm not able to request reviews. I think only commiters can do that and I'm not a commiter (yet). Screenshot: https://github.com/user-attachments/assets/cce67653-ffa6-4980-bdad-aee88d2d86d1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
xudong963 commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2916919380 Sorry for late, I'll check tomorrow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112282032
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
I can try to push it here 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2112252563
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
Review Comment:
as a follow on PR, it would be great to avoid this `Vec<Vec<ScalarValue>>` and instead
use a `Vec<ArrayRef>`, which is what the eventual APIs need and is much more
efficient than `ScalarValue`
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec<Vec<ScalarValue>>,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn new(
+partition_values: Vec<Vec<ScalarValue>>,
+partition_fields: Vec<FieldRef>,
+) -> Self {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_values_by_column =
+vec![vec![]; partition_schema.fields().len()];
+for partition_value in partition_values {
+for (i, value) in partition_value.into_iter().enumerate() {
+partition_values_by_column[i].push(value);
+}
+}
+Self {
+partition_values: partition_values_by_column,
+num_containers,
+partition_schema,
+}
+}
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+let index = self.partition_schema.index_of(column.name()).ok()?;
+let partition_values = self.partition_values.get(index)?;
+match ScalarValue::iter_to_array(partition_values.iter().cloned()) {
Review Comment:
It is always sad to me that this API requires a clone of ScalarValue 😢
(nothing you did in this PR)
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1002 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2904518547 Thank you @kosiew that was great feedback 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
kosiew commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2103751109
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1032 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec<Vec<ScalarValue>>,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
Review Comment:
constructor accepts partition_values as a Vec, documented as “outer
vector represents the containers while the inner vector represents the
partition values for each column.” In code however, each inner Vec is treated
as the values for one container, then transpose that into column-major storage.
The phrasing “inner vector represents the partition values for each column”
can be read as “one column’s values across containers.”
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1032 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+/// Values for each column for each container.
+/// The outer vectors represent the columns while the inner
+/// vectors represent the containers.
+/// The order must match the order of the partition columns in
+/// [`PartitionPruningStatistics::partition_schema`].
+partition_values: Vec<Vec<ScalarValue>>,
+/// The number of containers.
+/// Stored since the partition values are column-major and if
+/// there are no columns we wouldn't know the number of containers.
+num_containers: usize,
+/// The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// it must only be the schema of the partition columns,
+/// in the same order as the values in
+/// [`PartitionPruningStatistics::partition_values`].
+partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+/// Create a new instance of [`PartitionPruningStatistics`].
+///
+/// Args:
+/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+/// The outer vector represents the containers while the inner
+/// vector represents the partition values for each column.
+/// Note that this is the **opposite** of the order of the
+/// partition columns in `PartitionPruningStatistics::partition_schema`.
+/// * `partition_schema`: The schema of the partition columns.
+/// This must **not** be the schema of the entire file or table:
+/// instead it must only be the schema of the partition columns,
+/// in the same order as the values in `partition_values`.
+pub fn new(
+partition_values: Vec<Vec<ScalarValue>>,
+partition_fields: Vec<FieldRef>,
+) -> Self {
+let num_containers = partition_values.len();
+let partition_schema = Arc::new(Schema::new(partition_fields));
+let mut partition_valeus_by_column =
Review Comment:
```suggestion
let mut partition_values_by_column =
```
##
datafusion/common/src/pruning.rs:
##
@@ -122,3 +126,1032 @@ pub trait PruningStatistics {
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
+
+///
Re: [PR] Add new stats pruning helpers to allow combining partition values in file level stats [datafusion]
adriangb commented on PR #16139: URL: https://github.com/apache/datafusion/pull/16139#issuecomment-2898676926 @xudong963 any chance you can review this since you've already approved the same code (with less tests!) in the original PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
