jonmmease opened a new issue, #4673:
URL: https://github.com/apache/arrow-datafusion/issues/4673
**Describe the bug**
In a multi-threaded runtime, the output of the `ROW_NUMBER` window function
appears to be nondeterministic when the input table consists of multiple
partitions.
In particular, it looks like the input partitions are put in an arbitrary
order before `ROW_NUMBER` is calculated.
**Example**
The input table has a single column, `a`, containing the values 1 through 4, each repeated four times:
```
+---+
| a |
+---+
| 1 |
| 1 |
| 1 |
| 1 |
| 2 |
| 2 |
| 2 |
| 2 |
| 3 |
| 3 |
| 3 |
| 3 |
| 4 |
| 4 |
| 4 |
| 4 |
+---+
```
The query adds a `ROW_NUMBER() as row_num` column and then groups by `a`,
keeping the minimum `row_num` for each group.
```sql
SELECT a, min(row_num) as min_row_num
from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
GROUP BY a
ORDER BY a
```
The expected result for this query (and the result obtained when the input
consists of a single partition) is:
```
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 1           |
| 2 | 5           |
| 3 | 9           |
| 4 | 13          |
+---+-------------+
```
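To make the expected numbers concrete: if the four partitions are read in their original order, `ROW_NUMBER()` numbers the rows 1 to 16, so the first row of each group lands at positions 1, 5, 9 and 13. The std-only Rust sketch below reproduces that arithmetic; `min_row_num` is a hypothetical helper that models the query, not DataFusion code.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of the query: assign a 1-based ROW_NUMBER() to the rows
/// of `partitions` read in the given order, then return MIN(row_num) per `a`.
fn min_row_num(partitions: &[Vec<i64>]) -> BTreeMap<i64, u64> {
    let mut result = BTreeMap::new();
    // Flatten the partitions in order; enumeration index + 1 is the row number.
    for (i, &a) in partitions.iter().flatten().enumerate() {
        let row_num = i as u64 + 1;
        // Rows are visited in increasing row_num, so the first entry per key
        // is already the minimum.
        result.entry(a).or_insert(row_num);
    }
    result
}

fn main() {
    // Four partitions, each holding one batch of four identical values.
    let partitions: Vec<Vec<i64>> = (1..=4).map(|a| vec![a; 4]).collect();
    for (a, min) in min_row_num(&partitions) {
        // Prints min_row_num = 1, 5, 9, 13 for a = 1, 2, 3, 4.
        println!("a = {a}, min_row_num = {min}");
    }
}
```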
The issue is that this is not always the result when the input table
consists of multiple partitions and DataFusion is running in a multi-threaded
runtime.
Here are some representative results in this case:
```
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 9           |
| 2 | 13          |
| 3 | 5           |
| 4 | 1           |
+---+-------------+
```
```
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 9           |
| 2 | 1           |
| 3 | 13          |
| 4 | 5           |
+---+-------------+
```
It looks like the partitions are put in an arbitrary order before the
`ROW_NUMBER` calculation is performed.
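The observed outputs are consistent with the partitions being concatenated in the order in which they become ready rather than in partition order: the first result above corresponds to the order 4, 3, 1, 2 and the second to 2, 4, 1, 3. The sketch below is a hypothetical model of such an arrival-order merge using std threads and a channel. It is an assumption about the mechanism, not a description of DataFusion's actual operators, and `merge_in_arrival_order` and `min_row_num` are illustrative names.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical model (not DataFusion code): each partition is produced on its
/// own thread and sent to a shared channel, so the consumer receives whole
/// partitions in whatever order the threads happen to finish.
fn merge_in_arrival_order(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|batch| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(batch).unwrap())
        })
        .collect();
    // Drop the original sender so `rx.iter()` ends once every thread has sent.
    drop(tx);
    let merged: Vec<i64> = rx.iter().flatten().collect();
    for h in handles {
        h.join().unwrap();
    }
    merged
}

/// ROW_NUMBER() over the merged rows, then MIN(row_num) per value of `a`.
fn min_row_num(rows: &[i64]) -> BTreeMap<i64, usize> {
    let mut result = BTreeMap::new();
    for (i, &a) in rows.iter().enumerate() {
        result.entry(a).or_insert(i + 1);
    }
    result
}

fn main() {
    let partitions: Vec<Vec<i64>> = (1..=4).map(|a| vec![a; 4]).collect();
    let merged = merge_in_arrival_order(partitions);
    // The minimums are always some permutation of {1, 5, 9, 13}, but which
    // group gets which number can change from run to run.
    println!("{:?}", min_row_num(&merged));
}
```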
**To Reproduce**
Here is a full Rust example that reproduces the issue.
```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::test(flavor = "multi_thread")]
async fn row_number_over_partitions() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    // Build a batch of four rows, all holding the value `a`.
    let make_batch = |a: i64| -> RecordBatch {
        let array = Arc::new(Int64Array::from(vec![a; 4])) as ArrayRef;
        RecordBatch::try_new(schema.clone(), vec![array]).unwrap()
    };

    // Single partition with four batches (correct result):
    // let batches = vec![vec![make_batch(1), make_batch(2), make_batch(3), make_batch(4)]];

    // Four partitions, each with one batch (incorrect result):
    let batches = vec![
        vec![make_batch(1)],
        vec![make_batch(2)],
        vec![make_batch(3)],
        vec![make_batch(4)],
    ];

    // Print the input rows in partition order.
    let flat_batches: Vec<_> = batches.iter().flat_map(|v| v.clone()).collect();
    println!("{}", pretty_format_batches(flat_batches.as_slice()).unwrap());

    let mem_table = MemTable::try_new(schema, batches).unwrap();
    let ctx = SessionContext::new();
    ctx.register_table("tbl", Arc::new(mem_table)).unwrap();

    let res = ctx
        .sql(
            r#"
            SELECT a, min(row_num) as min_row_num
            from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
            GROUP BY a
            ORDER BY a
            "#,
        )
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let formatted = pretty_format_batches(res.as_slice()).unwrap();
    println!("{}", formatted);

    assert_eq!(
        formatted.to_string(),
        "\
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 1           |
| 2 | 5           |
| 3 | 9           |
| 4 | 13          |
+---+-------------+"
    );
}
```
**Expected behavior**
I expect the test case above to pass.
**Additional context**
The bug only appears when the tokio `multi_thread` flavor is used. The
correct results are returned with `flavor = "current_thread"`.
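If the partition order really does depend on thread scheduling, one way to make the numbering deterministic, sketched here under that assumption, is to tag each batch with its partition index and restore that order before numbering rows. `merge_in_partition_order` is a hypothetical helper used only to illustrate the idea; it is not a proposed DataFusion patch.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: partitions are still produced concurrently, but each
/// batch carries its partition index so the consumer can restore the original
/// order no matter which thread finishes first.
fn merge_in_partition_order(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    let (tx, rx) = mpsc::channel::<(usize, Vec<i64>)>();
    for (idx, batch) in partitions.into_iter().enumerate() {
        let tx = tx.clone();
        // Detached threads: the receiver below waits until every sender drops.
        thread::spawn(move || tx.send((idx, batch)).unwrap());
    }
    drop(tx);
    let mut tagged: Vec<(usize, Vec<i64>)> = rx.iter().collect();
    // Sort by partition index to undo any scheduling-dependent arrival order.
    tagged.sort_by_key(|(idx, _)| *idx);
    tagged.into_iter().flat_map(|(_, batch)| batch).collect()
}

fn main() {
    let partitions: Vec<Vec<i64>> = (1..=4).map(|a| vec![a; 4]).collect();
    let merged = merge_in_partition_order(partitions);
    // ROW_NUMBER() is the 1-based position in `merged`, so the first row of
    // each group is at positions 1, 5, 9 and 13 on every run.
    for a in 1..=4 {
        let first = merged.iter().position(|&v| v == a).unwrap() + 1;
        println!("a = {a}, min_row_num = {first}");
    }
}
```

In this model every batch is buffered before sorting; a streaming alternative would instead consume the partitions one after another in index order.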
I'd be happy to dig in a bit more if someone could point me in the right
direction!