andrei-ionescu opened a new issue #1404: URL: https://github.com/apache/arrow-datafusion/issues/1404
**Describe the bug**
Using `Partitioning::Hash` results in wrong partitions.

**To Reproduce**
1. Create a new cargo project
2. Create the `data` directory at the same level as the `src` directory
3. Download the attached archive [data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip](https://github.com/apache/arrow-datafusion/files/7660549/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip) and unpack it in the `data` directory
4. Add the following dependencies to `Cargo.toml`
```toml
[dependencies]
tokio = "1.14"
arrow = "6.0"
datafusion = "6.0"
```
5. Place the following code in `main.rs`
```rust
use arrow::{
    record_batch::RecordBatch,
    util::display::array_value_to_string};
use datafusion::{
    prelude::*,
    error::Result,
    logical_plan::count_distinct};

const SOURCE_DATA_PATH: &str = "./data/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet";

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // Define partitioning
    let partitioning_columns = vec![
        col("nlm_dimension_load_date"),
        col("src_fuel_type")];

    // Read source parquet file
    let df = ctx.read_parquet(SOURCE_DATA_PATH).await?;

    // Manual partitioning
    let concat_cols = vec![
        col("nlm_dimension_load_date"),
        lit(" / "),
        col("src_fuel_type")];

    // Get info for the given partitions
    let df_info = df
        .aggregate(
            partitioning_columns.clone(),
            vec![
                count_distinct(concat(&concat_cols[..])).alias("c"),
                count(concat(&concat_cols[..])).alias("rows_in_part")
            ])?;

    println!("");
    println!("Data distribution by partition");
    println!("==============================");
    println!("");
    df_info.show().await?;
    println!("");

    // Determine the total number of partitions
    let num_partitions_rec = df_info
        .aggregate(
            vec![col("c")],
            vec![sum(col("c")).alias("total")])?
        .select(
            vec![col("total")])?
        .collect().await?;

    // Should be 72
    let total_partitions = array_value_to_string(
        num_partitions_rec
            .iter()
            .filter(|b| b.num_rows() > 0)
            .collect::<Vec<&RecordBatch>>()[0]
            .column(0),
        0
    )?.parse::<usize>().unwrap();

    println!("Total number of partitions: {}", total_partitions);
    println!("==============================");
    println!("");

    // Partition the dataframe with hash partitioning with 72 desired partitions
    let partitioned_df = df
        .repartition(Partitioning::Hash(
            partitioning_columns,
            total_partitions))?; // 72

    // Collected partitions
    let collected_partitions = partitioned_df
        .collect_partitioned()
        .await?;

    println!("Collected partitions");
    println!("====================");
    println!("");
    collected_partitions.iter().enumerate().for_each(|(i, p)| {
        let mut num_rows: usize = 0;
        for i in p {
            num_rows += i.num_rows();
        }
        println!("Partition={}, rows_in_part={}", i, num_rows);
    });

    Ok(())
}
```
6. Execute it with `cargo run`.
7. The result is below:
```

Data distribution by partition
==============================

+-------------------------+---------------+---+--------------+
| nlm_dimension_load_date | src_fuel_type | c | rows_in_part |
+-------------------------+---------------+---+--------------+
| 2020-10-18T00:29:41Z    | Steam         | 1 | 14           |
| 2021-06-09T00:32:40Z    | Gas           | 1 | 5            |
| 2021-06-09T00:35:51Z    | Electric      | 1 | 479          |
| 2020-10-18T00:52:38Z    | Steam         | 1 | 7            |
| 2020-10-18T00:12:55Z    | Gas           | 1 | 3            |
| 2020-10-18T00:52:38Z    | Electric      | 1 | 259          |
| 2021-06-09T00:08:27Z    | Gas           | 1 | 15           |
| 2021-06-08T23:55:20Z    | Gas           | 1 | 7            |
| 2021-06-09T00:35:51Z    | Steam         | 1 | 2            |
| 2021-06-09T00:26:40Z    | Electric      | 1 | 160          |
| 2020-10-18T00:04:58Z    | Electric      | 1 | 36           |
| 2021-06-09T00:13:39Z    | Electric      | 1 | 47           |
| 2020-10-18T00:29:41Z    | Gas           | 1 | 1            |
| 2021-06-09T00:41:20Z    | Steam         | 1 | 4            |
| 2020-10-18T00:35:16Z    | Electric      | 1 | 131          |
| 2020-10-18T00:18:03Z    | Steam         | 1 | 8            |
| 2021-06-09T00:23:35Z    | Gas           | 1 | 4            |
| 2021-06-09T00:20:35Z    | Electric      | 1 | 54           |
| 2021-06-09T00:41:20Z    | Gas           | 1 | 8            |
| 2020-10-18T00:35:16Z    | Steam         | 1 | 27           |
| 2020-10-18T00:18:03Z    | Electric      | 1 | 88           |
| 2021-06-09T00:20:35Z    | Steam         | 1 | 2            |
| 2021-06-09T00:26:40Z    | Gas           | 1 | 7            |
| 2021-06-09T22:15:47Z    | Electric      | 1 | 943          |
| 2020-10-18T00:23:47Z    | Gas           | 1 | 2            |
| 2021-06-09T00:32:40Z    | Steam         | 1 | 2            |
| 2020-10-18T00:29:41Z    | Electric      | 1 | 89           |
| 2021-06-09T22:15:47Z    | Gas           | 1 | 35           |
| 2020-10-18T00:46:52Z    | Gas           | 1 | 2            |
| 2021-06-09T00:43:47Z    | Gas           | 1 | 20           |
| 2021-06-09T00:32:40Z    | Electric      | 1 | 304          |
| 2021-06-09T22:15:47Z    | Steam         | 1 | 9            |
| 2020-10-18T00:04:58Z    | Steam         | 1 | 5            |
| 2021-06-09T00:41:20Z    | Electric      | 1 | 203          |
| 2021-06-09T00:23:35Z    | Electric      | 1 | 56           |
| 2021-06-09T00:26:40Z    | Steam         | 1 | 5            |
| 2021-06-09T00:13:39Z    | Steam         | 1 | 2            |
| 2020-10-18T00:23:47Z    | Electric      | 1 | 114          |
| 2020-10-18T00:52:38Z    | Gas           | 1 | 2            |
| 2020-10-18T00:46:52Z    | Steam         | 1 | 9            |
| 2021-06-09T00:43:47Z    | Steam         | 1 | 4            |
| 2020-10-18T00:46:52Z    | Electric      | 1 | 246          |
| 2021-06-09T00:43:47Z    | Electric      | 1 | 410          |
| 2020-10-18T00:23:47Z    | Steam         | 1 | 10           |
| 2021-06-09T00:35:51Z    | Gas           | 1 | 10           |
| 2020-10-18T00:12:55Z    | Electric      | 1 | 86           |
| 2021-06-09T00:08:27Z    | Steam         | 1 | 22           |
| 2020-10-18T00:06:36Z    | Gas           | 1 | 2            |
| 2021-06-09T00:16:33Z    | Gas           | 1 | 6            |
| 2021-06-09T00:38:39Z    | Gas           | 1 | 19           |
| 2021-06-08T23:55:20Z    | Steam         | 1 | 13           |
| 2020-10-18T00:40:11Z    | Gas           | 1 | 1            |
| 2021-06-09T00:08:27Z    | Electric      | 1 | 472          |
| 2021-06-08T23:55:20Z    | Electric      | 1 | 234          |
| 2021-06-09T00:29:31Z    | Gas           | 1 | 10           |
| 2021-06-09T00:02:37Z    | Gas           | 1 | 13           |
| 2020-10-18T00:12:55Z    | Steam         | 1 | 9            |
| 2021-06-09T00:38:39Z    | Electric      | 1 | 256          |
| 2020-10-18T00:06:36Z    | Electric      | 1 | 136          |
| 2020-10-18T00:40:11Z    | Electric      | 1 | 209          |
| 2021-06-09T00:02:37Z    | Steam         | 1 | 18           |
| 2020-10-18T00:18:03Z    | Gas           | 1 | 1            |
| 2021-06-09T00:29:31Z    | Steam         | 1 | 7            |
| 2021-06-09T00:16:33Z    | Electric      | 1 | 54           |
| 2020-10-18T00:40:11Z    | Steam         | 1 | 48           |
| 2021-06-09T00:02:37Z    | Electric      | 1 | 571          |
| 2020-10-18T00:35:16Z    | Gas           | 1 | 2            |
| 2021-06-09T00:29:31Z    | Electric      | 1 | 223          |
| 2020-10-18T00:06:36Z    | Steam         | 1 | 10           |
| 2021-06-09T00:38:39Z    | Steam         | 1 | 7            |
| 2021-06-09T00:20:35Z    | Gas           | 1 | 4            |
| 2021-06-09T00:16:33Z    | Steam         | 1 | 1            |
+-------------------------+---------------+---+--------------+

Total number of partitions: 72
==============================

Collected partitions
====================

Partition=0, rows_in_part=6
Partition=1, rows_in_part=7
Partition=2, rows_in_part=23
Partition=3, rows_in_part=0
Partition=4, rows_in_part=0
Partition=5, rows_in_part=208
Partition=6, rows_in_part=8
Partition=7, rows_in_part=227
Partition=8, rows_in_part=236
Partition=9, rows_in_part=0
Partition=10, rows_in_part=2
Partition=11, rows_in_part=2
Partition=12, rows_in_part=0
Partition=13, rows_in_part=63
Partition=14, rows_in_part=88
Partition=15, rows_in_part=0
Partition=16, rows_in_part=584
Partition=17, rows_in_part=0
Partition=18, rows_in_part=10
Partition=19, rows_in_part=0
Partition=20, rows_in_part=0
Partition=21, rows_in_part=0
Partition=22, rows_in_part=61
Partition=23, rows_in_part=7
Partition=24, rows_in_part=32
Partition=25, rows_in_part=7
Partition=26, rows_in_part=114
Partition=27, rows_in_part=29
Partition=28, rows_in_part=0
Partition=29, rows_in_part=0
Partition=30, rows_in_part=6
Partition=31, rows_in_part=231
Partition=32, rows_in_part=0
Partition=33, rows_in_part=18
Partition=34, rows_in_part=0
Partition=35, rows_in_part=943
Partition=36, rows_in_part=0
Partition=37, rows_in_part=0
Partition=38, rows_in_part=131
Partition=39, rows_in_part=260
Partition=40, rows_in_part=0
Partition=41, rows_in_part=479
Partition=42, rows_in_part=0
Partition=43, rows_in_part=4
Partition=44, rows_in_part=0
Partition=45, rows_in_part=203
Partition=46, rows_in_part=27
Partition=47, rows_in_part=0
Partition=48, rows_in_part=10
Partition=49, rows_in_part=2
Partition=50, rows_in_part=0
Partition=51, rows_in_part=304
Partition=52, rows_in_part=35
Partition=53, rows_in_part=40
Partition=54, rows_in_part=0
Partition=55, rows_in_part=64
Partition=56, rows_in_part=19
Partition=57, rows_in_part=0
Partition=58, rows_in_part=0
Partition=59, rows_in_part=0
Partition=60, rows_in_part=89
Partition=61, rows_in_part=5
Partition=62, rows_in_part=8
Partition=63, rows_in_part=137
Partition=64, rows_in_part=0
Partition=65, rows_in_part=259
Partition=66, rows_in_part=656
Partition=67, rows_in_part=0
Partition=68, rows_in_part=19
Partition=69, rows_in_part=0
Partition=70, rows_in_part=0
Partition=71, rows_in_part=621
```
8. The following **WRONG** things can be observed:
- some of the collected partitions are empty (they have no rows)
- the number of rows in the collected partitions does not match the source data

**Expected behavior**
The collected partitions produced by `Partitioning::Hash` should match the desired partitioning.

**Additional context**
`cargo 1.58.0-nightly (294967c53 2021-11-29)`
`rustc 1.59.0-nightly (efec54529 2021-12-04)`
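
To help separate ordinary hash collisions from an actual defect, here is a minimal standalone sketch of modulo-based hash partitioning. It does not use DataFusion: `partition_for` and `partition_row_counts` are hypothetical helpers, and `DefaultHasher` from the standard library stands in for whatever hash function `Partitioning::Hash` uses internally, which this report does not confirm. Under this scheme all rows of one group land in the same partition and the total row count is preserved, but distinct groups may share a partition, which leaves other partitions empty. The sample groups are copied from the table above.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a (load_date, fuel_type) key to a partition index via hash modulo.
/// Assumption: DefaultHasher is only a stand-in, not DataFusion's hash.
fn partition_for(key: &(&str, &str), num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Add each group's row count to the partition its key hashes to.
fn partition_row_counts(
    groups: &[((&str, &str), usize)],
    num_partitions: usize,
) -> Vec<usize> {
    let mut rows = vec![0usize; num_partitions];
    for (key, count) in groups {
        rows[partition_for(key, num_partitions)] += *count;
    }
    rows
}

fn main() {
    // A few groups and their row counts taken from the reported table.
    let groups: [((&str, &str), usize); 6] = [
        (("2021-06-09T22:15:47Z", "Electric"), 943),
        (("2021-06-09T00:02:37Z", "Electric"), 571),
        (("2021-06-09T00:02:37Z", "Gas"), 13),
        (("2021-06-09T00:35:51Z", "Electric"), 479),
        (("2020-10-18T00:29:41Z", "Steam"), 14),
        (("2021-06-09T00:32:40Z", "Gas"), 5),
    ];

    // Ask for as many partitions as there are distinct groups.
    let rows = partition_row_counts(&groups, groups.len());

    // Rows are never lost or duplicated, whatever the collisions.
    let total: usize = groups.iter().map(|(_, c)| *c).sum();
    assert_eq!(rows.iter().sum::<usize>(), total);

    let empty = rows.iter().filter(|&&r| r == 0).count();
    println!("rows per partition: {:?}", rows);
    println!("empty partitions: {}", empty);
}
```

The same invariants can be checked against the reported output: a per-partition count that equals the sum of a few group sizes would be consistent with collisions (for instance, 584 in partition 16 could be a 571-row group sharing a bucket with a 13-row group), whereas a total that differs from the source row count would point to lost or duplicated rows.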
