isidentical commented on code in PR #3868: URL: https://github.com/apache/arrow-datafusion/pull/3868#discussion_r1000039870
########## datafusion/physical-expr/src/expressions/binary.rs: ########## @@ -640,6 +640,155 @@ impl PhysicalExpr for BinaryExpr { self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type) .map(|a| ColumnarValue::Array(a)) } + + fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> { + Arc::new(BinaryExprStats { + op: self.op, + left: Arc::clone(self.left()), + right: Arc::clone(self.right()), + }) + } +} + +struct BinaryExprStats { + op: Operator, + left: Arc<dyn PhysicalExpr>, + right: Arc<dyn PhysicalExpr>, +} + +impl PhysicalExprStats for BinaryExprStats { + fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries> { + match &self.op { + Operator::Eq + | Operator::Gt + | Operator::Lt + | Operator::LtEq + | Operator::GtEq => { + let l_bounds = self.left.expr_stats().boundaries(columns)?; + let r_bounds = self.right.expr_stats().boundaries(columns)?; + match (l_bounds.reduce(), r_bounds.reduce()) { + (_, Some(r)) => compare_left_boundaries(&self.op, &l_bounds, r), + (Some(scalar_value), _) => { + compare_left_boundaries(&self.op.swap()?, &r_bounds, scalar_value) + } + _ => None, + } + } + _ => None, + } + } +} + +// Compute the general selectivity of a comparison predicate (>, >=, <, <=) between +// two expressions (one of which must have a single value). Returns new statistics +// for the variadic expression. +// +// The variadic boundaries represent the lhs side, and the scalar value represents +// the rhs side. +fn compare_left_boundaries( + op: &Operator, + variadic_bounds: &ExprBoundaries, + scalar_value: ScalarValue, +) -> Option<ExprBoundaries> { + let variadic_min = variadic_bounds.min_value.clone(); + let variadic_max = variadic_bounds.max_value.clone(); + + // Faulty statistics, give up now (because the code below assumes this is + // not the case for min/max). 
+ if variadic_min > variadic_max { + return None; + } + + // Direct selectivity is applicable when we can determine that this comparison will + // always be true or false (e.g. `x > 10` where the `x`'s min value is 11 or `a < 5` + // where the `a`'s max value is 4) (with the assumption that min/max are correct). + let (always_selects, never_selects) = match op { + Operator::Lt => (scalar_value > variadic_max, scalar_value <= variadic_min), + Operator::LtEq => (scalar_value >= variadic_max, scalar_value < variadic_min), + Operator::Gt => (scalar_value < variadic_min, scalar_value >= variadic_max), + Operator::GtEq => (scalar_value <= variadic_min, scalar_value > variadic_max), + Operator::Eq => ( + // Since min/max can be artificial (e.g. the min or max value of a column + // might be just a guess), we can't assume variadic_min == literal_value + // would always select. + false, + scalar_value < variadic_min || scalar_value > variadic_max, + ), + _ => unreachable!(), + }; + + // Both can not be true at the same time. + assert!(!(always_selects && never_selects)); + + let selectivity = match (always_selects, never_selects) { + (true, _) => Some(1.0), + (_, true) => Some(0.0), + (false, false) => { + // If there is a partial overlap, then we can estimate the selectivity + // by computing the ratio of the existing overlap to the total range. Since we + // currently don't have access to a value distribution histogram, the part below + // assumes a uniform distribution by default. + + // Our [min, max] is inclusive, so we need to add 1 to the difference. + let total_range = variadic_max.distance(&variadic_min)? + 1; + let overlap_between_boundaries = match op { + Operator::Lt => scalar_value.distance(&variadic_min)?, + Operator::Gt => variadic_max.distance(&scalar_value)?, + Operator::LtEq => scalar_value.distance(&variadic_min)? + 1, + Operator::GtEq => variadic_max.distance(&scalar_value)? 
+ 1, + Operator::Eq => 1, + _ => unreachable!(), + }; + + Some(overlap_between_boundaries as f64 / total_range as f64) + } + }?; + + // The selectivity can't be greater than 1.0. + assert!(selectivity <= 1.0); + let distinct_count = variadic_bounds + .distinct_count + .map(|distinct_count| (distinct_count as f64 * selectivity).round() as usize); + + // Now, we know what the upper/lower bound is for this column after the + // predicate is applied. + let (new_min, new_max) = match op { + // TODO: for lt/gt, we technically should shrink the possibility space + // by one since a < 5 means that 5 is not a possible value for `a`. However, + // it is currently tricky to do so (e.g. for floats, we can get away with 4.999 + // so we need a smarter logic to find out what is the closest value that is + // different from the scalar_value). + Operator::Lt | Operator::LtEq => { + // We only want to update the upper bound when we know it will help us (e.g. + // it is actually smaller than what we have right now) and it is a valid + // value (e.g. [0, 100] < -100 would update the boundaries to [0, -100] if + // there weren't the selectivity check). + if scalar_value < variadic_max && selectivity > 0.0 { + (variadic_min, scalar_value) + } else { + (variadic_min, variadic_max) + } + } + Operator::Gt | Operator::GtEq => { + // Same as above, but this time we want to limit the lower bound. + if scalar_value > variadic_min && selectivity > 0.0 { + (scalar_value, variadic_max) + } else { + (variadic_min, variadic_max) + } + } + // For equality, we don't have the range problem so even if the selectivity + // is 0.0, we can still update the boundaries. + Operator::Eq => (scalar_value.clone(), scalar_value), + _ => unreachable!(), + }; + + Some(ExprBoundaries { + min_value: new_min, Review Comment: Part of the discussion below and #3898. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org