LiaCastaneda commented on code in PR #17444:
URL: https://github.com/apache/datafusion/pull/17444#discussion_r2337060417


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,
+    /// Accumulator for tracking the maximum value across all batches
+    max: MaxAccumulator,
+}
+
+impl CollectLeftAccumulator {
+    /// Creates a new accumulator for tracking bounds of a join key expression.
+    ///
+    /// # Arguments
+    /// * `expr` - The physical expression to track bounds for
+    /// * `schema` - The schema of the input data
+    ///
+    /// # Returns
+    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
+    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
+        /// Recursively unwraps dictionary types to get the underlying value type.
+        fn dictionary_value_type(data_type: &DataType) -> DataType {
+            match data_type {
+                DataType::Dictionary(_, value_type) => {
+                    dictionary_value_type(value_type.as_ref())
+                }
+                _ => data_type.clone(),
             }
+        }
+
+        let data_type = expr
+            .data_type(schema)
+            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
+            .map(|dt| dictionary_value_type(&dt))?;
+        Ok(Self {
+            expr,
+            min: MinAccumulator::try_new(&data_type)?,
+            max: MaxAccumulator::try_new(&data_type)?,
+        })
+    }
 
-            // Use Arrow kernels for efficient min/max computation
-            let min_val = min_batch(array)?;
-            let max_val = max_batch(array)?;

Review Comment:
   I think the accumulator's `update_batch` also uses `min_batch`, which calls
   `min_max_batch_generic`, which was the expensive function, but I will try
   bringing this in and see whether it solves
   https://github.com/apache/datafusion/issues/17486
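
   For context, the code path in question looks roughly like the sketch below.
   This is illustrative only: the remaining added lines of the hunk are elided
   above, so these method bodies are my assumption of the shape of the code,
   not necessarily what the PR does. It assumes the `Accumulator` trait from
   `datafusion_expr` is in scope, and that `ColumnBounds` is the type seen in
   the removed code.

   ```rust
   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;
   use datafusion_expr::Accumulator; // brings update_batch/evaluate into scope

   impl CollectLeftAccumulator {
       /// Folds one build-side batch into the running min/max bounds.
       fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
           let values = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
           // Accumulator::update_batch is where min_batch/max_batch (and thus
           // min_max_batch_generic) end up being called.
           self.min.update_batch(std::slice::from_ref(&values))?;
           self.max.update_batch(std::slice::from_ref(&values))?;
           Ok(())
       }

       /// Finalizes the bounds once every build-side batch has been processed.
       fn evaluate(mut self) -> Result<ColumnBounds> {
           Ok(ColumnBounds::new(self.min.evaluate()?, self.max.evaluate()?))
       }
   }
   ```

   If `update_batch` here really does funnel into `min_max_batch_generic`, then
   profiling that path against the issue above should tell us quickly.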


