2010YOUY01 commented on code in PR #18644:
URL: https://github.com/apache/datafusion/pull/18644#discussion_r2525555322


##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -53,15 +56,196 @@ pub(crate) struct AggregateStream {
 ///
 /// The latter requires a state object, which is [`AggregateStreamInner`].
 struct AggregateStreamInner {
+    // ==== Properties ====
     schema: SchemaRef,
     mode: AggregateMode,
     input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    // ==== Runtime States/Buffers ====
     accumulators: Vec<AccumulatorItem>,
-    reservation: MemoryReservation,
+    // None if the dynamic filter is not applicable. See details in 
`AggrDynFilter`.
+    agg_dyn_filter_state: Option<Arc<AggrDynFilter>>,
     finished: bool,
+
+    // ==== Execution Resources ====
+    baseline_metrics: BaselineMetrics,
+    reservation: MemoryReservation,
+}
+
+impl AggregateStreamInner {
+    // TODO: check if we get Null handling correct
+    /// # Examples
+    /// - Example 1
+    ///   Accumulators: min(c1)
+    ///   Current Bounds: min(c1)=10
+    ///   --> dynamic filter PhysicalExpr: c1 < 10
+    ///
+    /// - Example 2
+    ///   Accumulators: min(c1), max(c1), min(c2)
+    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
+    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
+    ///
+    /// # Errors
+    /// Returns internal errors if the dynamic filter is not enabled
+    fn build_dynamic_filter_from_accumulator_bounds(
+        &self,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return Ok(lit(true));
+        };
+
+        let mut predicates: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(filter_state.supported_accumulators_info.len());
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            // Skip if we don't yet have a meaningful bound
+            let bound = {
+                let guard = acc_info.shared_bound.lock();
+                if (*guard).is_null() {
+                    continue;
+                }
+                guard.clone()
+            };
+
+            let agg_exprs = self
+                .aggregate_expressions
+                .get(acc_info.aggr_index)
+                .ok_or_else(|| {
+                    internal_datafusion_err!(
+                        "Invalid aggregate expression index {} for dynamic 
filter",
+                        acc_info.aggr_index
+                    )
+                })?;
+            // Only aggregates with a single argument are supported.
+            let column_expr = agg_exprs.first().ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Aggregate expression at index {} expected a single 
argument",
+                    acc_info.aggr_index
+                )
+            })?;
+
+            let literal = lit(bound);
+            let predicate: Arc<dyn PhysicalExpr> = match acc_info.aggr_type {
+                DynamicFilterAggregateType::Min => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Lt,
+                    literal,
+                )),
+                DynamicFilterAggregateType::Max => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Gt,
+                    literal,
+                )),
+            };
+            predicates.push(predicate);
+        }
+
+        let combined = predicates.into_iter().reduce(|acc, pred| {
+            Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc<dyn 
PhysicalExpr>
+        });
+
+        Ok(combined.unwrap_or_else(|| lit(true)))
+    }
+
+    // If the dynamic filter is enabled, update it using the current 
accumulator's
+    // values
+    fn maybe_update_dyn_filter(&mut self) -> Result<()> {
+        // Step 1: Update each partition's current bound
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return Ok(());
+        };
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            let acc =
+                self.accumulators
+                    .get_mut(acc_info.aggr_index)
+                    .ok_or_else(|| {
+                        internal_datafusion_err!(
+                            "Invalid accumulator index {} for dynamic filter",
+                            acc_info.aggr_index
+                        )
+                    })?;
+            // First get current partition's bound, then update the shared 
bound among
+            // all partitions.
+            let current_bound = acc.evaluate()?;

Review Comment:
   Good point! Additionally, maybe we should only update the shared bound if 
the local bound has tightened, to reduce lock contention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to