This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dae99acc34 feat: selectivity metrics (for Explain Analyze) in Hash
Join (#18488)
dae99acc34 is described below
commit dae99acc34c3cadfbe6f4a3238c7b3879d6c9f1a
Author: feniljain <[email protected]>
AuthorDate: Thu Nov 13 12:18:17 2025 +0530
feat: selectivity metrics (for Explain Analyze) in Hash Join (#18488)
## Which issue does this PR close?
- Closes #18409
## What changes are included in this PR?
Added a distinct-element counter to the core hash join loop. It assumes
that indices are returned in increasing order; I couldn't find a place
where this assumption is broken, but if there is one, please point it
out.
Also, I am not 100% sure my implementation for `avg_fanout` is correct,
so do let me know if that needs changes.
## Are these changes tested?
No failures in `sqllogictests`/tests in `datafusion/core/tests/sql/`,
should I add a test case for this?
---
datafusion/core/tests/sql/explain_analyze.rs | 21 ++++++++++
.../physical-plan/src/joins/hash_join/stream.rs | 47 +++++++++++++++++++++-
datafusion/physical-plan/src/joins/utils.rs | 18 ++++++++-
3 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index 929de7a530..1a323d0749 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() {
);
}
}
+
+/// Checks that `EXPLAIN ANALYZE` at the `Summary` level reports the
+/// `probe_hit_rate` and `avg_fanout` join metrics (assumes the equi-join
+/// below is planned as a hash join — TODO confirm if the planner changes).
+#[tokio::test]
+async fn explain_analyze_hash_join() {
+ let sql = "EXPLAIN ANALYZE \
+ SELECT * \
+ FROM generate_series(10) as t1(a) \
+ JOIN generate_series(20) as t2(b) \
+ ON t1.a=t2.b";
+
+ // Each case: (explain level, substring to search for in the rendered plan,
+ // whether that substring is expected to appear).
+ for (level, needle, should_contain) in [
+ (ExplainAnalyzeLevel::Summary, "probe_hit_rate", true),
+ (ExplainAnalyzeLevel::Summary, "avg_fanout", true),
+ ] {
+ let plan = collect_plan(sql, level).await;
+ assert_eq!(
+ plan.contains(needle),
+ should_contain,
+ "plan for level {level:?} unexpected content: {plan}"
+ );
+ }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index d3d47a555a..e955843abd 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -42,7 +42,7 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream,
};
-use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
+use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{
@@ -297,6 +297,35 @@ pub(super) fn lookup_join_hashmap(
Ok((build_indices, probe_indices, next_offset))
}
+/// Counts the number of distinct elements in the input array.
+///
+/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain
+/// no null values.
+///
+/// Only adjacent values are compared: the result is the number of runs of
+/// equal consecutive values. It equals the distinct count only when equal
+/// values are contiguous, which sorted input guarantees. Unsorted input such
+/// as `[1, 0, 1]` would be counted as 3.
+#[inline]
+fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
+ // Early exit for an empty array; the `let ... else` below would also
+ // return 0 in that case.
+ if indices.is_empty() {
+ return 0;
+ }
+
+ // Nulls are only rejected in debug builds. `values()` below reads the raw
+ // values buffer and does not consult the validity bitmap, so in release
+ // builds a null slot would be counted by whatever value it holds.
+ debug_assert!(indices.null_count() == 0);
+
+ let values_buf = indices.values();
+ let values = values_buf.as_ref();
+ let mut iter = values.iter();
+ let Some(&first) = iter.next() else {
+ return 0;
+ };
+
+ // The first element opens the first run; every change of value opens a
+ // new one.
+ let mut count = 1usize;
+ let mut last = first;
+ for &value in iter {
+ if value != last {
+ last = value;
+ count += 1;
+ }
+ }
+ count
+}
+
impl HashJoinStream {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
@@ -480,6 +509,10 @@ impl HashJoinStream {
let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;
+ self.join_metrics
+ .probe_hit_rate
+ .add_total(state.batch.num_rows());
+
let timer = self.join_metrics.join_time.timer();
// if the left side is empty, we can skip the (potentially expensive) join operation
@@ -509,6 +542,18 @@ impl HashJoinStream {
state.offset,
)?;
+ let distinct_right_indices_count =
count_distinct_sorted_indices(&right_indices);
+
+ self.join_metrics
+ .probe_hit_rate
+ .add_part(distinct_right_indices_count);
+
+ self.join_metrics.avg_fanout.add_part(left_indices.len());
+
+ self.join_metrics
+ .avg_fanout
+ .add_total(distinct_right_indices_count);
+
// apply join filter if exists
let (left_indices, right_indices) = if let Some(filter) = &self.filter
{
apply_join_filter_to_indices(
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index 2cd24d284b..f837423d2b 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -27,7 +27,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::joins::SharedBitmapBuilder;
-use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet,
MetricBuilder};
+use crate::metrics::{
+ self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
+};
use crate::projection::{ProjectionExec, ProjectionExpr};
use crate::{
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning,
Statistics,
@@ -1328,6 +1330,10 @@ pub(crate) struct BuildProbeJoinMetrics {
pub(crate) input_batches: metrics::Count,
/// Number of rows consumed by probe-side this operator
pub(crate) input_rows: metrics::Count,
+ /// Fraction of probe rows that found at least one match
+ pub(crate) probe_hit_rate: metrics::RatioMetrics,
+ /// Average number of build matches per matched probe row
+ pub(crate) avg_fanout: metrics::RatioMetrics,
}
// This Drop implementation updates the elapsed compute part of the metrics.
@@ -1371,6 +1377,14 @@ impl BuildProbeJoinMetrics {
let input_rows = MetricBuilder::new(metrics).counter("input_rows",
partition);
+ let probe_hit_rate = MetricBuilder::new(metrics)
+ .with_type(MetricType::SUMMARY)
+ .ratio_metrics("probe_hit_rate", partition);
+
+ let avg_fanout = MetricBuilder::new(metrics)
+ .with_type(MetricType::SUMMARY)
+ .ratio_metrics("avg_fanout", partition);
+
Self {
build_time,
build_input_batches,
@@ -1380,6 +1394,8 @@ impl BuildProbeJoinMetrics {
input_batches,
input_rows,
baseline,
+ probe_hit_rate,
+ avg_fanout,
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]