Copilot commented on code in PR #21666:
URL: https://github.com/apache/datafusion/pull/21666#discussion_r3094166706


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -358,229 +391,285 @@ impl SharedBuildAccumulator {
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed or mode 
mismatch
     pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> 
Result<()> {
-        // Store data in the accumulator
-        {
+        let finalize_input = {

Review Comment:
   The `report_build_data` doc comment still says "have reported (barrier 
wait)", but this method no longer uses `tokio::sync::Barrier` (it uses 
`Notify`/`CompletionState`). Please update the docs to describe the current 
synchronization mechanism so the comment doesn’t mislead future maintainers.



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -262,13 +262,40 @@ struct PartitionData {
 /// Build-side data organized by partition mode
 enum AccumulatedBuildData {
     Partitioned {
-        partitions: Vec<Option<PartitionData>>,
+        partitions: Vec<PartitionStatus>,
+        completed_partitions: usize,
     },
     CollectLeft {
-        data: Option<PartitionData>,
+        data: PartitionStatus,
+        reported_count: usize,
+        expected_reports: usize,
     },
 }
 
+enum CompletionState {
+    Pending,
+    Finalizing,
+    Ready(std::result::Result<(), String>),

Review Comment:
   `CompletionState::Ready` stores errors as `Result<(), String>`, which forces 
later callers to lose the original `DataFusionError` variant/backtrace/context. 
Consider storing `Result<(), Arc<DataFusionError>>` (or 
`datafusion_common::SharedResult<()>`) in `CompletionState` instead, so you can 
propagate `DataFusionError::Shared(...)` to all waiters without stringifying.
   ```suggestion
       Ready(std::result::Result<(), Arc<datafusion_common::DataFusionError>>),
   ```



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -358,229 +391,285 @@ impl SharedBuildAccumulator {
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed or mode 
mismatch
     pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> 
Result<()> {
-        // Store data in the accumulator
-        {
+        let finalize_input = {
             let mut guard = self.inner.lock();
+            self.store_build_data(&mut guard, data)?;
+            self.take_finalize_input_if_ready(&mut guard)
+        };
 
-            match (data, &mut *guard) {
-                // Partitioned mode
-                (
-                    PartitionBuildData::Partitioned {
-                        partition_id,
-                        pushdown,
-                        bounds,
-                    },
-                    AccumulatedBuildData::Partitioned { partitions },
-                ) => {
-                    partitions[partition_id] = Some(PartitionData { pushdown, 
bounds });
-                }
-                // CollectLeft mode (store once, deduplicate across partitions)
-                (
-                    PartitionBuildData::CollectLeft { pushdown, bounds },
-                    AccumulatedBuildData::CollectLeft { data },
-                ) => {
-                    // Deduplicate - all partitions report the same data in 
CollectLeft
-                    if data.is_none() {
-                        *data = Some(PartitionData { pushdown, bounds });
-                    }
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+
+        self.wait_for_completion().await
+    }
+
+    pub(crate) fn report_canceled_partition(&self, partition_id: usize) {
+        let finalize_input = {
+            let mut guard = self.inner.lock();
+            self.store_canceled_partition(&mut guard, partition_id);
+            self.take_finalize_input_if_ready(&mut guard)
+        };
+
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+    }
+
+    fn store_build_data(
+        &self,
+        guard: &mut AccumulatorState,
+        data: PartitionBuildData,
+    ) -> Result<()> {
+        match (data, &mut guard.data) {
+            (
+                PartitionBuildData::Partitioned {
+                    partition_id,
+                    pushdown,
+                    bounds,
+                },
+                AccumulatedBuildData::Partitioned {
+                    partitions,
+                    completed_partitions,
+                },
+            ) => {
+                if matches!(partitions[partition_id], 
PartitionStatus::Pending) {
+                    *completed_partitions += 1;
                 }
-                // Mismatched modes - should never happen
-                _ => {
-                    return datafusion_common::internal_err!(
-                        "Build data mode mismatch in report_build_data"
-                    );
+                partitions[partition_id] =
+                    PartitionStatus::Reported(PartitionData { pushdown, bounds 
});
+            }
+            (
+                PartitionBuildData::CollectLeft { pushdown, bounds },
+                AccumulatedBuildData::CollectLeft {
+                    data,
+                    reported_count,
+                    ..
+                },
+            ) => {
+                if matches!(data, PartitionStatus::Pending) {
+                    *data = PartitionStatus::Reported(PartitionData { 
pushdown, bounds });
                 }
+                *reported_count += 1;
+            }
+            _ => {
+                return datafusion_common::internal_err!(
+                    "Build data mode mismatch in report_build_data"
+                );
             }
         }
+        Ok(())
+    }
 
-        // Wait for all partitions to report
-        if self.barrier.wait().await.is_leader() {
-            // All partitions have reported, so we can create and update the 
filter
-            let inner = self.inner.lock();
-
-            match &*inner {
-                // CollectLeft: Simple conjunction of bounds and membership 
check
-                AccumulatedBuildData::CollectLeft { data } => {
-                    if let Some(partition_data) = data {
-                        // Create membership predicate (InList for small build 
sides, hash lookup otherwise)
-                        let membership_expr = create_membership_predicate(
-                            &self.on_right,
-                            partition_data.pushdown.clone(),
-                            &HASH_JOIN_SEED,
-                            self.probe_schema.as_ref(),
-                        )?;
-
-                        // Create bounds check expression (if bounds available)
-                        let bounds_expr = create_bounds_predicate(
-                            &self.on_right,
-                            &partition_data.bounds,
-                        );
-
-                        // Combine membership and bounds expressions for 
multi-layer optimization:
-                        // - Bounds (min/max): Enable statistics-based pruning 
(Parquet row group/file skipping)
-                        // - Membership (InList/hash lookup): Enables:
-                        //   * Precise filtering (exact value matching)
-                        //   * Bloom filter utilization (if present in Parquet 
files)
-                        //   * Better pruning for data types where min/max 
isn't effective (e.g., UUIDs)
-                        // Together, they provide complementary benefits and 
maximize data skipping.
-                        // Only update the filter if we have something to push 
down
-                        if let Some(filter_expr) = match (membership_expr, 
bounds_expr) {
-                            (Some(membership), Some(bounds)) => {
-                                // Both available: combine with AND
-                                Some(Arc::new(BinaryExpr::new(
-                                    bounds,
-                                    Operator::And,
-                                    membership,
-                                ))
-                                    as Arc<dyn PhysicalExpr>)
-                            }
-                            (Some(membership), None) => {
-                                // Membership available but no bounds
-                                // This is reachable when we have data but 
bounds aren't available
-                                // (e.g., unsupported data types or no columns 
with bounds)
-                                Some(membership)
-                            }
-                            (None, Some(bounds)) => {
-                                // Bounds available but no membership.
-                                // This should be unreachable in practice: we 
can always push down a reference
-                                // to the hash table.
-                                // But it seems safer to handle it defensively.
-                                Some(bounds)
-                            }
-                            (None, None) => {
-                                // No filter available (e.g., empty build side)
-                                // Don't update the filter, but continue to 
mark complete
-                                None
-                            }
-                        } {
-                            self.dynamic_filter.update(filter_expr)?;
-                        }
+    fn store_canceled_partition(
+        &self,
+        guard: &mut AccumulatorState,
+        partition_id: usize,
+    ) {
+        if let AccumulatedBuildData::Partitioned {
+            partitions,
+            completed_partitions,
+        } = &mut guard.data
+            && matches!(partitions[partition_id], PartitionStatus::Pending)
+        {
+            partitions[partition_id] = PartitionStatus::CanceledUnknown;
+            *completed_partitions += 1;
+        }
+    }
+
+    fn take_finalize_input_if_ready(
+        &self,
+        guard: &mut AccumulatorState,
+    ) -> Option<FinalizeInput> {
+        if !matches!(guard.completion, CompletionState::Pending) {
+            return None;
+        }
+
+        let finalize_input = match &guard.data {
+            AccumulatedBuildData::Partitioned {
+                partitions,
+                completed_partitions,
+            } if *completed_partitions == partitions.len() => {
+                Some(FinalizeInput::Partitioned(partitions.clone()))
+            }
+            AccumulatedBuildData::CollectLeft {
+                data,
+                reported_count,
+                expected_reports,
+            } if *reported_count == *expected_reports => {
+                Some(FinalizeInput::CollectLeft(data.clone()))
+            }
+            _ => None,
+        }?;
+
+        guard.completion = CompletionState::Finalizing;
+        Some(finalize_input)
+    }
+
+    fn finish(&self, finalize_input: FinalizeInput) {
+        let result = self
+            .build_filter(finalize_input)
+            .map_err(|err| err.to_string());
+        self.dynamic_filter.mark_complete();
+
+        let mut guard = self.inner.lock();
+        guard.completion = CompletionState::Ready(result);
+        drop(guard);
+        self.completion_notify.notify_waiters();
+    }
+
+    async fn wait_for_completion(&self) -> Result<()> {
+        loop {
+            let notified = {
+                let guard = self.inner.lock();
+                match &guard.completion {
+                    CompletionState::Ready(Ok(())) => return Ok(()),
+                    CompletionState::Ready(Err(err)) => {
+                        return Err(DataFusionError::Execution(err.clone()));
+                    }

Review Comment:
   `wait_for_completion` converts stored failures into 
`DataFusionError::Execution(err.clone())`, which changes the error category and 
can make debugging harder. If you keep an `Arc<DataFusionError>` in 
`CompletionState`, you can return `Err(DataFusionError::Shared(err_arc))` here 
and preserve the original error semantics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to