alamb commented on code in PR #9346:
URL: https://github.com/apache/arrow-datafusion/pull/9346#discussion_r1504415151
##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -199,11 +200,23 @@ impl CustomExec {
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
+ let cache = Self::create_cache(projected_schema.clone());
Self {
db,
projected_schema,
+ cache,
}
}
+
+ /// This function creates the cache object that stores the plan properties
+ /// such as schema, equivalence properties, ordering, partitioning, etc.
+ fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
Review Comment:
As mentioned above, I think naming this
```suggestion
fn compute_properties(schema: SchemaRef) -> PlanProperties {
```
would make its use and intent clearer.
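A minimal sketch of how a `compute_properties` helper might be used: the properties are computed once in the constructor and then handed out by reference. `Schema`, `Partitioning`, and `PlanProperties` below are simplified stand-ins defined only for illustration (hypothetical, not the real DataFusion types). The values mirror what the removed `schema()` and `output_partitioning()` methods returned (`UnknownPartitioning(1)`).

```rust
use std::sync::Arc;

// Simplified stand-ins for DataFusion types (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}
type SchemaRef = Arc<Schema>;

#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    UnknownPartitioning(usize),
}

#[derive(Debug, Clone)]
struct PlanProperties {
    schema: SchemaRef,
    partitioning: Partitioning,
}

struct CustomExec {
    projected_schema: SchemaRef,
    cache: PlanProperties,
}

impl CustomExec {
    fn new(projected_schema: SchemaRef) -> Self {
        // Compute the properties once, when the node is constructed.
        let cache = Self::compute_properties(projected_schema.clone());
        Self { projected_schema, cache }
    }

    /// Derives the plan properties from the projected schema. The values
    /// mirror what the removed `schema()` / `output_partitioning()` returned.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties {
            schema,
            partitioning: Partitioning::UnknownPartitioning(1),
        }
    }

    /// Hands out the precomputed properties by reference; nothing is recomputed.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}
```

Because `compute_properties` is an associated function that takes only the schema, its name says what it does (derive properties) rather than how the result is stored.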
##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,16 +230,8 @@ impl ExecutionPlan for CustomExec {
self
}
- fn schema(&self) -> SchemaRef {
- self.projected_schema.clone()
- }
-
- fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
- datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn cache(&self) -> &PlanPropertiesCache {
+ &self.cache
Review Comment:
```suggestion
fn properties(&self) -> &PlanProperties {
&self.cache
```
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
+/// Describes the execution mode of an operator's resulting stream with respect
Review Comment:
This is a much nicer API 👍
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: `Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+ /// Represents the mode where the generated stream is bounded, i.e. finite.
+ Bounded,
+ /// Represents the mode where the generated stream is unbounded, i.e. infinite.
+ /// Even though the operator generates an unbounded stream of results, it
+ /// works with bounded memory and execution can still continue successfully.
Review Comment:
```suggestion
/// works with bounded memory and execution can still continue successfully.
///
/// The stream that results from calling `execute` on an `ExecutionPlan` that is `Unbounded`
/// will never be done (return `None`), except in case of error.
```
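To illustrate the contract the suggested doc comment describes, here is an analogy using a synchronous `Iterator`. This is an assumption for brevity: `execute` actually returns an asynchronous stream of record batches. `Ticker` is a hypothetical source whose `next` never returns `None`, so a consumer has to stop on its own (here with `take`) or process results incrementally.

```rust
/// Hypothetical unbounded source: `next` never returns `None`, analogous to
/// the stream produced by executing an `Unbounded` plan.
struct Ticker {
    next_value: u64,
}

impl Iterator for Ticker {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let value = self.next_value;
        // Wrap instead of overflowing so the source truly never ends.
        self.next_value = self.next_value.wrapping_add(1);
        // There is no code path that returns `None`: the stream is never done.
        Some(value)
    }
}
```

Calling `collect()` on a `Ticker` directly would never return; the bound has to come from the consumer, for example `Ticker { next_value: 0 }.take(3)`.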
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: `Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+ /// Represents the mode where the generated stream is bounded, i.e. finite.
+ Bounded,
+ /// Represents the mode where the generated stream is unbounded, i.e. infinite.
+ /// Even though the operator generates an unbounded stream of results, it
+ /// works with bounded memory and execution can still continue successfully.
+ Unbounded,
+ /// Represents the mode where some of the operator's input stream(s) are
+ /// unbounded; however, the operator cannot generate streaming results from
+ /// these streaming inputs. In this case, the execution mode will be pipeline
+ /// breaking, i.e. the operator requires unbounded memory to generate results.
+ PipelineBreaking,
+}
+
+impl ExecutionMode {
+ /// Check whether the execution mode is unbounded or not.
+ pub fn is_unbounded(&self) -> bool {
+ matches!(self, ExecutionMode::Unbounded)
+ }
+
+ /// Check whether the execution is pipeline friendly. If so, the operator can
+ /// execute safely.
+ pub fn pipeline_friendly(&self) -> bool {
+ matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
+ }
+}
+
+/// Conservatively "combines" execution modes of a given collection of operators.
+fn exec_mode_flatten<'a>(
+ children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
+) -> ExecutionMode {
+ let mut result = ExecutionMode::Bounded;
+ for mode in children.into_iter().map(|child| child.execution_mode()) {
+ match (mode, result) {
+ (ExecutionMode::PipelineBreaking, _)
+ | (_, ExecutionMode::PipelineBreaking) => {
+ // If any of the modes is `PipelineBreaking`, so is the result:
+ return ExecutionMode::PipelineBreaking;
+ }
+ (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
+ // Unbounded mode eats up bounded mode:
+ result = ExecutionMode::Unbounded;
+ }
+ (ExecutionMode::Bounded, ExecutionMode::Bounded) => {
+ // When both modes are bounded, so is the result:
+ result = ExecutionMode::Bounded;
+ }
+ }
+ }
+ result
+}
+
+/// Represents a cache for plan properties used in query optimization.
Review Comment:
```suggestion
/// Represents plan properties used in query optimization.
///
/// These properties are in a single structure to permit this information to be computed
/// once and then those cached results used multiple times without recomputation (aka a cache)
```
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -121,21 +121,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn as_any(&self) -> &dyn Any;
/// Get the schema for this execution plan
- fn schema(&self) -> SchemaRef;
+ fn schema(&self) -> SchemaRef {
+ self.cache().schema().clone()
+ }
+
+ fn cache(&self) -> &PlanPropertiesCache;
/// Specifies how the output of this `ExecutionPlan` is split into
/// partitions.
- fn output_partitioning(&self) -> Partitioning;
+ fn output_partitioning(&self) -> &Partitioning {
Review Comment:
Given that `fn cache()` is required, it seems to me there is potential in
the API for conflicting implementations (e.g. a user could return a default
`PlanProperties` but forget to remove their own `output_partitioning` implementation).
Thus I suggest removing functions like `output_partitioning` from the
`ExecutionPlan` trait.
You could probably avoid having to make a bunch of changes by moving these
functions into a new trait like
```rust
pub trait ExecutionPlanExt {
    fn execution_mode(&self) -> ExecutionMode;
    // ... other accessors moved out of `ExecutionPlan`
}

// implement for every `ExecutionPlan` (including `dyn ExecutionPlan`) so
// current call sites of plan.execution_mode() still work:
impl<T: ExecutionPlan + ?Sized> ExecutionPlanExt for T {
    fn execution_mode(&self) -> ExecutionMode {
        self.cache().exec_mode
    }
}
```
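A self-contained sketch of the extension-trait pattern suggested above, using simplified stand-in types: `PlanProperties` with a single `exec_mode` field and a trimmed-down `ExecutionPlan` whose only required method is `properties` (both hypothetical, not DataFusion's definitions; `Source` is a made-up leaf node). The blanket `impl<T: ExecutionPlan + ?Sized>` covers both concrete plans and `dyn ExecutionPlan`, so calls through `Arc<dyn ExecutionPlan>` resolve via auto-deref, and an implementor cannot supply a conflicting `execution_mode` because a second impl would overlap the blanket one.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionMode {
    Bounded,
    Unbounded,
}

struct PlanProperties {
    exec_mode: ExecutionMode,
}

/// Core trait: implementors only provide `properties`, so the cached
/// values are the single source of truth.
trait ExecutionPlan {
    fn properties(&self) -> &PlanProperties;
}

/// Extension trait holding the convenience accessors.
trait ExecutionPlanExt {
    fn execution_mode(&self) -> ExecutionMode;
}

// `?Sized` lets this apply to `dyn ExecutionPlan` as well as concrete types.
impl<T: ExecutionPlan + ?Sized> ExecutionPlanExt for T {
    fn execution_mode(&self) -> ExecutionMode {
        self.properties().exec_mode
    }
}

/// Hypothetical leaf node used to exercise the traits.
struct Source {
    props: PlanProperties,
}

impl ExecutionPlan for Source {
    fn properties(&self) -> &PlanProperties {
        &self.props
    }
}
```

With this shape, existing call sites such as `child.execution_mode()` on an `&Arc<dyn ExecutionPlan>` keep compiling, while implementors have nothing left to override.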
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: `Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+ /// Represents the mode where the generated stream is bounded, i.e. finite.
+ Bounded,
+ /// Represents the mode where the generated stream is unbounded, i.e. infinite.
+ /// Even though the operator generates an unbounded stream of results, it
+ /// works with bounded memory and execution can still continue successfully.
+ Unbounded,
+ /// Represents the mode where some of the operator's input stream(s) are
+ /// unbounded; however, the operator cannot generate streaming results from
+ /// these streaming inputs. In this case, the execution mode will be pipeline
+ /// breaking, i.e. the operator requires unbounded memory to generate results.
+ PipelineBreaking,
+}
+
+impl ExecutionMode {
+ /// Check whether the execution mode is unbounded or not.
+ pub fn is_unbounded(&self) -> bool {
+ matches!(self, ExecutionMode::Unbounded)
+ }
+
+ /// Check whether the execution is pipeline friendly. If so, the operator can
+ /// execute safely.
+ pub fn pipeline_friendly(&self) -> bool {
+ matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
+ }
+}
+
+/// Conservatively "combines" execution modes of a given collection of operators.
+fn exec_mode_flatten<'a>(
Review Comment:
I found this name confusing, as it isn't flattening a nested structure; it is
really computing a value from its children.
How about a name related to children? (I don't feel strongly)
```suggestion
fn execution_mode_from_children<'a>(
```
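A minimal sketch of the combination rule under the suggested name. As a simplifying assumption it takes the children's modes directly rather than `&Arc<dyn ExecutionPlan>` children; the match arms reproduce the precedence in `exec_mode_flatten` above: `PipelineBreaking` wins over `Unbounded`, which wins over `Bounded`, and an empty set of children yields `Bounded`.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

/// Conservatively combines the execution modes of a node's children:
/// - any `PipelineBreaking` child makes the result `PipelineBreaking`;
/// - otherwise any `Unbounded` child makes the result `Unbounded`;
/// - otherwise (including no children) the result is `Bounded`.
fn execution_mode_from_children(
    modes: impl IntoIterator<Item = ExecutionMode>,
) -> ExecutionMode {
    let mut result = ExecutionMode::Bounded;
    for mode in modes {
        match mode {
            // Nothing can override a pipeline-breaking child, so stop early.
            ExecutionMode::PipelineBreaking => return ExecutionMode::PipelineBreaking,
            // Unbounded "eats up" bounded.
            ExecutionMode::Unbounded => result = ExecutionMode::Unbounded,
            // A bounded child leaves the current result unchanged.
            ExecutionMode::Bounded => {}
        }
    }
    result
}
```

Matching on the single `mode` is equivalent to the tuple match in the PR, because `result` can never hold `PipelineBreaking` (that case returns immediately).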
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -446,6 +450,123 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
+/// Describes the execution mode of an operator's resulting stream with respect
+/// to its size and behavior. There are three possible execution modes: `Bounded`,
+/// `Unbounded` and `PipelineBreaking`.
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub enum ExecutionMode {
+ /// Represents the mode where the generated stream is bounded, i.e. finite.
+ Bounded,
+ /// Represents the mode where the generated stream is unbounded, i.e. infinite.
+ /// Even though the operator generates an unbounded stream of results, it
+ /// works with bounded memory and execution can still continue successfully.
+ Unbounded,
+ /// Represents the mode where some of the operator's input stream(s) are
+ /// unbounded; however, the operator cannot generate streaming results from
+ /// these streaming inputs. In this case, the execution mode will be pipeline
+ /// breaking, i.e. the operator requires unbounded memory to generate results.
+ PipelineBreaking,
Review Comment:
What is the use case of `PipelineBreaking`? I didn't see it used anywhere in
the code differently from `ExecutionMode::Bounded` (though I may have missed it).
I guess I am wondering what the (practical) difference between `Bounded` and
`PipelineBreaking` is.
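One way to make the distinction concrete, based only on the doc comments in the diff (an illustrative assumption, not confirmed behavior of the PR's operators): a hypothetical operator that must consume all of its input before emitting anything, such as a full sort, stays `Bounded` on bounded input. On unbounded input it can never emit output, which is the `PipelineBreaking` case that `pipeline_friendly()` reports as unsafe to execute.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

impl ExecutionMode {
    /// Same definition as in the diff: bounded and unbounded are both safe.
    fn pipeline_friendly(&self) -> bool {
        matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
    }
}

/// Hypothetical rule for a sort-like operator that must see its whole input
/// before producing any output.
fn full_sort_mode(input: ExecutionMode) -> ExecutionMode {
    match input {
        // A finite input can be fully buffered and then sorted.
        ExecutionMode::Bounded => ExecutionMode::Bounded,
        // An infinite input can never be fully buffered, so the operator
        // would need unbounded memory and never emit a result.
        ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
            ExecutionMode::PipelineBreaking
        }
    }
}
```

Under this reading, `Bounded` and `PipelineBreaking` differ in whether the plan can finish at all, which a planner could check with `pipeline_friendly()` before execution.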