2010YOUY01 commented on code in PR #18644:
URL: https://github.com/apache/datafusion/pull/18644#discussion_r2525556256


##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -815,6 +914,66 @@ impl AggregateExec {
             }
         }
     }
+
+    /// Check if dynamic filter is possible for the current plan node.
+    /// - If yes, init one inside `AggregateExec`'s `dynamic_filter` field.
+    /// - If not supported, `self.dynamic_filter` should be kept `None`
+    fn init_dynamic_filter(&mut self) {
+        if (!self.group_by.is_single()) || (!matches!(self.mode, AggregateMode::Partial))

Review Comment:
   You're right, `is_empty()` should be used, fixed in 
[775c45b](https://github.com/apache/datafusion/pull/18644/commits/775c45b29ab078dea46c0de3061923b49c6436ff)



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -53,15 +56,196 @@ pub(crate) struct AggregateStream {
 ///
 /// The latter requires a state object, which is [`AggregateStreamInner`].
 struct AggregateStreamInner {
+    // ==== Properties ====
     schema: SchemaRef,
     mode: AggregateMode,
     input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    // ==== Runtime States/Buffers ====
     accumulators: Vec<AccumulatorItem>,
-    reservation: MemoryReservation,
+    // None if the dynamic filter is not applicable. See details in `AggrDynFilter`.
+    agg_dyn_filter_state: Option<Arc<AggrDynFilter>>,
     finished: bool,
+
+    // ==== Execution Resources ====
+    baseline_metrics: BaselineMetrics,
+    reservation: MemoryReservation,
+}
+
+impl AggregateStreamInner {
+    // TODO: check if we get Null handling correct
+    /// # Examples
+    /// - Example 1
+    ///   Accumulators: min(c1)
+    ///   Current Bounds: min(c1)=10
+    ///   --> dynamic filter PhysicalExpr: c1 < 10
+    ///
+    /// - Example 2
+    ///   Accumulators: min(c1), max(c1), min(c2)
+    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
+    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
+    ///
+    /// # Errors
+    /// Returns internal errors if the dynamic filter is not enabled

Review Comment:
   fixed in 
[775c45b](https://github.com/apache/datafusion/pull/18644/commits/775c45b29ab078dea46c0de3061923b49c6436ff)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to