LiaCastaneda commented on code in PR #17444:
URL: https://github.com/apache/datafusion/pull/17444#discussion_r2337060417
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
}
}
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
- arrays
- .iter()
- .map(|array| {
- if array.is_empty() {
- // Return NULL values for empty arrays
- return Ok(ColumnBounds::new(
- ScalarValue::try_from(array.data_type())?,
- ScalarValue::try_from(array.data_type())?,
- ));
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+ /// The physical expression to evaluate for each batch
+ expr: Arc<dyn PhysicalExpr>,
+ /// Accumulator for tracking the minimum value across all batches
+ min: MinAccumulator,
+ /// Accumulator for tracking the maximum value across all batches
+ max: MaxAccumulator,
+}
+
+impl CollectLeftAccumulator {
+ /// Creates a new accumulator for tracking bounds of a join key expression.
+ ///
+ /// # Arguments
+ /// * `expr` - The physical expression to track bounds for
+ /// * `schema` - The schema of the input data
+ ///
+ /// # Returns
+/// A new `CollectLeftAccumulator` instance configured for the expression's data type
+ fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
+ /// Recursively unwraps dictionary types to get the underlying value type.
+ fn dictionary_value_type(data_type: &DataType) -> DataType {
+ match data_type {
+ DataType::Dictionary(_, value_type) => {
+ dictionary_value_type(value_type.as_ref())
+ }
+ _ => data_type.clone(),
}
+ }
+
+ let data_type = expr
+ .data_type(schema)
+ // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
+ .map(|dt| dictionary_value_type(&dt))?;
+ Ok(Self {
+ expr,
+ min: MinAccumulator::try_new(&data_type)?,
+ max: MaxAccumulator::try_new(&data_type)?,
+ })
+ }
- // Use Arrow kernels for efficient min/max computation
- let min_val = min_batch(array)?;
- let max_val = max_batch(array)?;
Review Comment:
I think the accumulator's `update_batch` also uses `min_batch`, which calls
`min_max_batch_generic` — the expensive function — but I will try this change
and see whether it solves
https://github.com/apache/datafusion/issues/17486
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]