This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d9b080882 [Minor] propagate distinct_count as inexact through unions 
(#20846)
8d9b080882 is described below

commit 8d9b080882179b618a2057e042fc32865f6484b4
Author: Burak Şen <[email protected]>
AuthorDate: Thu Mar 12 07:32:00 2026 +0300

    [Minor] propagate distinct_count as inexact through unions (#20846)
    
    ## Which issue does this PR close?
    Does not close an issue, but is part of #20766.
    
    ## Rationale for this change
    As @jonathanc-n describes, this is the formula Trino uses to estimate
    an inexact distinct count for a union:
    ```
    // for unioning A + B
    
    // calculate A overlap with B using min/max statistics
    overlap_a = percent of overlap that A has with B
    overlap_b = percent of overlap that B has with A
    
    new_distinct_count = max(overlap_a * NDV_a, overlap_b * NDV_b) // intersection
                       + (1 - overlap_a) * NDV_a                   // only in A
                       + (1 - overlap_b) * NDV_b                   // only in B
    ```
    
    ## What changes are included in this PR?
    Instead of setting `distinct_count` to `Absent` for unions, set it to an
    `Inexact` estimate derived from the inputs' distinct counts and the
    overlap of their min/max ranges.
    
    ## Are these changes tested?
    I've added unit tests
    
    ## Are there any user-facing changes?
    No
---
 datafusion/physical-plan/src/union.rs | 313 +++++++++++++++++++++++++++++++++-
 1 file changed, 311 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/union.rs 
b/datafusion/physical-plan/src/union.rs
index 384a715820..dafcd6ee40 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -854,7 +854,7 @@ fn col_stats_union(
     mut left: ColumnStatistics,
     right: &ColumnStatistics,
 ) -> ColumnStatistics {
-    left.distinct_count = Precision::Absent;
+    left.distinct_count = union_distinct_count(&left, right);
     left.min_value = left.min_value.min(&right.min_value);
     left.max_value = left.max_value.max(&right.max_value);
     left.sum_value = left.sum_value.add(&right.sum_value);
@@ -863,6 +863,123 @@ fn col_stats_union(
     left
 }
 
+fn union_distinct_count(
+    left: &ColumnStatistics,
+    right: &ColumnStatistics,
+) -> Precision<usize> {
+    let (ndv_left, ndv_right) = match (
+        left.distinct_count.get_value(),
+        right.distinct_count.get_value(),
+    ) {
+        (Some(&l), Some(&r)) => (l, r),
+        _ => return Precision::Absent,
+    };
+
+    // Even with exact inputs, the union NDV depends on how
+    // many distinct values are shared between the left and right.
+    // We can only estimate this via range overlap. Thus both paths
+    // below return `Inexact`.
+    if let Some(ndv) = estimate_ndv_with_overlap(left, right, ndv_left, 
ndv_right) {
+        return Precision::Inexact(ndv);
+    }
+
+    Precision::Inexact(ndv_left + ndv_right)
+}
+
+/// Estimates the distinct count for a union using range overlap,
+/// following the approach used by Trino:
+///
+/// Assumes values are distributed uniformly within each input's
+/// `[min, max]` range (the standard assumption when only summary
+/// statistics are available, classic for scalar-based statistics
+/// propagation). Under uniformity the fraction of an input's
+/// distinct values that land in a sub-range equals the fraction of
+/// the range that sub-range covers.
+///
+/// The combined value space is split into three disjoint regions:
+///
+/// ```text
+///   |-- only A --|-- overlap --|-- only B --|
+/// ```
+///
+/// * **Only in A/B** – values outside the other input's range
+///   contribute `(1 − overlap_a) · NDV_a` and `(1 − overlap_b) · NDV_b`.
+/// * **Overlap** – both inputs may produce values here. We take
+///   `max(overlap_a · NDV_a, overlap_b · NDV_b)` rather than the
+///   sum because values in the same sub-range are likely shared
+///   (the smaller set is assumed to be a subset of the larger).
+///   This is conservative: it avoids inflating the NDV estimate,
+///   which is safer for downstream join-order decisions.
+///
+/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
+/// from full overlap to no overlap. Boundary cases confirm this:
+/// disjoint ranges → `NDV_a + NDV_b`, identical ranges →
+/// `max(NDV_a, NDV_b)`.
+///
+/// ```text
+/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b)   [intersection]
+///     + (1 - overlap_a) * NDV_a                      [only in A]
+///     + (1 - overlap_b) * NDV_b                      [only in B]
+/// ```
+fn estimate_ndv_with_overlap(
+    left: &ColumnStatistics,
+    right: &ColumnStatistics,
+    ndv_left: usize,
+    ndv_right: usize,
+) -> Option<usize> {
+    let min_left = left.min_value.get_value()?;
+    let max_left = left.max_value.get_value()?;
+    let min_right = right.min_value.get_value()?;
+    let max_right = right.max_value.get_value()?;
+
+    let range_left = max_left.distance(min_left)?;
+    let range_right = max_right.distance(min_right)?;
+
+    // Constant columns (range == 0) can't use the proportional overlap
+    // formula below, so check interval overlap directly instead.
+    if range_left == 0 || range_right == 0 {
+        let overlaps = min_left <= max_right && min_right <= max_left;
+        return Some(if overlaps {
+            usize::max(ndv_left, ndv_right)
+        } else {
+            ndv_left + ndv_right
+        });
+    }
+
+    let overlap_min = if min_left >= min_right {
+        min_left
+    } else {
+        min_right
+    };
+    let overlap_max = if max_left <= max_right {
+        max_left
+    } else {
+        max_right
+    };
+
+    // Short-circuit: when there's no overlap the formula naturally
+    // degrades to ndv_left + ndv_right (overlap_range = 0 gives
+    // overlap_left = overlap_right = 0), but returning early avoids
+    // the floating-point math and a fallible `distance()` call.
+    if overlap_min > overlap_max {
+        return Some(ndv_left + ndv_right);
+    }
+
+    let overlap_range = overlap_max.distance(overlap_min)? as f64;
+
+    let overlap_left = overlap_range / range_left as f64;
+    let overlap_right = overlap_range / range_right as f64;
+
+    let intersection = f64::max(
+        overlap_left * ndv_left as f64,
+        overlap_right * ndv_right as f64,
+    );
+    let only_left = (1.0 - overlap_left) * ndv_left as f64;
+    let only_right = (1.0 - overlap_right) * ndv_right as f64;
+
+    Some((intersection + only_left + only_right).round() as usize)
+}
+
 fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
     let Statistics {
         num_rows: right_num_rows,
@@ -890,6 +1007,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::DataType;
     use datafusion_common::ScalarValue;
+    use datafusion_common::stats::Precision;
     use datafusion_physical_expr::equivalence::convert_to_orderings;
     use datafusion_physical_expr::expressions::col;
 
@@ -1014,7 +1132,7 @@ mod tests {
             total_byte_size: Precision::Exact(52),
             column_statistics: vec![
                 ColumnStatistics {
-                    distinct_count: Precision::Absent,
+                    distinct_count: Precision::Inexact(6),
                     max_value: Precision::Exact(ScalarValue::Int64(Some(34))),
                     min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
                     sum_value: Precision::Exact(ScalarValue::Int64(Some(84))),
@@ -1043,6 +1161,197 @@ mod tests {
         assert_eq!(result, expected);
     }
 
+    #[test]
+    fn test_union_distinct_count() {
+        // (left_ndv, left_min, left_max, right_ndv, right_min, right_max, 
expected)
+        type NdvTestCase = (
+            Precision<usize>,
+            Option<i64>,
+            Option<i64>,
+            Precision<usize>,
+            Option<i64>,
+            Option<i64>,
+            Precision<usize>,
+        );
+        let cases: Vec<NdvTestCase> = vec![
+            // disjoint ranges: NDV = 5 + 3
+            (
+                Precision::Exact(5),
+                Some(0),
+                Some(10),
+                Precision::Exact(3),
+                Some(20),
+                Some(30),
+                Precision::Inexact(8),
+            ),
+            // identical ranges: intersection = max(10, 8) = 10
+            (
+                Precision::Exact(10),
+                Some(0),
+                Some(100),
+                Precision::Exact(8),
+                Some(0),
+                Some(100),
+                Precision::Inexact(10),
+            ),
+            // partial overlap: 50 + 50 + 25 = 125
+            (
+                Precision::Exact(100),
+                Some(0),
+                Some(100),
+                Precision::Exact(50),
+                Some(50),
+                Some(150),
+                Precision::Inexact(125),
+            ),
+            // right contained in left: 50 + 50 + 0 = 100
+            (
+                Precision::Exact(100),
+                Some(0),
+                Some(100),
+                Precision::Exact(50),
+                Some(25),
+                Some(75),
+                Precision::Inexact(100),
+            ),
+            // both constant, same value
+            (
+                Precision::Exact(1),
+                Some(5),
+                Some(5),
+                Precision::Exact(1),
+                Some(5),
+                Some(5),
+                Precision::Inexact(1),
+            ),
+            // both constant, different values
+            (
+                Precision::Exact(1),
+                Some(5),
+                Some(5),
+                Precision::Exact(1),
+                Some(10),
+                Some(10),
+                Precision::Inexact(2),
+            ),
+            // left constant within right range
+            (
+                Precision::Exact(1),
+                Some(5),
+                Some(5),
+                Precision::Exact(10),
+                Some(0),
+                Some(10),
+                Precision::Inexact(10),
+            ),
+            // left constant outside right range
+            (
+                Precision::Exact(1),
+                Some(20),
+                Some(20),
+                Precision::Exact(10),
+                Some(0),
+                Some(10),
+                Precision::Inexact(11),
+            ),
+            // right constant within left range
+            (
+                Precision::Exact(10),
+                Some(0),
+                Some(10),
+                Precision::Exact(1),
+                Some(5),
+                Some(5),
+                Precision::Inexact(10),
+            ),
+            // right constant outside left range
+            (
+                Precision::Exact(10),
+                Some(0),
+                Some(10),
+                Precision::Exact(1),
+                Some(20),
+                Some(20),
+                Precision::Inexact(11),
+            ),
+            // missing min/max falls back to sum (exact + exact)
+            (
+                Precision::Exact(10),
+                None,
+                None,
+                Precision::Exact(5),
+                None,
+                None,
+                Precision::Inexact(15),
+            ),
+            // missing min/max falls back to sum (exact + inexact)
+            (
+                Precision::Exact(10),
+                None,
+                None,
+                Precision::Inexact(5),
+                None,
+                None,
+                Precision::Inexact(15),
+            ),
+            // missing min/max falls back to sum (inexact + inexact)
+            (
+                Precision::Inexact(7),
+                None,
+                None,
+                Precision::Inexact(3),
+                None,
+                None,
+                Precision::Inexact(10),
+            ),
+            // one side absent
+            (
+                Precision::Exact(10),
+                None,
+                None,
+                Precision::Absent,
+                None,
+                None,
+                Precision::Absent,
+            ),
+            // one side absent (inexact + absent)
+            (
+                Precision::Inexact(4),
+                None,
+                None,
+                Precision::Absent,
+                None,
+                None,
+                Precision::Absent,
+            ),
+        ];
+
+        for (
+            i,
+            (left_ndv, left_min, left_max, right_ndv, right_min, right_max, 
expected),
+        ) in cases.into_iter().enumerate()
+        {
+            let to_sv = |v| Precision::Exact(ScalarValue::Int64(Some(v)));
+            let left = ColumnStatistics {
+                distinct_count: left_ndv,
+                min_value: left_min.map(to_sv).unwrap_or(Precision::Absent),
+                max_value: left_max.map(to_sv).unwrap_or(Precision::Absent),
+                ..Default::default()
+            };
+            let right = ColumnStatistics {
+                distinct_count: right_ndv,
+                min_value: right_min.map(to_sv).unwrap_or(Precision::Absent),
+                max_value: right_max.map(to_sv).unwrap_or(Precision::Absent),
+                ..Default::default()
+            };
+            assert_eq!(
+                union_distinct_count(&left, &right),
+                expected,
+                "case {i} failed"
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_union_equivalence_properties() -> Result<()> {
         let schema = create_test_schema()?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to