alamb commented on code in PR #9346:
URL: https://github.com/apache/arrow-datafusion/pull/9346#discussion_r1504415151


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -199,11 +200,23 @@ impl CustomExec {
         db: CustomDataSource,
     ) -> Self {
         let projected_schema = project_schema(&schema, projections).unwrap();
+        let cache = Self::create_cache(projected_schema.clone());
         Self {
             db,
             projected_schema,
+            cache,
         }
     }
+
+    /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {

Review Comment:
   As mentioned above, calling this like 
   
   ```suggestion
       fn compute_properties(schema: SchemaRef) -> PlanProperties {
   ```
   
   I think would make its use and intent more clear



##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,16 +230,8 @@ impl ExecutionPlan for CustomExec {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        self.projected_schema.clone()
-    }
-
-    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
-        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn cache(&self) -> &PlanPropertiesCache {
+        &self.cache

Review Comment:
   ```suggestion
       fn properties(&self) -> &PlanProperties {
           &self.cache
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Describes the execution mode of an operator's resulting stream with respect

Review Comment:
   This is a much nicer API 👍 



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: 
`Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+    /// Represents the mode where generated stream is bounded, e.g. finite.
+    Bounded,
+    /// Represents the mode where generated stream is unbounded, e.g. infinite.
+    /// Even though the operator generates an unbounded stream of results, it
+    /// works with bounded memory and execution can still continue 
successfully.

Review Comment:
   ```suggestion
       /// works with bounded memory and execution can still continue 
successfully.
       ///
       /// The stream that results from calling `execute` on an `ExecutionPlan` 
that is `Unbounded` 
       /// will never be done (return `None`), except in case of error.
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: 
`Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+    /// Represents the mode where generated stream is bounded, e.g. finite.
+    Bounded,
+    /// Represents the mode where generated stream is unbounded, e.g. infinite.
+    /// Even though the operator generates an unbounded stream of results, it
+    /// works with bounded memory and execution can still continue 
successfully.
+    Unbounded,
+    /// Represents the mode where some of the operator's input stream(s) are
+    /// unbounded; however, the operator cannot generate streaming results from
+    /// these streaming inputs. In this case, the execution mode will be 
pipeline
+    /// breaking, e.g. the operator requires unbounded memory to generate 
results.
+    PipelineBreaking,
+}
+
+impl ExecutionMode {
+    /// Check whether the execution mode is unbounded or not.
+    pub fn is_unbounded(&self) -> bool {
+        matches!(self, ExecutionMode::Unbounded)
+    }
+
+    /// Check whether the execution is pipeline friendly. If so, operator can
+    /// execute safely.
+    pub fn pipeline_friendly(&self) -> bool {
+        matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
+    }
+}
+
+/// Conservatively "combines" execution modes of a given collection of 
operators.
+fn exec_mode_flatten<'a>(
+    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
+) -> ExecutionMode {
+    let mut result = ExecutionMode::Bounded;
+    for mode in children.into_iter().map(|child| child.execution_mode()) {
+        match (mode, result) {
+            (ExecutionMode::PipelineBreaking, _)
+            | (_, ExecutionMode::PipelineBreaking) => {
+                // If any of the modes is `PipelineBreaking`, so is the result:
+                return ExecutionMode::PipelineBreaking;
+            }
+            (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
+                // Unbounded mode eats up bounded mode:
+                result = ExecutionMode::Unbounded;
+            }
+            (ExecutionMode::Bounded, ExecutionMode::Bounded) => {
+                // When both modes are bounded, so is the result:
+                result = ExecutionMode::Bounded;
+            }
+        }
+    }
+    result
+}
+
+/// Represents a cache for plan properties used in query optimization.

Review Comment:
   ```suggestion
   /// Represents plan properties used in query optimization.
   ///
   /// These properties are in a single structure to permit this information to 
be computed
   /// once and then those cached results used multiple times without 
recomputation (aka a cache)
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -121,21 +121,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn as_any(&self) -> &dyn Any;
 
     /// Get the schema for this execution plan
-    fn schema(&self) -> SchemaRef;
+    fn schema(&self) -> SchemaRef {
+        self.cache().schema().clone()
+    }
+
+    fn cache(&self) -> &PlanPropertiesCache;
 
     /// Specifies how the output of this `ExecutionPlan` is split into
     /// partitions.
-    fn output_partitioning(&self) -> Partitioning;
+    fn output_partitioning(&self) -> &Partitioning {

Review Comment:
   Given that `fn cache()` is required, it seems to me there is a potential in 
the API for conflicting implementations (e.g. a user could return a default 
`PlanProperties` but forget to remove `output_partitioning`
   
   This I suggest removing the functions like `output_partitioning` from the 
`ExecutionPlan` trait.
   
   You could probably avoid having to make a bunch of changes by moving the 
functions into a new trait like
   
   ```rust
   pub trait ExecutionPlanExt {
       fn execution_mode(&self) -> ExecutionMode;
   ...
   }
   
   // implement so current call sites of plan.execution_mode() still work:
   impl ExecutionPlanExt for &dyn ExecutionPlan {
       fn execution_mode(&self) -> ExecutionMode;
          self.cache().exec_mode
       }
    
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: 
`Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+    /// Represents the mode where generated stream is bounded, e.g. finite.
+    Bounded,
+    /// Represents the mode where generated stream is unbounded, e.g. infinite.
+    /// Even though the operator generates an unbounded stream of results, it
+    /// works with bounded memory and execution can still continue 
successfully.
+    Unbounded,
+    /// Represents the mode where some of the operator's input stream(s) are
+    /// unbounded; however, the operator cannot generate streaming results from
+    /// these streaming inputs. In this case, the execution mode will be 
pipeline
+    /// breaking, e.g. the operator requires unbounded memory to generate 
results.
+    PipelineBreaking,
+}
+
+impl ExecutionMode {
+    /// Check whether the execution mode is unbounded or not.
+    pub fn is_unbounded(&self) -> bool {
+        matches!(self, ExecutionMode::Unbounded)
+    }
+
+    /// Check whether the execution is pipeline friendly. If so, operator can
+    /// execute safely.
+    pub fn pipeline_friendly(&self) -> bool {
+        matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
+    }
+}
+
+/// Conservatively "combines" execution modes of a given collection of 
operators.
+fn exec_mode_flatten<'a>(

Review Comment:
   I found this name confusing as it isn't flattening a nested structure but 
more like computing something from its children.
   
   How about a name related to children? (I don't feel strongly)
   
   ```suggestion
   fn execution_mode_from_children<'a>(
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: 
`Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+    /// Represents the mode where generated stream is bounded, e.g. finite.
+    Bounded,
+    /// Represents the mode where generated stream is unbounded, e.g. infinite.
+    /// Even though the operator generates an unbounded stream of results, it
+    /// works with bounded memory and execution can still continue 
successfully.
+    Unbounded,
+    /// Represents the mode where some of the operator's input stream(s) are
+    /// unbounded; however, the operator cannot generate streaming results from
+    /// these streaming inputs. In this case, the execution mode will be 
pipeline
+    /// breaking, e.g. the operator requires unbounded memory to generate 
results.
+    PipelineBreaking,

Review Comment:
   What is the usecase of `PipelineBreaking` ? I didn't see it used anywhere in 
the code differently than `ExecutionMode::Bounded` (though I may have missed)
   
   I guess I am wondering what is (practical)  difference between `Bounded` and 
`PipelineBreaking`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to