ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128238867
##########
datafusion/datasource/src/source.rs:
##########
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
#[derive(Clone, Debug)]
pub struct DataSourceExec {
/// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
data_source: Arc<dyn DataSource>,
/// Cached plan properties such as sort order
cache: PlanProperties,
+ /// Indicates whether to enable cooperative yielding mode.
+ cooperative: bool,
Review Comment:
Should this flag come from the `data_source` object? That is what
gives us the stream (which may or may not yield to the runtime).
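A minimal sketch of that idea, assuming a hypothetical `yields_cooperatively` method on `DataSource` (not an existing API) and heavily simplified stand-ins for both types. The exec node keeps no `cooperative` field of its own and simply asks the object that produces the stream:

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Simplified stand-in for `DataSource`. The `yields_cooperatively` method is
/// hypothetical: each source reports whether the stream it opens already
/// yields to the runtime.
trait DataSource: Send + Sync + Debug {
    fn yields_cooperatively(&self) -> bool {
        false
    }
}

/// A source whose stream is assumed to yield on its own.
#[derive(Debug)]
struct YieldingSource;

impl DataSource for YieldingSource {
    fn yields_cooperatively(&self) -> bool {
        true
    }
}

/// A source that keeps the conservative default.
#[derive(Debug)]
struct PlainSource;

impl DataSource for PlainSource {}

/// Simplified stand-in for `DataSourceExec`: no separate `cooperative` flag,
/// the answer is delegated to the data source.
#[derive(Debug)]
struct DataSourceExec {
    data_source: Arc<dyn DataSource>,
}

impl DataSourceExec {
    fn yields_cooperatively(&self) -> bool {
        self.data_source.yields_cooperatively()
    }
}
```

With this shape, a source that already yields cannot get out of sync with a flag stored on the plan node.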
##########
datafusion/datasource/src/source.rs:
##########
@@ -256,7 +262,39 @@ impl ExecutionPlan for DataSourceExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- self.data_source.open(partition, context)
+ // 1. Get the “base” stream exactly as before, without yielding.
+ let stream = self.data_source.open(partition, Arc::clone(&context));
+
+ // 2. If cooperative == false, return base_stream immediately.
+ if !self.cooperative {
+ return stream;
+ }
+
+ let frequency = context
+ .session_config()
+ .options()
+ .optimizer
+ .yield_frequency_for_pipeline_break;
+
+ // 3. If cooperative == true, wrap the stream into a YieldStream.
+ let yielding_stream = YieldStream::new(stream?, frequency);
+ Ok(Box::pin(yielding_stream))
+ }
+
+ /// Override: this operator *does* support cooperative yielding when `cooperative == true`.
+ fn yields_cooperatively(&self) -> bool {
+ self.cooperative
+ }
+
+ /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows
+ /// we can replace a plain DataSourceExec with this same node (it already yields).
+ /// Otherwise, return None.
+ fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+ if self.cooperative {
+ Some(self)
+ } else {
+ None
+ }
Review Comment:
```suggestion
self.cooperative.then_some(self)
```
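For reference, `bool::then_some(value)` returns `Some(value)` when the bool is `true` and `None` otherwise, so the suggestion preserves the behavior of the `if`/`else`. A self-contained sketch, using a hypothetical `Plan` trait and `Exec` struct in place of `ExecutionPlan` and `DataSourceExec`. `then_some` evaluates its argument eagerly, which is harmless here because the argument is just the already-owned `Arc`:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ExecutionPlan`, reduced to the one method.
trait Plan {
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>>;
}

/// Hypothetical stand-in for a leaf node with a `cooperative` flag.
struct Exec {
    cooperative: bool,
}

impl Plan for Exec {
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>> {
        // Same as `if self.cooperative { Some(self) } else { None }`:
        // the bool is copied out first, then the `Arc` is moved into the
        // `Option` and coerced to `Arc<dyn Plan>` by the return type.
        self.cooperative.then_some(self)
    }
}
```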
##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -4702,7 +4702,8 @@ physical_plan
01)CrossJoinExec
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--ProjectionExec: expr=[1 as Int64(1)]
-04)----PlaceholderRowExec
+04)----YieldStreamExec frequency=64
+05)------PlaceholderRowExec
Review Comment:
Maybe we can, but let's first finalize the design and implementation for
others.
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
///
/// This plan generates output batches lazily, it doesn't have to buffer all batches
/// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
Review Comment:
Unnecessary
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3534,6 +3534,7 @@ async fn test_distribute_sort_memtable() -> Result<()> {
let session_config = SessionConfig::new()
.with_repartition_file_min_size(1000)
.with_target_partitions(3);
+
Review Comment:
```suggestion
```
##########
datafusion/common/src/config.rs:
##########
@@ -722,6 +722,19 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+ /// When true, the optimizer will insert a Yield operator at the leaf nodes of any pipeline
+ /// that contains a pipeline-breaking operator, allowing the Tokio scheduler to switch to
+ /// other tasks while waiting.
+ /// Default: true (enabled).
+ pub enable_add_yield_for_pipeline_break: bool, default = true
+
+ /// Yield frequency in batches, it represents how many batches to process before yielding
+ /// to the Tokio scheduler. The default value is 64, which means that after processing
+ /// 64 batches, the execution will yield control back to the Tokio scheduler.
+ /// This setting is only effective when `enable_add_yield_for_pipeline_break` is set to true.
+ /// This value should be greater than 0.
+ pub yield_frequency_for_pipeline_break: usize, default = 64
Review Comment:
I think a single configuration is enough; a yield frequency of zero already
implies there is no yielding.
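A minimal, std-only sketch of the single-knob semantics. It assumes a hypothetical `YieldingSource` wrapper around a synchronous iterator rather than the real `YieldStream` over a `RecordBatch` stream, and a hand-built no-op waker instead of the Tokio runtime. A positive frequency forces one `Poll::Pending` after that many batches (waking the task first so it is polled again), while `0` means the wrapper never yields, so no separate boolean is needed:

```rust
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical, simplified stand-in for a yielding wrapper: it wraps a
/// synchronous iterator of "batches" and exposes a poll-style API.
struct YieldingSource<I: Iterator> {
    inner: I,
    /// Batches to emit before forcing one `Poll::Pending`; 0 disables yielding.
    frequency: usize,
    /// Batches emitted since the last forced yield.
    since_yield: usize,
}

impl<I: Iterator> YieldingSource<I> {
    fn new(inner: I, frequency: usize) -> Self {
        Self { inner, frequency, since_yield: 0 }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        if self.frequency > 0 && self.since_yield >= self.frequency {
            // Hand control back to the scheduler once, but wake ourselves
            // first so the task is polled again: data is still available.
            self.since_yield = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.next() {
            Some(batch) => {
                self.since_yield += 1;
                Poll::Ready(Some(batch))
            }
            None => Poll::Ready(None),
        }
    }
}

/// A waker whose operations do nothing; enough to drive `poll_next` by hand.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

/// Drains `batches` items with the given frequency and returns
/// `(batches emitted, times Poll::Pending was returned)`.
fn count_yields(frequency: usize, batches: usize) -> (usize, usize) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut source = YieldingSource::new(0..batches, frequency);
    let (mut emitted, mut yields) = (0, 0);
    loop {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => emitted += 1,
            Poll::Ready(None) => break,
            Poll::Pending => yields += 1,
        }
    }
    (emitted, yields)
}
```

For example, `count_yields(64, 200)` emits all 200 batches and yields 3 times, while `count_yields(0, 200)` never yields.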
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -546,6 +558,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
child_pushdown_result,
))
}
+
+ /// Whether this operator supports cooperative yielding. Default is false.
+ fn yields_cooperatively(&self) -> bool {
+ false
Review Comment:
Shouldn't this use the result of `emission_type` to return the default value
instead of just returning `false`?
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+/// supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+/// plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {
Review Comment:
I think we can make this work without adding the `'static` lifetime.
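A minimal sketch of why the supertrait bound may not be needed, using a hypothetical `Plan` trait in place of `ExecutionPlan`. The default body of `with_cooperative_yields` only returns `None`, so it never coerces `Arc<Self>`; the coercion to `Arc<dyn Plan>` happens only in impls for concrete types without lifetime parameters, which are `'static` on their own:

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical stand-in for `ExecutionPlan`: note there is no `'static`
/// supertrait bound.
trait Plan: Debug + Send + Sync {
    /// Whether this node already yields to the runtime (default: `false`).
    fn yields_cooperatively(&self) -> bool {
        false
    }

    /// Returns a node with built-in yielding, if one is available.
    /// The default body never converts `Arc<Self>` into `Arc<dyn Plan>`,
    /// so it needs neither `Self: 'static` nor `Self: Sized`.
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>> {
        None
    }
}

/// A concrete node without lifetime parameters: it is `'static`, so
/// `Arc<Source>` can be coerced to `Arc<dyn Plan>` inside its impl.
#[derive(Debug)]
struct Source {
    cooperative: bool,
}

impl Plan for Source {
    fn yields_cooperatively(&self) -> bool {
        self.cooperative
    }

    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn Plan>> {
        self.cooperative.then_some(self)
    }
}

/// A node that relies on both defaults.
#[derive(Debug)]
struct Other;

impl Plan for Other {}
```

Both methods still dispatch dynamically through `Arc<dyn Plan>`, since `Arc<Self>` is an object-safe receiver.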
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+/// supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+/// plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
Review Comment:
The comments here are not necessary; good docstrings on the methods
themselves are enough.
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -254,10 +269,36 @@ impl ExecutionPlan for LazyMemoryExec {
);
}
- Ok(Box::pin(LazyMemoryStream {
+ let stream = Box::pin(LazyMemoryStream {
schema: Arc::clone(&self.schema),
generator: Arc::clone(&self.batch_generators[partition]),
- }))
+ });
+
+ // 2. If cooperative == false, return base_stream immediately.
+ if !self.cooperative {
+ return Ok(stream);
+ }
+
+ let frequency = context
+ .session_config()
+ .options()
+ .optimizer
+ .yield_frequency_for_pipeline_break;
+
+ // 3. If cooperative == true, wrap the stream into a YieldStream.
+ let yielding_stream = YieldStream::new(stream, frequency);
+ Ok(Box::pin(yielding_stream))
+ }
+
+ /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows
+ /// we can replace a plain DataSourceExec with this same node (it already yields).
+ /// Otherwise, return None.
+ fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+ if self.cooperative {
+ Some(self)
+ } else {
+ None
+ }
Review Comment:
```suggestion
self.cooperative.then_some(self)
```
##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;
+use crate::wrap_leaves_cancellation::WrapLeaves;
Review Comment:
```suggestion
use crate::update_aggr_exprs::OptimizeAggregateOrder;
use crate::wrap_leaves_cancellation::WrapLeaves;
```
##########
datafusion/proto/src/physical_plan/mod.rs:
##########
@@ -89,6 +89,7 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
+use datafusion::physical_plan::yield_stream::YieldStreamExec;
Review Comment:
```suggestion
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion::physical_plan::yield_stream::YieldStreamExec;
```
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
///
/// This plan generates output batches lazily, it doesn't have to buffer all batches
/// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
pub struct LazyMemoryExec {
/// Schema representing the data
schema: SchemaRef,
/// Functions to generate batches for each partition
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
cache: PlanProperties,
+ /// Indicates whether to enable cooperative yielding mode.
Review Comment:
```suggestion
/// Indicates whether to enable cooperative yielding mode (defaults to `true`).
```
##########
datafusion/sqllogictest/test_files/group_by.slt:
##########
@@ -4113,7 +4113,7 @@ EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
----
logical_plan
01)Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
-02)--Cross Join:
+02)--Cross Join:
Review Comment:
Accidental whitespace?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.