This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 55730dcece Feat: Add fetch to CoalescePartitionsExec (#14499)
55730dcece is described below
commit 55730dcecebe5475af40b9e8a5c4805a73c31b11
Author: mertak-synnada <[email protected]>
AuthorDate: Thu Feb 6 10:07:03 2025 +0000
Feat: Add fetch to CoalescePartitionsExec (#14499)
* add fetch info to CoalescePartitionsExec
* use Statistics with_fetch API on CoalescePartitionsExec
* check limit_reached only if fetch is assigned
---
.../tests/physical_optimizer/limit_pushdown.rs | 20 ++++-----
datafusion/core/tests/sql/explain_analyze.rs | 7 ---
.../physical-optimizer/src/limit_pushdown.rs | 9 ----
.../physical-plan/src/coalesce_partitions.rs | 50 +++++++++++++++++-----
datafusion/physical-plan/src/stream.rs | 31 +++++++++++++-
datafusion/physical-plan/src/union.rs | 12 +++++-
datafusion/sqllogictest/test_files/aggregate.slt | 23 +++++-----
datafusion/sqllogictest/test_files/limit.slt | 5 +--
datafusion/sqllogictest/test_files/repartition.slt | 11 +++--
datafusion/sqllogictest/test_files/union.slt | 43 +++++++++----------
10 files changed, 128 insertions(+), 83 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 49490b2a3d..dd2c1960a6 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -233,12 +233,11 @@ fn
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
let expected = [
- "GlobalLimitExec: skip=0, fetch=5",
- " CoalescePartitionsExec",
- " CoalesceBatchesExec: target_batch_size=8192, fetch=5",
- " FilterExec: c3@2 > 0",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " StreamingTableExec: partition_sizes=1, projection=[c1,
c2, c3], infinite_source=true"
+ "CoalescePartitionsExec: fetch=5",
+ " CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+ " FilterExec: c3@2 > 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " StreamingTableExec: partition_sizes=1, projection=[c1, c2,
c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);
@@ -378,11 +377,10 @@ fn
keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
let expected = [
- "GlobalLimitExec: skip=0, fetch=5",
- " CoalescePartitionsExec",
- " FilterExec: c3@2 > 0",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " StreamingTableExec: partition_sizes=1, projection=[c1,
c2, c3], infinite_source=true"
+ "CoalescePartitionsExec: fetch=5",
+ " FilterExec: c3@2 > 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " StreamingTableExec: partition_sizes=1, projection=[c1, c2,
c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index 5fb0b98526..dce175d04b 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
- assert_metrics!(
- &formatted,
- "GlobalLimitExec: skip=0, fetch=3, ",
- "metrics=[output_rows=3, elapsed_compute="
- );
assert_metrics!(
&formatted,
"ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {
plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
- // CoalescePartitionsExec doesn't do any work so is not included
||
plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
- ||
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index bc0b64cdd7..5887cb51a7 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,15 +146,6 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;
- if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
- // If the child is a `CoalescePartitionsExec`, we should not
remove the limit
- // the push_down through the `CoalescePartitionsExec` to each
partition will not guarantee the limit.
- // TODO: we may have a better solution if we can support
with_fetch for limit inside CoalescePartitionsExec.
- // Follow-up issue:
https://github.com/apache/datafusion/issues/14446
- global_state.satisfied = true;
- return Ok((Transformed::no(pushdown_plan), global_state));
- }
-
// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 3900bd1ddc..9a955155c0 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
+ /// Optional number of rows to fetch. Stops producing rows after this fetch
+ pub(crate) fetch: Option<usize>,
}
impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
cache,
+ fetch: None,
}
}
@@ -83,9 +86,12 @@ impl DisplayAs for CoalescePartitionsExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CoalescePartitionsExec")
- }
+ DisplayFormatType::Default | DisplayFormatType::Verbose => match
self.fetch {
+ Some(fetch) => {
+ write!(f, "CoalescePartitionsExec: fetch={fetch}")
+ }
+ None => write!(f, "CoalescePartitionsExec"),
+ },
}
}
}
@@ -116,9 +122,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
- &children[0],
- ))))
+ let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+ plan.fetch = self.fetch;
+ Ok(Arc::new(plan))
}
fn execute(
@@ -164,7 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
let stream = builder.build();
- Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+ Ok(Box::pin(ObservedStream::new(
+ stream,
+ baseline_metrics,
+ self.fetch,
+ )))
}
}
}
@@ -174,7 +184,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
fn statistics(&self) -> Result<Statistics> {
- self.input.statistics()
+ Statistics::with_fetch(self.input.statistics()?, self.schema(),
self.fetch, 0, 1)
}
fn supports_limit_pushdown(&self) -> bool {
@@ -197,8 +207,28 @@ impl ExecutionPlan for CoalescePartitionsExec {
return Ok(None);
}
// CoalescePartitionsExec always has a single child, so zero indexing
is safe.
- make_with_child(projection, projection.input().children()[0])
- .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
+ make_with_child(projection, projection.input().children()[0]).map(|e| {
+ if self.fetch.is_some() {
+ let mut plan = CoalescePartitionsExec::new(e);
+ plan.fetch = self.fetch;
+ Some(Arc::new(plan) as _)
+ } else {
+ Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
+ }
+ })
+ }
+
+ fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+
+ fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
+ Some(Arc::new(CoalescePartitionsExec {
+ input: Arc::clone(&self.input),
+ fetch: limit,
+ metrics: self.metrics.clone(),
+ cache: self.cache.clone(),
+ }))
}
}
diff --git a/datafusion/physical-plan/src/stream.rs
b/datafusion/physical-plan/src/stream.rs
index 331cded165..5c941c76ae 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -444,18 +444,44 @@ impl Stream for EmptyRecordBatchStream {
pub(crate) struct ObservedStream {
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
+ fetch: Option<usize>,
+ produced: usize,
}
impl ObservedStream {
pub fn new(
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
+ fetch: Option<usize>,
) -> Self {
Self {
inner,
baseline_metrics,
+ fetch,
+ produced: 0,
}
}
+
+ fn limit_reached(
+ &mut self,
+ poll: Poll<Option<Result<RecordBatch>>>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ let Some(fetch) = self.fetch else { return poll };
+
+ if self.produced >= fetch {
+ return Poll::Ready(None);
+ }
+
+ if let Poll::Ready(Some(Ok(batch))) = &poll {
+ if self.produced + batch.num_rows() > fetch {
+ let batch = batch.slice(0,
fetch.saturating_sub(self.produced));
+ self.produced += batch.num_rows();
+ return Poll::Ready(Some(Ok(batch)));
+ };
+ self.produced += batch.num_rows()
+ }
+ poll
+ }
}
impl RecordBatchStream for ObservedStream {
@@ -471,7 +497,10 @@ impl Stream for ObservedStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll = self.inner.poll_next_unpin(cx);
+ let mut poll = self.inner.poll_next_unpin(cx);
+ if self.fetch.is_some() {
+ poll = self.limit_reached(poll);
+ }
self.baseline_metrics.record_poll(poll)
}
}
diff --git a/datafusion/physical-plan/src/union.rs
b/datafusion/physical-plan/src/union.rs
index a41336ea6e..91d2f2c9e8 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -237,7 +237,11 @@ impl ExecutionPlan for UnionExec {
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
- return Ok(Box::pin(ObservedStream::new(stream,
baseline_metrics)));
+ return Ok(Box::pin(ObservedStream::new(
+ stream,
+ baseline_metrics,
+ None,
+ )));
} else {
partition -= input.output_partitioning().partition_count();
}
@@ -448,7 +452,11 @@ impl ExecutionPlan for InterleaveExec {
self.schema(),
input_stream_vec,
));
- return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+ return Ok(Box::pin(ObservedStream::new(
+ stream,
+ baseline_metrics,
+ None,
+ )));
}
warn!("Error in InterleaveExec: Partition {} not found", partition);
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index c8c544785d..bb2ddf0da4 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -5032,18 +5032,17 @@ logical_plan
03)----Aggregate: groupBy=[[aggregate_test_100.c3]],
aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0,
min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3],
aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4),
input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3],
aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-12)----------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0,
min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3],
aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4),
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3],
aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3], has_header=true
#
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index 65f35d40fc..a999149418 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -852,9 +852,8 @@ physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST],
preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
projection=[part_key], limit=1
query I
with selection as (
diff --git a/datafusion/sqllogictest/test_files/repartition.slt
b/datafusion/sqllogictest/test_files/repartition.slt
index 630674bb09..36a326928f 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -121,12 +121,11 @@ logical_plan
02)--Filter: sink_table.c3 > Int16(0)
03)----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
-04)------FilterExec: c3@2 > 0
-05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3],
infinite_source=true
+01)CoalescePartitionsExec: fetch=5
+02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
+03)----FilterExec: c3@2 > 0
+04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3],
infinite_source=true
# Start repratition on empty column test.
# See https://github.com/apache/datafusion/issues/12057
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index cbd19bf380..5517af5475 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -510,28 +510,27 @@ logical_plan
19)------------Projection: Int64(1) AS c1
20)--------------EmptyRelation
physical_plan
-01)GlobalLimitExec: skip=0, fetch=3
-02)--CoalescePartitionsExec
-03)----UnionExec
-04)------ProjectionExec: expr=[count(*)@0 as cnt]
-05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
-06)----------CoalescePartitionsExec
-07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
-08)--------------ProjectionExec: expr=[]
-09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[]
-10)------------------CoalesceBatchesExec: target_batch_size=2
-11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4),
input_partitions=4
-12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
-13)------------------------CoalesceBatchesExec: target_batch_size=2
-14)--------------------------FilterExec: c13@1 !=
C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
-15)----------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-16)------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c13], has_header=true
-17)------ProjectionExec: expr=[1 as cnt]
-18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name:
"lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+01)CoalescePartitionsExec: fetch=3
+02)--UnionExec
+03)----ProjectionExec: expr=[count(*)@0 as cnt]
+04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
+07)------------ProjectionExec: expr=[]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([c1@0], 4),
input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+12)----------------------CoalesceBatchesExec: target_batch_size=2
+13)------------------------FilterExec: c13@1 !=
C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
+14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+15)----------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c13], has_header=true
+16)----ProjectionExec: expr=[1 as cnt]
+17)------PlaceholderRowExec
+18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+20)--------ProjectionExec: expr=[1 as c1]
+21)----------PlaceholderRowExec
########
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]