adriangb commented on code in PR #18393:
URL: https://github.com/apache/datafusion/pull/18393#discussion_r2548058426


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1471,6 +1502,29 @@ async fn collect_left_input(
     // Convert Box to Arc for sharing with SharedBuildAccumulator
     let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
 
+    let membership = if num_rows == 0 {
+        PushdownStrategy::Empty
+    } else {
+        // If the build side is small enough we can use IN list pushdown.
+        // If it's too big we fall back to pushing down a reference to the 
hash table.
+        // See `PushdownStrategy` for more details.
+        let estimated_size = left_values
+            .iter()
+            .map(|arr| arr.get_array_memory_size())

Review Comment:
   Makes sense. But maybe it's still okay, since we would end up duplicating the 
values in the InListExpr? Either way, like you say, I think it's not a big deal; 
it's just a ballpark estimate...



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -333,81 +402,154 @@ impl SharedBuildAccumulator {
                 // CollectLeft: Simple conjunction of bounds and membership 
check
                 AccumulatedBuildData::CollectLeft { data } => {
                     if let Some(partition_data) = data {
+                        // Create membership predicate (InList for small build 
sides, hash lookup otherwise)
+                        let membership_expr = create_membership_predicate(
+                            &self.on_right,
+                            partition_data.pushdown.clone(),
+                            &HASH_JOIN_SEED,
+                            self.probe_schema.as_ref(),
+                        )?;
+
                         // Create bounds check expression (if bounds available)
-                        let Some(filter_expr) = create_bounds_predicate(
+                        let bounds_expr = create_bounds_predicate(
                             &self.on_right,
                             &partition_data.bounds,
-                        ) else {
-                            // No bounds available, nothing to update
-                            return Ok(());
+                        );
+
+                        // Combine membership and bounds expressions
+                        let filter_expr = match (membership_expr, bounds_expr) 
{
+                            (Some(membership), Some(bounds)) => {
+                                // Both available: combine with AND
+                                Arc::new(BinaryExpr::new(
+                                    bounds,
+                                    Operator::And,
+                                    membership,
+                                ))
+                                    as Arc<dyn PhysicalExpr>
+                            }
+                            (Some(membership), None) => membership,
+                            (None, Some(bounds)) => bounds,
+                            (None, None) => {
+                                // No filter available, nothing to update
+                                return Ok(());
+                            }
                         };
 
                         self.dynamic_filter.update(filter_expr)?;
                     }
                 }
                 // Partitioned: CASE expression routing to per-partition 
filters
                 AccumulatedBuildData::Partitioned { partitions } => {
-                    // Collect all partition data, skipping empty partitions
+                    // Collect all partition data (should all be Some at this 
point)
                     let partition_data: Vec<_> =
                         partitions.iter().filter_map(|p| p.as_ref()).collect();
 
-                    if partition_data.is_empty() {
-                        // All partitions are empty: no rows can match, skip 
the probe side entirely
-                        self.dynamic_filter.update(lit(false))?;
-                        return Ok(());
-                    }
+                    if !partition_data.is_empty() {
+                        // Build a CASE expression that combines range checks 
AND membership checks
+                        // CASE (hash_repartition(join_keys) % num_partitions)
+                        //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND 
...) AND membership_check_0
+                        //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND 
...) AND membership_check_1
+                        //   ...
+                        //   ELSE false
+                        // END
+
+                        let num_partitions = partition_data.len();
+
+                        // Create base expression: hash_repartition(join_keys) 
% num_partitions
+                        let routing_hash_expr = Arc::new(HashExpr::new(
+                            self.on_right.clone(),
+                            self.repartition_random_state.clone(),
+                            "hash_repartition".to_string(),
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+
+                        let modulo_expr = Arc::new(BinaryExpr::new(
+                            routing_hash_expr,
+                            Operator::Modulo,
+                            lit(ScalarValue::UInt64(Some(num_partitions as 
u64))),
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+
+                        // Create WHEN branches for each partition
+                        let when_then_branches: Vec<(
+                            Arc<dyn PhysicalExpr>,
+                            Arc<dyn PhysicalExpr>,
+                        )> = partitions
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(partition_id, partition_opt)| {
+                                partition_opt.as_ref().and_then(|partition| {
+                                    // Skip empty partitions - they would 
always return false anyway
+                                    match &partition.pushdown {
+                                        PushdownStrategy::Empty => None,
+                                        _ => Some((partition_id, partition)),
+                                    }
+                                })
+                            })
+                            .map(|(partition_id, partition)| -> Result<_> {
+                                // WHEN partition_id
+                                let when_expr =
+                                    lit(ScalarValue::UInt64(Some(partition_id 
as u64)));
+
+                                // THEN: Combine bounds check AND membership 
predicate
+
+                                // 1. Create membership predicate (InList for 
small build sides, hash lookup otherwise)
+                                let membership_expr = 
create_membership_predicate(
+                                    &self.on_right,
+                                    partition.pushdown.clone(),
+                                    &HASH_JOIN_SEED,
+                                    self.probe_schema.as_ref(),
+                                )?;
+
+                                // 2. Create bounds check expression for this 
partition (if bounds available)
+                                let bounds_expr = create_bounds_predicate(
+                                    &self.on_right,
+                                    &partition.bounds,
+                                );
+
+                                // 3. Combine membership and bounds expressions
+                                let then_expr = match (membership_expr, 
bounds_expr) {
+                                    (Some(membership), Some(bounds)) => {
+                                        // Both available: combine with AND
+                                        Arc::new(BinaryExpr::new(
+                                            bounds,
+                                            Operator::And,
+                                            membership,
+                                        ))
+                                            as Arc<dyn PhysicalExpr>
+                                    }
+                                    (Some(membership), None) => membership,
+                                    (None, Some(bounds)) => bounds,
+                                    (None, None) => {
+                                        // No filter for this partition - 
should not happen due to filter_map above
+                                        // but handle defensively by returning 
a "true" literal
+                                        lit(true)
+                                    }
+                                };
+
+                                Ok((when_expr, then_expr))
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+
+                        // Optimize for single partition: skip CASE expression 
entirely
+                        let filter_expr = if when_then_branches.is_empty() {
+                            // All partitions are empty: no rows can match
+                            lit(false)
+                        } else if when_then_branches.len() == 1 {
+                            // Single partition: just use the condition 
directly
+                            // since hash % 1 == 0 always, the WHEN 0 branch 
will always match
+                            Arc::clone(&when_then_branches[0].1)
+                        } else {

Review Comment:
   Okay, will add!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to