This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 71996fba9b make AnalysisContext aware of empty sets to represent
certainly false bounds (#14279)
71996fba9b is described below
commit 71996fba9bdc797ba142dc41d6e830ae60d8e1ac
Author: Burak Şen <[email protected]>
AuthorDate: Tue Jan 28 17:38:13 2025 +0300
make AnalysisContext aware of empty sets to represent certainly false
bounds (#14279)
* ready for review
* fmt and lint
* Apply suggestions from code review
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
* apply reviews
* fix test
* Update analysis.rs
* Update analysis.rs
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
---
datafusion-examples/examples/expr_api.rs | 2 +-
datafusion/physical-expr/src/analysis.rs | 185 ++++++++++++++++++++++---------
datafusion/physical-plan/src/filter.rs | 25 +++--
3 files changed, 150 insertions(+), 62 deletions(-)
diff --git a/datafusion-examples/examples/expr_api.rs
b/datafusion-examples/examples/expr_api.rs
index 6bfde2ebbf..2908edbb75 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -270,7 +270,7 @@ fn range_analysis_demo() -> Result<()> {
// In this case, we can see that, as expected, `analyze` has figured out
// that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']`
let expected_range = Interval::try_new(september_1, october_1)?;
- assert_eq!(analysis_result.boundaries[0].interval, expected_range);
+ assert_eq!(analysis_result.boundaries[0].interval, Some(expected_range));
Ok(())
}
diff --git a/datafusion/physical-expr/src/analysis.rs
b/datafusion/physical-expr/src/analysis.rs
index b602a9cba4..ceec21c711 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -81,8 +81,12 @@ impl AnalysisContext {
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
- /// Minimum and maximum values this expression can have.
- pub interval: Interval,
+ /// Minimum and maximum values this expression can have. A `None` value
+ /// indicates that evaluating the given column results in an empty set.
+ /// For example, if the column `a` has values in the range [10, 20],
+ /// and there is a filter asserting that `a > 50`, then the resulting interval
+ /// range of `a` will be `None`.
+ pub interval: Option<Interval>,
/// Maximum number of distinct values this expression can produce, if known.
pub distinct_count: Precision<usize>,
}
@@ -118,7 +122,7 @@ impl ExprBoundaries {
let column = Column::new(field.name(), col_index);
Ok(ExprBoundaries {
column,
- interval,
+ interval: Some(interval),
distinct_count: col_stats.distinct_count,
})
}
@@ -133,7 +137,7 @@ impl ExprBoundaries {
.map(|(i, field)| {
Ok(Self {
column: Column::new(field.name(), i),
- interval: Interval::make_unbounded(field.data_type())?,
+ interval:
Some(Interval::make_unbounded(field.data_type())?),
distinct_count: Precision::Absent,
})
})
@@ -161,40 +165,71 @@ pub fn analyze(
context: AnalysisContext,
schema: &Schema,
) -> Result<AnalysisContext> {
- let target_boundaries = context.boundaries;
-
- let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
-
- let columns = collect_columns(expr)
- .into_iter()
- .map(|c| Arc::new(c) as _)
- .collect::<Vec<_>>();
-
- let target_expr_and_indices =
graph.gather_node_indices(columns.as_slice());
-
- let mut target_indices_and_boundaries = target_expr_and_indices
+ let initial_boundaries = &context.boundaries;
+ if initial_boundaries
.iter()
- .filter_map(|(expr, i)| {
- target_boundaries.iter().find_map(|bound| {
- expr.as_any()
- .downcast_ref::<Column>()
- .filter(|expr_column| bound.column.eq(*expr_column))
- .map(|_| (*i, bound.interval.clone()))
- })
- })
- .collect::<Vec<_>>();
-
- match graph
- .update_ranges(&mut target_indices_and_boundaries,
Interval::CERTAINLY_TRUE)?
+ .all(|bound| bound.interval.is_none())
{
- PropagationResult::Success => {
- shrink_boundaries(graph, target_boundaries,
target_expr_and_indices)
+ if initial_boundaries
+ .iter()
+ .any(|bound| bound.distinct_count != Precision::Exact(0))
+ {
+ return internal_err!(
+ "ExprBoundaries has a non-zero distinct count although it
represents an empty table"
+ );
+ }
+ if context.selectivity != Some(0.0) {
+ return internal_err!(
+ "AnalysisContext has a non-zero selectivity although it
represents an empty table"
+ );
}
- PropagationResult::Infeasible => {
- Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
+ Ok(context)
+ } else if initial_boundaries
+ .iter()
+ .any(|bound| bound.interval.is_none())
+ {
+ internal_err!(
+ "AnalysisContext is an inconsistent state. Some columns
represent empty table while others don't"
+ )
+ } else {
+ let mut target_boundaries = context.boundaries;
+ let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
+ let columns = collect_columns(expr)
+ .into_iter()
+ .map(|c| Arc::new(c) as _)
+ .collect::<Vec<_>>();
+
+ let mut target_indices_and_boundaries = vec![];
+ let target_expr_and_indices =
graph.gather_node_indices(columns.as_slice());
+
+ for (expr, index) in &target_expr_and_indices {
+ if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ if let Some(bound) =
+ target_boundaries.iter().find(|b| b.column == *column)
+ {
+ // Now, it's safe to unwrap
+ target_indices_and_boundaries
+ .push((*index,
bound.interval.as_ref().unwrap().clone()));
+ }
+ }
}
- PropagationResult::CannotPropagate => {
- Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
+
+ match graph
+ .update_ranges(&mut target_indices_and_boundaries,
Interval::CERTAINLY_TRUE)?
+ {
+ PropagationResult::Success => {
+ shrink_boundaries(graph, target_boundaries,
target_expr_and_indices)
+ }
+ PropagationResult::Infeasible => {
+ // If the propagation result is infeasible, set intervals to None
+ target_boundaries
+ .iter_mut()
+ .for_each(|bound| bound.interval = None);
+
Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
+ }
+ PropagationResult::CannotPropagate => {
+
Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
+ }
}
}
}
@@ -215,12 +250,12 @@ fn shrink_boundaries(
.iter_mut()
.find(|bound| bound.column.eq(column))
{
- bound.interval = graph.get_interval(*i);
+ bound.interval = Some(graph.get_interval(*i));
};
}
});
- let selectivity = calculate_selectivity(&target_boundaries,
&initial_boundaries);
+ let selectivity = calculate_selectivity(&target_boundaries,
&initial_boundaries)?;
if !(0.0..=1.0).contains(&selectivity) {
return internal_err!("Selectivity is out of limit: {}", selectivity);
@@ -235,16 +270,31 @@ fn shrink_boundaries(
fn calculate_selectivity(
target_boundaries: &[ExprBoundaries],
initial_boundaries: &[ExprBoundaries],
-) -> f64 {
+) -> Result<f64> {
// Since the intervals are assumed uniform and the values
// are not correlated, we need to multiply the selectivities
// of multiple columns to get the overall selectivity.
- initial_boundaries
- .iter()
- .zip(target_boundaries.iter())
- .fold(1.0, |acc, (initial, target)| {
- acc * cardinality_ratio(&initial.interval, &target.interval)
- })
+ if target_boundaries.len() != initial_boundaries.len() {
+ return Err(internal_datafusion_err!(
+ "The number of columns in the initial and target boundaries should
be the same"
+ ));
+ }
+ let mut acc: f64 = 1.0;
+ for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
+ match (initial.interval.as_ref(), target.interval.as_ref()) {
+ (Some(initial), Some(target)) => {
+ acc *= cardinality_ratio(initial, target);
+ }
+ (None, Some(_)) => {
+ return internal_err!(
+ "Initial boundary cannot be None while having a Some() target
boundary"
+ );
+ }
+ _ => return Ok(0.0),
+ }
+ }
+
+ Ok(acc)
}
#[cfg(test)]
@@ -313,16 +363,6 @@ mod tests {
Some(16),
Some(19),
),
- // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
- (
- col("a")
- .gt(lit(10))
- .and(col("a").lt(lit(20)))
- .and(col("a").gt(lit(20)))
- .and(col("a").lt(lit(30))),
- None,
- None,
- ),
];
for (expr, lower, upper) in test_cases {
let boundaries =
ExprBoundaries::try_new_unbounded(&schema).unwrap();
@@ -335,7 +375,9 @@ mod tests {
df_schema.as_ref(),
)
.unwrap();
- let actual = &analysis_result.boundaries[0].interval;
+ let Some(actual) = &analysis_result.boundaries[0].interval else {
+ panic!("The analysis result should contain non-empty intervals
for all columns");
+ };
let expected = Interval::make(lower, upper).unwrap();
assert_eq!(
&expected, actual,
@@ -344,6 +386,41 @@ mod tests {
}
}
+ #[test]
+ fn test_analyze_empty_set_boundary_exprs() {
+ let schema = Arc::new(Schema::new(vec![make_field("a",
DataType::Int32)]));
+
+ let test_cases: Vec<Expr> = vec![
+ // a > 10 AND a < 10
+ col("a").gt(lit(10)).and(col("a").lt(lit(10))),
+ // a > 5 AND (a < 20 OR a > 20)
+ // a > 10 AND a < 20
+ // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
+ col("a")
+ .gt(lit(10))
+ .and(col("a").lt(lit(20)))
+ .and(col("a").gt(lit(20)))
+ .and(col("a").lt(lit(30))),
+ ];
+
+ for expr in test_cases {
+ let boundaries =
ExprBoundaries::try_new_unbounded(&schema).unwrap();
+ let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
+ let physical_expr =
+ create_physical_expr(&expr, &df_schema,
&ExecutionProps::new()).unwrap();
+ let analysis_result = analyze(
+ &physical_expr,
+ AnalysisContext::new(boundaries),
+ df_schema.as_ref(),
+ )
+ .unwrap();
+
+ for boundary in analysis_result.boundaries {
+ assert!(boundary.interval.is_none());
+ }
+ }
+ }
+
#[test]
fn test_analyze_invalid_boundary_exprs() {
let schema = Arc::new(Schema::new(vec![make_field("a",
DataType::Int32)]));
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index ae4a15ba52..ec860b3a9f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{
- internal_err, plan_err, project_schema, DataFusionError, Result,
+ internal_err, plan_err, project_schema, DataFusionError, Result,
ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
@@ -457,6 +457,15 @@ fn collect_new_statistics(
..
},
)| {
+ let Some(interval) = interval else {
+ // If the interval is `None`, we can say that there are no rows:
+ return ColumnStatistics {
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::Null),
+ min_value: Precision::Exact(ScalarValue::Null),
+ distinct_count: Precision::Exact(0),
+ };
+ };
let (lower, upper) = interval.into_bounds();
let (min_value, max_value) = if lower.eq(&upper) {
(Precision::Exact(lower), Precision::Exact(upper))
@@ -1078,14 +1087,16 @@ mod tests {
statistics.column_statistics,
vec![
ColumnStatistics {
- min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
- max_value:
Precision::Inexact(ScalarValue::Int32(Some(100))),
- ..Default::default()
+ min_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Null),
+ distinct_count: Precision::Exact(0),
+ null_count: Precision::Exact(0),
},
ColumnStatistics {
- min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
- max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
- ..Default::default()
+ min_value: Precision::Exact(ScalarValue::Null),
+ max_value: Precision::Exact(ScalarValue::Null),
+ distinct_count: Precision::Exact(0),
+ null_count: Precision::Exact(0),
},
]
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]