daphnenhuch-at opened a new issue, #15833: URL: https://github.com/apache/datafusion/issues/15833
### Describe the bug
I have a query that sorts the data by a column called "userPrimaryKey" and
then uses a window function to add a row-number column to the data frame.
I've set `target_partitions` to 8 to make reasonably efficient use of
parallelism (the reproduction test below uses 2). However, when the query
results come back, round-robin repartitioning causes the record batches to be
returned out of order. I have 10,000 records, and the result contains records
8192 through 9999 followed by 0 through 8191, whereas I would expect the result
to be fully sorted. Setting `datafusion.optimizer.enable_round_robin_repartition`
to `false` works around the problem, but I think the planner is wrong to
repartition the data after the bounded window function (`BoundedWindowAggExec`).
I've included a test which reproduces this bug as well as the query plan
produced by the query.
### To Reproduce
Run the following test:
```
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use deltalake::arrow::array::Int32Array;
    use deltalake::datafusion::logical_expr::{ident, lit, ExprSchemable};
    use deltalake::datafusion::prelude::SessionContext;
    use deltalake::{
        arrow::datatypes::DataType as ArrowDataType,
        arrow::{
            array::{BooleanArray, Int64Array, RecordBatch, StringArray},
            datatypes::{DataType, Field, Schema},
        },
        datafusion::{
            datasource::MemTable,
            functions_window::expr_fn::row_number,
            logical_expr::{expr::WildcardOptions, utils::expand_wildcard},
            sql::sqlparser::ast::ExcludeSelectItem,
        },
    };
    use napi::anyhow::Result;
    use rand::distributions::Alphanumeric;
    use rand::Rng;

    /// Total number of rows written across both in-memory partitions.
    const TOTAL_ROWS: i32 = 10_000;
    /// First row index of the second partition (the first partition is `0..SPLIT_AT`).
    const SPLIT_AT: i32 = 8192;

    /// Returns a random 10-character alphanumeric string.
    fn generate_random_id() -> String {
        rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(10)
            .map(char::from)
            .collect()
    }

    /// Builds a `SessionContext` backed by a 100,000,000-byte `FairSpillPool`,
    /// with the disk manager disabled, `target_partitions` set to 2, and a few
    /// parquet reader options enabled.
    fn get_session_context() -> SessionContext {
        use deltalake::datafusion::{
            execution::{
                disk_manager::DiskManagerConfig, memory_pool::FairSpillPool,
                runtime_env::RuntimeEnvBuilder,
            },
            prelude::SessionConfig,
        };

        let memory_pool_size_in_bytes = 100_000_000;
        let memory_pool = FairSpillPool::new(memory_pool_size_in_bytes);
        let runtime = Arc::new(
            RuntimeEnvBuilder::new()
                // We disable the disk manager to avoid potentially writing temporary
                // files to disk, which would violate conditions of EKM.
                .with_disk_manager(DiskManagerConfig::Disabled)
                .with_memory_pool(Arc::new(memory_pool))
                .build()
                .expect("runtime environment should build"),
        );
        SessionContext::new_with_config_rt(
            SessionConfig::default()
                .set_bool("datafusion.execution.parquet.pushdown_filters", true)
                .set_bool("datafusion.execution.parquet.reorder_filters", true)
                .set_usize("datafusion.execution.target_partitions", 2)
                .set_bool("datafusion.execution.keep_partition_by_columns", false)
                .set_usize("datafusion.execution.parquet.metadata_size_hint", 512 << 10),
            runtime,
        )
    }

    /// Builds one batch containing rows `start..end`.
    ///
    /// Columns "1" and "userPrimaryKey" hold the row index formatted as `{:04}`.
    /// For `0 <= i < 10000` this zero-padding makes string order equal numeric
    /// order. Column "2" is always `"a"`, "id" is random, "lastUpdateTime" is
    /// the current time in milliseconds, and "isDeleted" is always `false`.
    fn create_record_batch(schema: Arc<Schema>, start: i32, end: i32) -> RecordBatch {
        let keys: Vec<String> = (start..end).map(|i| format!("{:04}", i)).collect();
        let len = keys.len();
        let ids: Vec<String> = (start..end).map(|_| generate_random_id()).collect();
        let last_update_times: Vec<i64> = (start..end)
            .map(|_| chrono::Utc::now().timestamp_millis())
            .collect();

        // Column order must match the schema built in `simple_reproduction`.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(keys.clone())),
                Arc::new(StringArray::from(vec!["a"; len])),
                Arc::new(StringArray::from(keys)),
                Arc::new(StringArray::from(ids)),
                Arc::new(Int64Array::from(last_update_times)),
                Arc::new(BooleanArray::from(vec![false; len])),
            ],
        )
        .expect("columns should match the schema")
    }

    /// Sorts by "userPrimaryKey", adds a `row_number()` window column, and checks
    /// that the collected rows come back in global key order with a matching
    /// zero-based "fileRowNumber".
    #[tokio::test(flavor = "multi_thread")]
    async fn simple_reproduction() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("1", DataType::Utf8, true),
            Field::new("2", DataType::Utf8, true),
            Field::new("userPrimaryKey", DataType::Utf8, false),
            Field::new("id", DataType::Utf8, false),
            Field::new("lastUpdateTime", DataType::Int64, false),
            Field::new("isDeleted", DataType::Boolean, false),
        ]));

        // Two in-memory partitions: rows 0..SPLIT_AT and SPLIT_AT..TOTAL_ROWS.
        let batches: Vec<Vec<RecordBatch>> = vec![
            vec![create_record_batch(schema.clone(), 0, SPLIT_AT)],
            vec![create_record_batch(schema.clone(), SPLIT_AT, TOTAL_ROWS)],
        ];
        let table = MemTable::try_new(schema, batches)?;
        let ctx = get_session_context();
        ctx.register_table("my_table", Arc::new(table))?;

        let row_number_sub_query = ctx
            .table("my_table")
            .await?
            .sort(vec![ident("userPrimaryKey").sort(true, true)])?
            .window(vec![row_number().alias("dataFusionRowNumber")])?;

        // Select every column except the raw row number...
        let column_names_to_exclude = vec!["dataFusionRowNumber".into()];
        let mut columns_to_select_exprs = expand_wildcard(
            row_number_sub_query.schema(),
            row_number_sub_query.logical_plan(),
            Some(&WildcardOptions {
                ilike: None,
                exclude: Some(ExcludeSelectItem::Multiple(column_names_to_exclude)),
                except: None,
                replace: None,
                rename: None,
            }),
        )?;
        // ...and replace it with a zero-based Int32 "fileRowNumber".
        columns_to_select_exprs.push(
            (ident("dataFusionRowNumber") - lit(1))
                .cast_to(&ArrowDataType::Int32, row_number_sub_query.schema())?
                .alias_qualified(Some("my_table"), "fileRowNumber"),
        );
        let result = row_number_sub_query.select(columns_to_select_exprs)?;
        let batches = result.collect().await?;

        let mut counter: i32 = 0;
        for batch in &batches {
            let primary_key_column = batch
                .column_by_name("userPrimaryKey")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let file_row_number_column = batch
                .column_by_name("fileRowNumber")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for i in 0..batch.num_rows() {
                assert_eq!(file_row_number_column.value(i), counter);
                assert_eq!(primary_key_column.value(i), format!("{:04}", counter));
                counter += 1;
            }
        }
        // Without this check, an empty or truncated result would pass vacuously.
        assert_eq!(counter, TOTAL_ROWS);
        Ok(())
    }
}
```
### Expected behavior
I expect the data to come back fully sorted from 0000 through 9999 with the
corresponding row number.
### Additional context
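This is the physical plan produced. The `RepartitionExec` above the window
uses `RoundRobinBatch(8)` partitioning, and the root `CoalescePartitionsExec`
merges those partitions with an output ordering of `None`, so the sort order is
lost. I think we should either not repartition the data at all, or use a
`SortPreservingMergeExec` so that the streams come back in sorted order.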
```
CoalescePartitionsExec {
input: ProjectionExec {
expr: [
(
Column {
name: "1",
index: 0,
},
"1",
),
(
Column {
name: "2",
index: 1,
},
"2",
),
(
Column {
name: "userPrimaryKey",
index: 2,
},
"userPrimaryKey",
),
(
Column {
name: "id",
index: 3,
},
"id",
),
(
Column {
name: "lastUpdateTime",
index: 4,
},
"lastUpdateTime",
),
(
Column {
name: "isDeleted",
index: 5,
},
"isDeleted",
),
(
BinaryExpr {
left: CastExpr {
expr: Column {
name: "dataFusionRowNumber",
index: 6,
},
cast_type: Int32,
cast_options: CastOptions {
safe: false,
format_options: FormatOptions {
safe: true,
null: "",
date_format: None,
datetime_format: None,
timestamp_format: None,
timestamp_tz_format: None,
time_format: None,
duration_format: Pretty,
},
},
},
op: Minus,
right: Literal {
value: Int32(1),
},
fail_on_overflow: false,
},
"fileRowNumber",
),
],
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "fileRowNumber",
data_type: Int32,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
input: RepartitionExec {
input: BoundedWindowAggExec {
input: SortExec {
input: DataSourceExec {
data_source: FileScanConfig
{object_store_url=ObjectStoreUrl { url: Url { scheme: "s3", cannot_be_a_base:
false, username: "", password: None, host:
Some(Domain("airtable-enterprise-data-tables-us-west-2-development")), port:
None, path: "/", query: None, fragment: None } }, statistics=Statistics {
num_rows: Exact(10000), total_byte_size: Exact(521607), column_statistics:
[ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")),
min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("a")),
min_value: Exact(Utf8("a")), sum_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Exact(0), max_value: Exact(Utf8("9999")),
min_value: Exact(Utf8("0000")), sum_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Exact(0), max_value:
Exact(Utf8("dtr00000009999541")), min_value: Exact(Utf8("dtr00000000000175")),
sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value:
Exact(Int64(10000)), min_value: Exact(Int64(1)), sum_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Exact(0), max_value:
Exact(Boolean(false)), min_value: Exact(Boolean(false)), sum_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Exact(10000),
max_value: Exact(Int32(NULL)), min_value: Exact(Int32(NULL)), sum_value:
Absent, distinct_count: Absent }] }, file_groups={1 group:
[[entRDQybgmzURTXUd/edtcuTVRzNDkbys5t/optimize_42QnfFpBs8/2za67O8nb1.zstd.parquet/partitioning_column=0/XtwrzQJVtMmjo88H.parquet]]},
projection=[1, 2, userPrimaryKey, id, lastUpdateTime, isDeleted]},
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: UnknownPartitioning(
1,
),
emission_type: Incremental,
boundedness: Bounded,
output_ordering: None,
},
},
expr: LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
metrics_set: ExecutionPlanMetricsSet {
inner: Mutex {
data: MetricsSet {
metrics: [],
},
},
},
preserve_partitioning: false,
fetch: None,
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: UnknownPartitioning(
1,
),
emission_type: Final,
boundedness: Bounded,
output_ordering: Some(
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
),
},
},
window_expr: [
StandardWindowExpr {
expr: WindowUDFExpr {
fun: WindowUDF {
inner: RowNumber {
signature: Signature {
type_signature: Nullary,
volatility: Immutable,
},
},
},
args: [],
name: "dataFusionRowNumber",
input_types: [],
is_reversed: false,
ignore_nulls: false,
},
partition_by: [],
order_by: LexOrdering {
inner: [],
},
window_frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)),
is_causal: false },
},
],
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "dataFusionRowNumber",
data_type: UInt64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
metrics: ExecutionPlanMetricsSet {
inner: Mutex {
data: MetricsSet {
metrics: [],
},
},
},
input_order_mode: Sorted,
ordered_partition_by_indices: [],
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "dataFusionRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "dataFusionRowNumber",
data_type: UInt64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: UnknownPartitioning(
1,
),
emission_type: Final,
boundedness: Bounded,
output_ordering: Some(
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
PhysicalSortExpr {
expr: Column {
name: "dataFusionRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
),
},
can_repartition: false,
},
state: OnceCell {
value: None,
},
metrics: ExecutionPlanMetricsSet {
inner: Mutex {
data: MetricsSet {
metrics: [],
},
},
},
preserve_order: false,
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "dataFusionRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "dataFusionRowNumber",
data_type: UInt64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: RoundRobinBatch(
8,
),
emission_type: Final,
boundedness: Bounded,
output_ordering: Some(
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
PhysicalSortExpr {
expr: Column {
name: "dataFusionRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
),
},
},
metrics: ExecutionPlanMetricsSet {
inner: Mutex {
data: MetricsSet {
metrics: [],
},
},
},
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
],
},
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "fileRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "fileRowNumber",
data_type: Int32,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: RoundRobinBatch(
8,
),
emission_type: Final,
boundedness: Bounded,
output_ordering: Some(
LexOrdering {
inner: [
PhysicalSortExpr {
expr: Column {
name: "userPrimaryKey",
index: 2,
},
options: SortOptions {
descending: false,
nulls_first: true,
},
},
PhysicalSortExpr {
expr: Column {
name: "fileRowNumber",
index: 6,
},
options: SortOptions {
descending: false,
nulls_first: false,
},
},
],
},
),
},
},
metrics: ExecutionPlanMetricsSet {
inner: Mutex {
data: MetricsSet {
metrics: [],
},
},
},
cache: PlanProperties {
eq_properties: EquivalenceProperties {
eq_group: EquivalenceGroup {
classes: [],
},
oeq_class: OrderingEquivalenceClass {
orderings: [],
},
constants: [],
constraints: Constraints {
inner: [],
},
schema: Schema {
fields: [
Field {
name: "1",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "2",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "userPrimaryKey",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "id",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "lastUpdateTime",
data_type: Int64,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "isDeleted",
data_type: Boolean,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "fileRowNumber",
data_type: Int32,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
},
},
partitioning: UnknownPartitioning(
1,
),
emission_type: Final,
boundedness: Bounded,
output_ordering: None,
},
fetch: None,
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
