gabotechs commented on code in PR #21666:
URL: https://github.com/apache/datafusion/pull/21666#discussion_r3107041470


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -358,229 +391,283 @@ impl SharedBuildAccumulator {
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch
     pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
-        // Store data in the accumulator
-        {
+        let finalize_input = {
             let mut guard = self.inner.lock();
+            self.store_build_data(&mut guard, data)?;
+            self.take_finalize_input_if_ready(&mut guard)
+        };
 
-            match (data, &mut *guard) {
-                // Partitioned mode
-                (
-                    PartitionBuildData::Partitioned {
-                        partition_id,
-                        pushdown,
-                        bounds,
-                    },
-                    AccumulatedBuildData::Partitioned { partitions },
-                ) => {
-                    partitions[partition_id] = Some(PartitionData { pushdown, bounds });
-                }
-                // CollectLeft mode (store once, deduplicate across partitions)
-                (
-                    PartitionBuildData::CollectLeft { pushdown, bounds },
-                    AccumulatedBuildData::CollectLeft { data },
-                ) => {
-                    // Deduplicate - all partitions report the same data in CollectLeft
-                    if data.is_none() {
-                        *data = Some(PartitionData { pushdown, bounds });
-                    }
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+
+        self.wait_for_completion().await
+    }
+
+    pub(crate) fn report_canceled_partition(&self, partition_id: usize) {
+        let finalize_input = {
+            let mut guard = self.inner.lock();
+            self.store_canceled_partition(&mut guard, partition_id);
+            self.take_finalize_input_if_ready(&mut guard)
+        };
+
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+    }
+
+    fn store_build_data(
+        &self,
+        guard: &mut AccumulatorState,
+        data: PartitionBuildData,
+    ) -> Result<()> {
+        match (data, &mut guard.data) {
+            (
+                PartitionBuildData::Partitioned {
+                    partition_id,
+                    pushdown,
+                    bounds,
+                },
+                AccumulatedBuildData::Partitioned {
+                    partitions,
+                    completed_partitions,
+                },
+            ) => {
+                if matches!(partitions[partition_id], PartitionStatus::Pending) {
+                    *completed_partitions += 1;
                 }
-                // Mismatched modes - should never happen
-                _ => {
-                    return datafusion_common::internal_err!(
-                        "Build data mode mismatch in report_build_data"
-                    );
+                partitions[partition_id] =
+                    PartitionStatus::Reported(PartitionData { pushdown, bounds });
+            }
+            (
+                PartitionBuildData::CollectLeft { pushdown, bounds },
+                AccumulatedBuildData::CollectLeft {
+                    data,
+                    reported_count,
+                    ..
+                },
+            ) => {
+                if matches!(data, PartitionStatus::Pending) {
+                    *data = PartitionStatus::Reported(PartitionData { pushdown, bounds });
                 }
+                *reported_count += 1;
+            }
+            _ => {
+                return datafusion_common::internal_err!(
+                    "Build data mode mismatch in report_build_data"
+                );
             }
         }
+        Ok(())
+    }
 
-        // Wait for all partitions to report
-        if self.barrier.wait().await.is_leader() {
-            // All partitions have reported, so we can create and update the filter
-            let inner = self.inner.lock();
-
-            match &*inner {
-                // CollectLeft: Simple conjunction of bounds and membership check
-                AccumulatedBuildData::CollectLeft { data } => {
-                    if let Some(partition_data) = data {
-                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
-                        let membership_expr = create_membership_predicate(
-                            &self.on_right,
-                            partition_data.pushdown.clone(),
-                            &HASH_JOIN_SEED,
-                            self.probe_schema.as_ref(),
-                        )?;
-
-                        // Create bounds check expression (if bounds available)
-                        let bounds_expr = create_bounds_predicate(
-                            &self.on_right,
-                            &partition_data.bounds,
-                        );
-
-                        // Combine membership and bounds expressions for multi-layer optimization:
-                        // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping)
-                        // - Membership (InList/hash lookup): Enables:
-                        //   * Precise filtering (exact value matching)
-                        //   * Bloom filter utilization (if present in Parquet files)
-                        //   * Better pruning for data types where min/max isn't effective (e.g., UUIDs)
-                        // Together, they provide complementary benefits and maximize data skipping.
-                        // Only update the filter if we have something to push down
-                        if let Some(filter_expr) = match (membership_expr, bounds_expr) {
-                            (Some(membership), Some(bounds)) => {
-                                // Both available: combine with AND
-                                Some(Arc::new(BinaryExpr::new(
-                                    bounds,
-                                    Operator::And,
-                                    membership,
-                                ))
-                                    as Arc<dyn PhysicalExpr>)
-                            }
-                            (Some(membership), None) => {
-                                // Membership available but no bounds
-                                // This is reachable when we have data but bounds aren't available
-                                // (e.g., unsupported data types or no columns with bounds)
-                                Some(membership)
-                            }
-                            (None, Some(bounds)) => {
-                                // Bounds available but no membership.
-                                // This should be unreachable in practice: we can always push down a reference
-                                // to the hash table.
-                                // But it seems safer to handle it defensively.
-                                Some(bounds)
-                            }
-                            (None, None) => {
-                                // No filter available (e.g., empty build side)
-                                // Don't update the filter, but continue to mark complete
-                                None
-                            }
-                        } {
-                            self.dynamic_filter.update(filter_expr)?;
-                        }
+    fn store_canceled_partition(
+        &self,
+        guard: &mut AccumulatorState,
+        partition_id: usize,
+    ) {
+        if let AccumulatedBuildData::Partitioned {
+            partitions,
+            completed_partitions,
+        } = &mut guard.data
+            && matches!(partitions[partition_id], PartitionStatus::Pending)
+        {
+            partitions[partition_id] = PartitionStatus::CanceledUnknown;
+            *completed_partitions += 1;
+        }
+    }
+
+    fn take_finalize_input_if_ready(
+        &self,
+        guard: &mut AccumulatorState,
+    ) -> Option<FinalizeInput> {
+        if !matches!(guard.completion, CompletionState::Pending) {
+            return None;
+        }
+
+        let finalize_input = match &guard.data {
+            AccumulatedBuildData::Partitioned {
+                partitions,
+                completed_partitions,
+            } if *completed_partitions == partitions.len() => {
+                Some(FinalizeInput::Partitioned(partitions.clone()))
+            }
+            AccumulatedBuildData::CollectLeft {
+                data,
+                reported_count,
+                expected_reports,
+            } if *reported_count == *expected_reports => {
+                Some(FinalizeInput::CollectLeft(data.clone()))
+            }
+            _ => None,
+        }?;
+
+        guard.completion = CompletionState::Finalizing;
+        Some(finalize_input)
+    }
+
+    fn finish(&self, finalize_input: FinalizeInput) {
+        let result = self.build_filter(finalize_input).map_err(Arc::new);
+        self.dynamic_filter.mark_complete();
+
+        let mut guard = self.inner.lock();
+        guard.completion = CompletionState::Ready(result);
+        drop(guard);
+        self.completion_notify.notify_waiters();
+    }
+
+    async fn wait_for_completion(&self) -> Result<()> {
+        loop {
+            let notified = {
+                let guard = self.inner.lock();
+                match &guard.completion {
+                    CompletionState::Ready(Ok(())) => return Ok(()),
+                    CompletionState::Ready(Err(err)) => {
+                        return Err(DataFusionError::Shared(Arc::clone(err)));
+                    }
+                    CompletionState::Pending | CompletionState::Finalizing => {
+                        self.completion_notify.notified()
                     }
                 }
-                // Partitioned: CASE expression routing to per-partition filters
-                AccumulatedBuildData::Partitioned { partitions } => {
-                    // Collect all partition data (should all be Some at this point)
-                    let partition_data: Vec<_> =
-                        partitions.iter().filter_map(|p| p.as_ref()).collect();
-
-                    if !partition_data.is_empty() {
-                        // Build a CASE expression that combines range checks AND membership checks
-                        // CASE (hash_repartition(join_keys) % num_partitions)
-                        //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0
-                        //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1
-                        //   ...
-                        //   ELSE false
-                        // END
-
-                        let num_partitions = partition_data.len();
-
-                        // Create base expression: hash_repartition(join_keys) % num_partitions
-                        let routing_hash_expr = Arc::new(HashExpr::new(
-                            self.on_right.clone(),
-                            self.repartition_random_state.clone(),
-                            "hash_repartition".to_string(),
-                        ))
-                            as Arc<dyn PhysicalExpr>;
-
-                        let modulo_expr = Arc::new(BinaryExpr::new(
-                            routing_hash_expr,
-                            Operator::Modulo,
-                            lit(ScalarValue::UInt64(Some(num_partitions as u64))),
-                        ))
-                            as Arc<dyn PhysicalExpr>;
-
-                        // Create WHEN branches for each partition
-                        let when_then_branches: Vec<(
-                            Arc<dyn PhysicalExpr>,
-                            Arc<dyn PhysicalExpr>,
-                        )> = partitions
-                            .iter()
-                            .enumerate()
-                            .filter_map(|(partition_id, partition_opt)| {
-                                partition_opt.as_ref().and_then(|partition| {
-                                    // Skip empty partitions - they would always return false anyway
-                                    match &partition.pushdown {
-                                        PushdownStrategy::Empty => None,
-                                        _ => Some((partition_id, partition)),
-                                    }
-                                })
-                            })
-                            .map(|(partition_id, partition)| -> Result<_> {
-                                // WHEN partition_id
-                                let when_expr =
-                                    lit(ScalarValue::UInt64(Some(partition_id as u64)));
-
-                                // THEN: Combine bounds check AND membership predicate
-
-                                // 1. Create membership predicate (InList for small build sides, hash lookup otherwise)
-                                let membership_expr = create_membership_predicate(
-                                    &self.on_right,
-                                    partition.pushdown.clone(),
-                                    &HASH_JOIN_SEED,
-                                    self.probe_schema.as_ref(),
-                                )?;
-
-                                // 2. Create bounds check expression for this partition (if bounds available)
-                                let bounds_expr = create_bounds_predicate(
-                                    &self.on_right,
-                                    &partition.bounds,
-                                );
-
-                                // 3. Combine membership and bounds expressions
-                                let then_expr = match (membership_expr, bounds_expr) {
-                                    (Some(membership), Some(bounds)) => {
-                                        // Both available: combine with AND
-                                        Arc::new(BinaryExpr::new(
-                                            bounds,
-                                            Operator::And,
-                                            membership,
-                                        ))
-                                            as Arc<dyn PhysicalExpr>
-                                    }
-                                    (Some(membership), None) => {
-                                        // Membership available but no bounds (e.g., unsupported data types)
-                                        membership
-                                    }
-                                    (None, Some(bounds)) => {
-                                        // Bounds available but no membership.
-                                        // This should be unreachable in practice: we can always push down a reference
-                                        // to the hash table.
-                                        // But it seems safer to handle it defensively.
-                                        bounds
-                                    }
-                                    (None, None) => {
-                                        // No filter for this partition - should not happen due to filter_map above
-                                        // but handle defensively by returning a "true" literal
-                                        lit(true)
-                                    }
-                                };
-
-                                Ok((when_expr, then_expr))
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-
-                        // Optimize for single partition: skip CASE expression entirely
-                        let filter_expr = if when_then_branches.is_empty() {
-                            // All partitions are empty: no rows can match
-                            lit(false)
-                        } else if when_then_branches.len() == 1 {
-                            // Single partition: just use the condition directly
-                            // since hash % 1 == 0 always, the WHEN 0 branch will always match
-                            Arc::clone(&when_then_branches[0].1)
-                        } else {
-                            // Multiple partitions: create CASE expression
-                            Arc::new(CaseExpr::try_new(
-                                Some(modulo_expr),
-                                when_then_branches,
-                                Some(lit(false)), // ELSE false
-                            )?) as Arc<dyn PhysicalExpr>
-                        };
+            };
+            notified.await;
+        }
+    }

Review Comment:
   I don't quite understand the difference between the previous barrier and the current `loop` + `Notify` approach. At first sight, it looks like a convoluted way of re-implementing what the barrier already did, and the factor that seems key to solving the deadlock is calling `report_canceled_partition` on `HashJoinStream` drop.
   
   Is it not possible to just handle the cancellation gracefully by doing something with the `Barrier` on drop?



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec {
             .with_category(MetricCategory::Rows)
             .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
 
+        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
+        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
+        let repartition_random_state = REPARTITION_RANDOM_STATE;
+        let on_right_exprs = self
+            .on
+            .iter()
+            .map(|(_, right_expr)| Arc::clone(right_expr))
+            .collect::<Vec<_>>();
+        let build_accumulator = enable_dynamic_filter_pushdown
+            .then(|| {
+                self.dynamic_filter.as_ref().map(|df| {
+                    let filter = Arc::clone(&df.filter);
+                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
+                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(

Review Comment:
   :thinking: I'm trying to deduce what this change does, but it looks like it's not related to the fix and is just a refactor? Is that right, or did I miss something?



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -358,229 +391,283 @@ impl SharedBuildAccumulator {
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch
     pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
-        // Store data in the accumulator
-        {
+        let finalize_input = {
             let mut guard = self.inner.lock();
+            self.store_build_data(&mut guard, data)?;
+            self.take_finalize_input_if_ready(&mut guard)
+        };
 
-            match (data, &mut *guard) {
-                // Partitioned mode
-                (
-                    PartitionBuildData::Partitioned {
-                        partition_id,
-                        pushdown,
-                        bounds,
-                    },
-                    AccumulatedBuildData::Partitioned { partitions },
-                ) => {
-                    partitions[partition_id] = Some(PartitionData { pushdown, bounds });
-                }
-                // CollectLeft mode (store once, deduplicate across partitions)
-                (
-                    PartitionBuildData::CollectLeft { pushdown, bounds },
-                    AccumulatedBuildData::CollectLeft { data },
-                ) => {
-                    // Deduplicate - all partitions report the same data in CollectLeft
-                    if data.is_none() {
-                        *data = Some(PartitionData { pushdown, bounds });
-                    }
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+
+        self.wait_for_completion().await
+    }
+
+    pub(crate) fn report_canceled_partition(&self, partition_id: usize) {
+        let finalize_input = {
+            let mut guard = self.inner.lock();
+            self.store_canceled_partition(&mut guard, partition_id);
+            self.take_finalize_input_if_ready(&mut guard)
+        };
+
+        if let Some(finalize_input) = finalize_input {
+            self.finish(finalize_input);
+        }
+    }
+
+    fn store_build_data(
+        &self,
+        guard: &mut AccumulatorState,
+        data: PartitionBuildData,
+    ) -> Result<()> {
+        match (data, &mut guard.data) {
+            (
+                PartitionBuildData::Partitioned {
+                    partition_id,
+                    pushdown,
+                    bounds,
+                },
+                AccumulatedBuildData::Partitioned {
+                    partitions,
+                    completed_partitions,
+                },
+            ) => {
+                if matches!(partitions[partition_id], PartitionStatus::Pending) {
+                    *completed_partitions += 1;
                 }
-                // Mismatched modes - should never happen
-                _ => {
-                    return datafusion_common::internal_err!(
-                        "Build data mode mismatch in report_build_data"
-                    );
+                partitions[partition_id] =
+                    PartitionStatus::Reported(PartitionData { pushdown, bounds });
+            }
+            (
+                PartitionBuildData::CollectLeft { pushdown, bounds },
+                AccumulatedBuildData::CollectLeft {
+                    data,
+                    reported_count,
+                    ..
+                },
+            ) => {
+                if matches!(data, PartitionStatus::Pending) {
+                    *data = PartitionStatus::Reported(PartitionData { pushdown, bounds });
                 }
+                *reported_count += 1;
+            }
+            _ => {
+                return datafusion_common::internal_err!(
+                    "Build data mode mismatch in report_build_data"
+                );
             }
         }
+        Ok(())
+    }
 
-        // Wait for all partitions to report
-        if self.barrier.wait().await.is_leader() {
-            // All partitions have reported, so we can create and update the filter
-            let inner = self.inner.lock();
-
-            match &*inner {
-                // CollectLeft: Simple conjunction of bounds and membership check
-                AccumulatedBuildData::CollectLeft { data } => {
-                    if let Some(partition_data) = data {
-                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
-                        let membership_expr = create_membership_predicate(
-                            &self.on_right,
-                            partition_data.pushdown.clone(),
-                            &HASH_JOIN_SEED,
-                            self.probe_schema.as_ref(),
-                        )?;
-
-                        // Create bounds check expression (if bounds available)
-                        let bounds_expr = create_bounds_predicate(
-                            &self.on_right,
-                            &partition_data.bounds,
-                        );
-
-                        // Combine membership and bounds expressions for multi-layer optimization:
-                        // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping)
-                        // - Membership (InList/hash lookup): Enables:
-                        //   * Precise filtering (exact value matching)
-                        //   * Bloom filter utilization (if present in Parquet files)
-                        //   * Better pruning for data types where min/max isn't effective (e.g., UUIDs)
-                        // Together, they provide complementary benefits and maximize data skipping.
-                        // Only update the filter if we have something to push down
-                        if let Some(filter_expr) = match (membership_expr, bounds_expr) {
-                            (Some(membership), Some(bounds)) => {
-                                // Both available: combine with AND
-                                Some(Arc::new(BinaryExpr::new(
-                                    bounds,
-                                    Operator::And,
-                                    membership,
-                                ))
-                                    as Arc<dyn PhysicalExpr>)
-                            }
-                            (Some(membership), None) => {
-                                // Membership available but no bounds
-                                // This is reachable when we have data but bounds aren't available
-                                // (e.g., unsupported data types or no columns with bounds)
-                                Some(membership)
-                            }
-                            (None, Some(bounds)) => {
-                                // Bounds available but no membership.
-                                // This should be unreachable in practice: we can always push down a reference
-                                // to the hash table.
-                                // But it seems safer to handle it defensively.
-                                Some(bounds)
-                            }
-                            (None, None) => {
-                                // No filter available (e.g., empty build side)
-                                // Don't update the filter, but continue to mark complete
-                                None
-                            }
-                        } {
-                            self.dynamic_filter.update(filter_expr)?;
-                        }
+    fn store_canceled_partition(
+        &self,
+        guard: &mut AccumulatorState,
+        partition_id: usize,
+    ) {
+        if let AccumulatedBuildData::Partitioned {
+            partitions,
+            completed_partitions,
+        } = &mut guard.data
+            && matches!(partitions[partition_id], PartitionStatus::Pending)
+        {
+            partitions[partition_id] = PartitionStatus::CanceledUnknown;
+            *completed_partitions += 1;
+        }
+    }
+
+    fn take_finalize_input_if_ready(
+        &self,
+        guard: &mut AccumulatorState,
+    ) -> Option<FinalizeInput> {
+        if !matches!(guard.completion, CompletionState::Pending) {
+            return None;
+        }
+
+        let finalize_input = match &guard.data {
+            AccumulatedBuildData::Partitioned {
+                partitions,
+                completed_partitions,
+            } if *completed_partitions == partitions.len() => {
+                Some(FinalizeInput::Partitioned(partitions.clone()))
+            }
+            AccumulatedBuildData::CollectLeft {
+                data,
+                reported_count,
+                expected_reports,
+            } if *reported_count == *expected_reports => {
+                Some(FinalizeInput::CollectLeft(data.clone()))
+            }
+            _ => None,
+        }?;
+
+        guard.completion = CompletionState::Finalizing;
+        Some(finalize_input)
+    }
+
+    fn finish(&self, finalize_input: FinalizeInput) {
+        let result = self.build_filter(finalize_input).map_err(Arc::new);
+        self.dynamic_filter.mark_complete();
+
+        let mut guard = self.inner.lock();
+        guard.completion = CompletionState::Ready(result);
+        drop(guard);
+        self.completion_notify.notify_waiters();
+    }

Review Comment:
   A `Mutex` with shared state + a `Notify` for broadcasting update signals sounds essentially like a reimplementation of `tokio::sync::watch`.
   
   I don't have the full context, so the current solution might actually be the best one; I just want to double-check that other synchronization primitives were considered before choosing this one.
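   
   For reference, a rough sketch of the `watch`-based shape I was picturing (the names are hypothetical and the error type is simplified to a string):
   
   ```rust
   use std::sync::Arc;
   use tokio::sync::watch;
   
   // Hypothetical mirror of the PR's CompletionState, carried inside a
   // watch channel instead of a Mutex + Notify pair.
   enum Completion {
       Pending,
       Ready(Result<(), Arc<String>>),
   }
   
   struct Accumulator {
       completion: watch::Sender<Completion>,
   }
   
   impl Accumulator {
       fn new() -> Self {
           let (tx, _rx) = watch::channel(Completion::Pending);
           Self { completion: tx }
       }
   
       fn finish(&self, result: Result<(), Arc<String>>) {
           // send_replace succeeds even with no receivers attached and
           // wakes every subscribed waiter, much like notify_waiters().
           self.completion.send_replace(Completion::Ready(result));
       }
   
       async fn wait_for_completion(&self) -> Result<(), Arc<String>> {
           let mut rx = self.completion.subscribe();
           // wait_for resolves immediately if the state is already Ready,
           // replacing the manual check-then-`notified()` loop.
           let state = rx
               .wait_for(|c| matches!(c, Completion::Ready(_)))
               .await
               .expect("sender is owned by self, so the channel cannot close");
           match &*state {
               Completion::Ready(result) => result.clone(),
               Completion::Pending => unreachable!("wait_for guarantees Ready"),
           }
       }
   }
   ```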



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -5628,6 +5643,158 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()

Review Comment:
   Double-checked with `main`, and indeed this test reproduces the error 👍 



##########
datafusion/physical-plan/src/joins/hash_join/stream.rs:
##########
@@ -421,6 +424,49 @@ impl HashJoinStream {
         }
     }
 
+    /// Transitions state after build-side data has been collected, automatically
+    /// reporting build data to the accumulator when one is present.
+    ///
+    /// If a `build_accumulator` is configured, this method constructs the
+    /// appropriate [`PartitionBuildData`], schedules the reporting future, and
+    /// returns [`HashJoinStreamState::WaitPartitionBoundsReport`]. Otherwise it
+    /// delegates to [`Self::state_after_build_ready`].
+    fn transition_after_build_collected(
+        &mut self,
+        left_data: &Arc<JoinLeftData>,
+    ) -> HashJoinStreamState {
+        let Some(build_accumulator) = self.build_accumulator.as_ref() else {
+            return Self::state_after_build_ready(self.join_type, left_data.as_ref());
+        };
+
+        let pushdown = left_data.membership().clone();
+        let bounds = left_data
+            .bounds
+            .clone()
+            .unwrap_or_else(|| PartitionBounds::new(vec![]));
+
+        let build_data = match self.mode {
+            PartitionMode::Partitioned => PartitionBuildData::Partitioned {
+                partition_id: self.partition,
+                pushdown,
+                bounds,
+            },
+            PartitionMode::CollectLeft => {
+                PartitionBuildData::CollectLeft { pushdown, bounds }
+            }
+            PartitionMode::Auto => unreachable!(
+                "PartitionMode::Auto should not be present at execution time. 
This is a bug in DataFusion, please report it!"
+            ),
+        };
+
+        let acc = Arc::clone(build_accumulator);
+        self.build_waiter = Some(OnceFut::new(async move {
+            acc.report_build_data(build_data).await
+        }));
+        self.build_reported = true;

Review Comment:
   I'm also trying to understand what exactly changed here, and it looks like the only logical change is that there's now a `self.build_reported = true` flag being set; everything else is pretty much a refactor. Does that sound right?
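   
   If I'm reading it right, the flag is what lets a `Drop` impl tell "canceled before reporting" apart from "already reported", so only truly canceled partitions get reported as such. Roughly (a sketch with stand-in types, not the actual code from the PR):
   
   ```rust
   use std::sync::Arc;
   
   // Stand-ins; the real types live in hash_join/stream.rs and shared_bounds.rs.
   struct SharedBuildAccumulator;
   impl SharedBuildAccumulator {
       fn report_canceled_partition(&self, _partition_id: usize) {}
   }
   
   struct HashJoinStream {
       partition: usize,
       build_reported: bool,
       build_accumulator: Option<Arc<SharedBuildAccumulator>>,
   }
   
   impl Drop for HashJoinStream {
       fn drop(&mut self) {
           // Only partitions that never managed to report count as canceled;
           // without the flag, a normally finished partition would be
           // double-counted by the accumulator.
           if !self.build_reported {
               if let Some(acc) = &self.build_accumulator {
                   acc.report_canceled_partition(self.partition);
               }
           }
       }
   }
   ```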



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

