This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dae99acc34 feat: selectivity metrics (for Explain Analyze) in Hash 
Join (#18488)
dae99acc34 is described below

commit dae99acc34c3cadfbe6f4a3238c7b3879d6c9f1a
Author: feniljain <[email protected]>
AuthorDate: Thu Nov 13 12:18:17 2025 +0530

    feat: selectivity metrics (for Explain Analyze) in Hash Join (#18488)
    
    ## Which issue does this PR close?
    
    - Closes #18409
    
    ## What changes are included in this PR?
    
    Added a distinct element calculator in core hash join loop. It also
    works on an assumption that indices will be returned in an increasing
    order, I couldn't see a place where this assumption is broken, but if
    that's not the case, please do help me out.
    
    Also, I am not 100% sure my implementation for `avg_fanout` is correct,
    so do let me know if that needs changes.
    
    ## Are these changes tested?
    
    No failures in `sqllogictests`/tests in `datafusion/core/tests/sql/`,
    should I add a test case for this?
---
 datafusion/core/tests/sql/explain_analyze.rs       | 21 ++++++++++
 .../physical-plan/src/joins/hash_join/stream.rs    | 47 +++++++++++++++++++++-
 datafusion/physical-plan/src/joins/utils.rs        | 18 ++++++++-
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 929de7a530..1a323d0749 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() {
         );
     }
 }
+
+#[tokio::test]
+async fn explain_analyze_hash_join() {
+    let sql = "EXPLAIN ANALYZE \
+            SELECT * \
+            FROM generate_series(10) as t1(a) \
+            JOIN generate_series(20) as t2(b) \
+            ON t1.a=t2.b";
+
+    for (level, needle, should_contain) in [
+        (ExplainAnalyzeLevel::Summary, "probe_hit_rate", true),
+        (ExplainAnalyzeLevel::Summary, "avg_fanout", true),
+    ] {
+        let plan = collect_plan(sql, level).await;
+        assert_eq!(
+            plan.contains(needle),
+            should_contain,
+            "plan for level {level:?} unexpected content: {plan}"
+        );
+    }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs 
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index d3d47a555a..e955843abd 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -42,7 +42,7 @@ use crate::{
     RecordBatchStream, SendableRecordBatchStream,
 };
 
-use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
+use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{
@@ -297,6 +297,35 @@ pub(super) fn lookup_join_hashmap(
     Ok((build_indices, probe_indices, next_offset))
 }
 
+/// Counts the number of distinct elements in the input array.
+///
+/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain 
no null values.
+#[inline]
+fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
+    if indices.is_empty() {
+        return 0;
+    }
+
+    debug_assert!(indices.null_count() == 0);
+
+    let values_buf = indices.values();
+    let values = values_buf.as_ref();
+    let mut iter = values.iter();
+    let Some(&first) = iter.next() else {
+        return 0;
+    };
+
+    let mut count = 1usize;
+    let mut last = first;
+    for &value in iter {
+        if value != last {
+            last = value;
+            count += 1;
+        }
+    }
+    count
+}
+
 impl HashJoinStream {
     #[allow(clippy::too_many_arguments)]
     pub(super) fn new(
@@ -480,6 +509,10 @@ impl HashJoinStream {
         let state = self.state.try_as_process_probe_batch_mut()?;
         let build_side = self.build_side.try_as_ready_mut()?;
 
+        self.join_metrics
+            .probe_hit_rate
+            .add_total(state.batch.num_rows());
+
         let timer = self.join_metrics.join_time.timer();
 
         // if the left side is empty, we can skip the (potentially expensive) 
join operation
@@ -509,6 +542,18 @@ impl HashJoinStream {
             state.offset,
         )?;
 
+        let distinct_right_indices_count = 
count_distinct_sorted_indices(&right_indices);
+
+        self.join_metrics
+            .probe_hit_rate
+            .add_part(distinct_right_indices_count);
+
+        self.join_metrics.avg_fanout.add_part(left_indices.len());
+
+        self.join_metrics
+            .avg_fanout
+            .add_total(distinct_right_indices_count);
+
         // apply join filter if exists
         let (left_indices, right_indices) = if let Some(filter) = &self.filter 
{
             apply_join_filter_to_indices(
diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index 2cd24d284b..f837423d2b 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -27,7 +27,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::joins::SharedBitmapBuilder;
-use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, 
MetricBuilder};
+use crate::metrics::{
+    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
+};
 use crate::projection::{ProjectionExec, ProjectionExpr};
 use crate::{
     ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, 
Statistics,
@@ -1328,6 +1330,10 @@ pub(crate) struct BuildProbeJoinMetrics {
     pub(crate) input_batches: metrics::Count,
     /// Number of rows consumed by probe-side this operator
     pub(crate) input_rows: metrics::Count,
+    /// Fraction of probe rows that found more than one match
+    pub(crate) probe_hit_rate: metrics::RatioMetrics,
+    /// Average number of build matches per matched probe row
+    pub(crate) avg_fanout: metrics::RatioMetrics,
 }
 
 // This Drop implementation updates the elapsed compute part of the metrics.
@@ -1371,6 +1377,14 @@ impl BuildProbeJoinMetrics {
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", 
partition);
 
+        let probe_hit_rate = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .ratio_metrics("probe_hit_rate", partition);
+
+        let avg_fanout = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .ratio_metrics("avg_fanout", partition);
+
         Self {
             build_time,
             build_input_batches,
@@ -1380,6 +1394,8 @@ impl BuildProbeJoinMetrics {
             input_batches,
             input_rows,
             baseline,
+            probe_hit_rate,
+            avg_fanout,
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to