isidentical commented on code in PR #3868:
URL: https://github.com/apache/arrow-datafusion/pull/3868#discussion_r1000039870


##########
datafusion/physical-expr/src/expressions/binary.rs:
##########
@@ -640,6 +640,155 @@ impl PhysicalExpr for BinaryExpr {
         self.evaluate_with_resolved_args(left, &left_data_type, right, 
&right_data_type)
             .map(|a| ColumnarValue::Array(a))
     }
+
+    fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> {
+        Arc::new(BinaryExprStats {
+            op: self.op,
+            left: Arc::clone(self.left()),
+            right: Arc::clone(self.right()),
+        })
+    }
+}
+
+struct BinaryExprStats {
+    op: Operator,
+    left: Arc<dyn PhysicalExpr>,
+    right: Arc<dyn PhysicalExpr>,
+}
+
+impl PhysicalExprStats for BinaryExprStats {
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> 
Option<ExprBoundaries> {
+        match &self.op {
+            Operator::Eq
+            | Operator::Gt
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::GtEq => {
+                let l_bounds = self.left.expr_stats().boundaries(columns)?;
+                let r_bounds = self.right.expr_stats().boundaries(columns)?;
+                match (l_bounds.reduce(), r_bounds.reduce()) {
+                    (_, Some(r)) => compare_left_boundaries(&self.op, 
&l_bounds, r),
+                    (Some(scalar_value), _) => {
+                        compare_left_boundaries(&self.op.swap()?, &r_bounds, 
scalar_value)
+                    }
+                    _ => None,
+                }
+            }
+            _ => None,
+        }
+    }
+}
+
+// Compute the general selectivity of a comparison predicate (=, >, >=, <, <=) 
between
+// two expressions (one of which must have a single value). Returns new 
statistics
+// for the variadic expression.
+//
+// The variadic boundaries represent the lhs side, and the scalar value 
represents
+// the rhs side.
+fn compare_left_boundaries(
+    op: &Operator,
+    variadic_bounds: &ExprBoundaries,
+    scalar_value: ScalarValue,
+) -> Option<ExprBoundaries> {
+    let variadic_min = variadic_bounds.min_value.clone();
+    let variadic_max = variadic_bounds.max_value.clone();
+
+    // Faulty statistics, give up now (because the code below assumes this is
+    // not the case for min/max).
+    if variadic_min > variadic_max {
+        return None;
+    }
+
+    // Direct selectivity is applicable when we can determine that this 
comparison will
+    // always be true or false (e.g. `x > 10` where the `x`'s min value is 11 
or `a < 5`
+    // where the `a`'s max value is 4) (with the assumption that min/max are 
correct).
+    let (always_selects, never_selects) = match op {
+        Operator::Lt => (scalar_value > variadic_max, scalar_value <= 
variadic_min),
+        Operator::LtEq => (scalar_value >= variadic_max, scalar_value < 
variadic_min),
+        Operator::Gt => (scalar_value < variadic_min, scalar_value >= 
variadic_max),
+        Operator::GtEq => (scalar_value <= variadic_min, scalar_value > 
variadic_max),
+        Operator::Eq => (
+            // Since min/max can be artificial (e.g. the min or max value of a 
column
+            // might be just a guess), we can't assume variadic_min == 
literal_value
+            // would always select.
+            false,
+            scalar_value < variadic_min || scalar_value > variadic_max,
+        ),
+        _ => unreachable!(),
+    };
+
+    // Both can not be true at the same time.
+    assert!(!(always_selects && never_selects));
+
+    let selectivity = match (always_selects, never_selects) {
+        (true, _) => Some(1.0),
+        (_, true) => Some(0.0),
+        (false, false) => {
+            // If there is a partial overlap, then we can estimate the 
selectivity
+            // by computing the ratio of the existing overlap to the total 
range. Since we
+            // currently don't have access to a value distribution histogram, 
the part below
+            // assumes a uniform distribution by default.
+
+            // Our [min, max] is inclusive, so we need to add 1 to the 
difference.
+            let total_range = variadic_max.distance(&variadic_min)? + 1;
+            let overlap_between_boundaries = match op {
+                Operator::Lt => scalar_value.distance(&variadic_min)?,
+                Operator::Gt => variadic_max.distance(&scalar_value)?,
+                Operator::LtEq => scalar_value.distance(&variadic_min)? + 1,
+                Operator::GtEq => variadic_max.distance(&scalar_value)? + 1,
+                Operator::Eq => 1,
+                _ => unreachable!(),
+            };
+
+            Some(overlap_between_boundaries as f64 / total_range as f64)
+        }
+    }?;
+
+    // The selectivity can't be greater than 1.0.
+    assert!(selectivity <= 1.0);
+    let distinct_count = variadic_bounds
+        .distinct_count
+        .map(|distinct_count| (distinct_count as f64 * selectivity).round() as 
usize);
+
+    // Now we know what the upper/lower bound is for this column after the
+    // predicate is applied.
+    let (new_min, new_max) = match op {
+        // TODO: for lt/gt, we technically should shrink the possibility space
+        // by one since a < 5 means that 5 is not a possible value for `a`. 
However,
+        // it is currently tricky to do so (e.g. for floats, we can get away 
with 4.999
+        // so we need a smarter logic to find out what is the closest value 
that is
+        // different from the scalar_value).
+        Operator::Lt | Operator::LtEq => {
+            // We only want to update the upper bound when we know it will 
help us (e.g.
+            // it is actually smaller than what we have right now) and it is a 
valid
+            // value (e.g. [0, 100] < -100 would update the boundaries to [0, 
-100] if
+            // it weren't for the selectivity check).
+            if scalar_value < variadic_max && selectivity > 0.0 {
+                (variadic_min, scalar_value)
+            } else {
+                (variadic_min, variadic_max)
+            }
+        }
+        Operator::Gt | Operator::GtEq => {
+            // Same as above, but this time we want to limit the lower bound.
+            if scalar_value > variadic_min && selectivity > 0.0 {
+                (scalar_value, variadic_max)
+            } else {
+                (variadic_min, variadic_max)
+            }
+        }
+        // For equality, we don't have the range problem so even if the 
selectivity
+        // is 0.0, we can still update the boundaries.
+        Operator::Eq => (scalar_value.clone(), scalar_value),
+        _ => unreachable!(),
+    };
+
+    Some(ExprBoundaries {
+        min_value: new_min,

Review Comment:
   Part of the discussion below and #3898.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to