devinjdangelo opened a new issue, #7145:
URL: https://github.com/apache/arrow-datafusion/issues/7145
### Describe the bug
I've noticed that explicit calls to
`DataFrame.repartition(Partitioning::RoundRobinBatch(n))` behave differently
when reading from a CSV source table than when reading from a JSON or Parquet
source. For some reason, when reading from CSV the total number of batches
generated is often less than `n`, so we end up with empty partitions. I think
this is surprising from a user's perspective: if I explicitly request `n`
partitions and the source has sufficient data, I should get `n` nonempty
partitions. I would also expect consistent behavior regardless of the type of
source table I am reading from.
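One way to observe this directly (a diagnostic sketch of my own, assuming a
`DataFrame` named `df` that has already been repartitioned with
`RoundRobinBatch(10)`) is to collect the partitions and count the batches each
one received:
```rust
// Diagnostic sketch: collect each output partition separately and report how
// many RecordBatches landed in it. Empty partitions show up as zero batches.
let partitions = df.collect_partitioned().await?;
for (i, batches) in partitions.iter().enumerate() {
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    println!("partition {i}: {} batches, {rows} rows", batches.len());
}
```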
### To Reproduce
First example, read from parquet and write to csv behaves as expected:
```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::Partitioning;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    // Create the local execution context and register a local object store
    let ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);

    // Register a Parquet file as the source table
    ctx.register_parquet(
        "source",
        "file://local/home/dev/dftest/test5.parquet",
        ParquetReadOptions::default(),
    )
    .await?;

    // Round-robin repartition into 10 partitions, then write out as CSV
    let df = ctx
        .sql("select * from source")
        .await?
        .repartition(Partitioning::RoundRobinBatch(10))?;
    let in_path = "file://local/home/dev/dftest/csv_data/";
    df.write_csv(in_path).await?;
    Ok(())
}
```
```bash
ls -lh csv_data
total 70M
-rw-rw-r-- 1 dev dev 7.5M Jul 30 12:53 part-0.csv
-rw-rw-r-- 1 dev dev 7.5M Jul 30 12:53 part-1.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-2.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-3.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-4.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-5.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-6.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-7.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-8.csv
-rw-rw-r-- 1 dev dev 6.9M Jul 30 12:53 part-9.csv
```
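As a sanity check (my own sketch, not part of the original run), the physical
plan can be printed to confirm the round-robin repartition is present; I'd
expect the same `RepartitionExec` node in both examples.
```rust
// Hedged sketch: inspect the physical plan before writing. Assumes `df` is the
// repartitioned DataFrame from the example above, cloned so it is not consumed.
df.clone().explain(false, false)?.show().await?;
// The output should include a line resembling:
//   RepartitionExec: partitioning=RoundRobinBatch(10)
```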
The second example is nearly identical, except that the source table is CSV
instead of Parquet.
```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::Partitioning;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    // Create the local execution context and register a local object store
    let ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);

    // Register a CSV file as the source table
    ctx.register_csv(
        "source",
        "file://local/home/dev/dftest/test5.csv",
        CsvReadOptions::default(),
    )
    .await?;

    // Round-robin repartition into 10 partitions, then write out as CSV
    let df = ctx
        .sql("select * from source")
        .await?
        .repartition(Partitioning::RoundRobinBatch(10))?;
    let in_path = "file://local/home/dev/dftest/csv_data/";
    df.write_csv(in_path).await?;
    Ok(())
}
```
Now, we see many empty partitions written out.
```bash
ls -lh csv_data
total 70M
-rw-rw-r-- 1 dev dev 18M Jul 30 12:53 part-0.csv
-rw-rw-r-- 1 dev dev 17M Jul 30 12:53 part-1.csv
-rw-rw-r-- 1 dev dev 18M Jul 30 12:53 part-2.csv
-rw-rw-r-- 1 dev dev 16M Jul 30 12:53 part-3.csv
-rw-rw-r-- 1 dev dev 1.9M Jul 30 12:53 part-4.csv
-rw-rw-r-- 1 dev dev 0 Jul 30 12:53 part-5.csv
-rw-rw-r-- 1 dev dev 0 Jul 30 12:53 part-6.csv
-rw-rw-r-- 1 dev dev 0 Jul 30 12:53 part-7.csv
-rw-rw-r-- 1 dev dev 0 Jul 30 12:53 part-8.csv
-rw-rw-r-- 1 dev dev 0 Jul 30 12:53 part-9.csv
```
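For what it's worth, one workaround that might mitigate this (an untested
assumption on my part, not a confirmed fix) is to lower the session batch size
so the CSV scan emits more, smaller `RecordBatch`es for the round-robin
operator to distribute:
```rust
// Hypothetical workaround sketch: a smaller batch size should make the CSV
// scan produce more batches, so RoundRobinBatch(10) has enough units to fill
// all ten partitions. 1024 is an arbitrary illustrative value.
let config = SessionConfig::new().with_batch_size(1024);
let ctx = SessionContext::with_config(config);
```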
### Expected behavior
The examples above should behave the same regardless of whether the source is
CSV. To me, it makes sense for `RoundRobinBatch(n)` to guarantee a minimum of
`n` batches, so that there can be `n` nonempty partitions.
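Stated as a test (my sketch of the invariant, assuming the source holds well
over `n` batches worth of data):
```rust
// Invariant sketch: requesting RoundRobinBatch(10) on a sufficiently large
// source should yield exactly 10 partitions, each with at least one batch.
let partitions = df.collect_partitioned().await?;
assert_eq!(partitions.len(), 10);
assert!(
    partitions.iter().all(|p| !p.is_empty()),
    "expected 10 nonempty partitions"
);
```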
### Additional context
_No response_