andrei-ionescu opened a new issue #1404:
URL: https://github.com/apache/arrow-datafusion/issues/1404


   **Describe the bug**
   
   Using `Partitioning::Hash` results in wrong partitions.
   
   **To Reproduce**
   
   1. Create a new cargo project
   2. Create the `data` directory at the same level as the `src` directory
   3. Download the attached archive 
[data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip](https://github.com/apache/arrow-datafusion/files/7660549/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip)
 and unpack it in `data` directory
   4. Add the following dependencies in `Cargo.toml`
   
   ```toml
   [dependencies]
   tokio = "1.14"
   arrow = "6.0"
   datafusion = "6.0"
   ```
   
   5. Place the following code in the `main.rs`
   
   ```rust
   use arrow::{
       record_batch::RecordBatch, 
       util::display::array_value_to_string};
   use datafusion::{
       prelude::*, 
       error::Result, 
       logical_plan::count_distinct};
   
   
   // Location of the sample parquet file that `main` reads; the path is
   // relative to the directory the program is run from.
   const SOURCE_DATA_PATH: &str = 
"./data/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet";
   
   /// Reproduction driver for the hash-partitioning report.
   ///
   /// Reads the parquet file at `SOURCE_DATA_PATH`, prints the row count of every
   /// distinct (`nlm_dimension_load_date`, `src_fuel_type`) combination, derives
   /// the number of such combinations, repartitions the data frame with
   /// `Partitioning::Hash` over those two columns using that number, and finally
   /// prints how many rows ended up in each collected partition.
   ///
   /// # Errors
   ///
   /// Propagates any error returned by DataFusion/Arrow while reading,
   /// planning, executing or formatting.
   ///
   /// # Panics
   ///
   /// Panics if the "total" aggregation yields no non-empty record batch, or if
   /// its value cannot be parsed as a `usize`.
   #[tokio::main]
   async fn main() -> Result<()> {
       let mut ctx = ExecutionContext::new();

       // Define partitioning
       let partitioning_columns = vec![
           col("nlm_dimension_load_date"),
           col("src_fuel_type"),
       ];

       // Read source parquet file
       let df = ctx.read_parquet(SOURCE_DATA_PATH).await?;

       // Manual partitioning: the two columns joined into a single key
       let concat_cols = vec![
           col("nlm_dimension_load_date"),
           lit(" / "),
           col("src_fuel_type"),
       ];

       // Get info for the given partitions
       let df_info = df.aggregate(
           partitioning_columns.clone(),
           vec![
               count_distinct(concat(&concat_cols[..])).alias("c"),
               count(concat(&concat_cols[..])).alias("rows_in_part"),
           ],
       )?;

       println!();
       println!("Data distribution by partition");
       println!("==============================");
       println!();
       df_info.show().await?;
       println!();

       // Determine the total number of partitions
       let num_partitions_rec = df_info
           .aggregate(vec![col("c")], vec![sum(col("c")).alias("total")])?
           .select(vec![col("total")])?
           .collect()
           .await?;

       // The result may contain empty batches; the value lives in the first
       // batch that actually has a row.
       let first_batch: &RecordBatch = num_partitions_rec
           .iter()
           .find(|b| b.num_rows() > 0)
           .expect("the `total` aggregation returned no non-empty record batch");

       // Should be 72
       let total_partitions = array_value_to_string(first_batch.column(0), 0)?
           .parse::<usize>()
           .expect("the `total` value should parse as usize");

       println!("Total number of partitions: {}", total_partitions);
       println!("==============================");
       println!();

       // Partition the dataframe with hash partitioning with 72 desired partitions
       let partitioned_df = df.repartition(Partitioning::Hash(
           partitioning_columns,
           total_partitions, // 72
       ))?;

       // Collected partitions
       let collected_partitions = partitioned_df.collect_partitioned().await?;

       println!("Collected partitions");
       println!("====================");
       println!();

       // Distinct names for the partition index and the batches avoid the
       // shadowing of `i` that the nested loop previously relied on.
       for (partition_idx, batches) in collected_partitions.iter().enumerate() {
           let num_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
           println!("Partition={}, rows_in_part={}", partition_idx, num_rows);
       }

       Ok(())
   }
   ```
   
   6. Execute it with `cargo run`.
   7. The result is below:
   
   ```
   Data distribution by partition
   ==============================
   
   +-------------------------+---------------+---+--------------+
   | nlm_dimension_load_date | src_fuel_type | c | rows_in_part |
   +-------------------------+---------------+---+--------------+
   | 2020-10-18T00:29:41Z    | Steam         | 1 | 14           |
   | 2021-06-09T00:32:40Z    | Gas           | 1 | 5            |
   | 2021-06-09T00:35:51Z    | Electric      | 1 | 479          |
   | 2020-10-18T00:52:38Z    | Steam         | 1 | 7            |
   | 2020-10-18T00:12:55Z    | Gas           | 1 | 3            |
   | 2020-10-18T00:52:38Z    | Electric      | 1 | 259          |
   | 2021-06-09T00:08:27Z    | Gas           | 1 | 15           |
   | 2021-06-08T23:55:20Z    | Gas           | 1 | 7            |
   | 2021-06-09T00:35:51Z    | Steam         | 1 | 2            |
   | 2021-06-09T00:26:40Z    | Electric      | 1 | 160          |
   | 2020-10-18T00:04:58Z    | Electric      | 1 | 36           |
   | 2021-06-09T00:13:39Z    | Electric      | 1 | 47           |
   | 2020-10-18T00:29:41Z    | Gas           | 1 | 1            |
   | 2021-06-09T00:41:20Z    | Steam         | 1 | 4            |
   | 2020-10-18T00:35:16Z    | Electric      | 1 | 131          |
   | 2020-10-18T00:18:03Z    | Steam         | 1 | 8            |
   | 2021-06-09T00:23:35Z    | Gas           | 1 | 4            |
   | 2021-06-09T00:20:35Z    | Electric      | 1 | 54           |
   | 2021-06-09T00:41:20Z    | Gas           | 1 | 8            |
   | 2020-10-18T00:35:16Z    | Steam         | 1 | 27           |
   | 2020-10-18T00:18:03Z    | Electric      | 1 | 88           |
   | 2021-06-09T00:20:35Z    | Steam         | 1 | 2            |
   | 2021-06-09T00:26:40Z    | Gas           | 1 | 7            |
   | 2021-06-09T22:15:47Z    | Electric      | 1 | 943          |
   | 2020-10-18T00:23:47Z    | Gas           | 1 | 2            |
   | 2021-06-09T00:32:40Z    | Steam         | 1 | 2            |
   | 2020-10-18T00:29:41Z    | Electric      | 1 | 89           |
   | 2021-06-09T22:15:47Z    | Gas           | 1 | 35           |
   | 2020-10-18T00:46:52Z    | Gas           | 1 | 2            |
   | 2021-06-09T00:43:47Z    | Gas           | 1 | 20           |
   | 2021-06-09T00:32:40Z    | Electric      | 1 | 304          |
   | 2021-06-09T22:15:47Z    | Steam         | 1 | 9            |
   | 2020-10-18T00:04:58Z    | Steam         | 1 | 5            |
   | 2021-06-09T00:41:20Z    | Electric      | 1 | 203          |
   | 2021-06-09T00:23:35Z    | Electric      | 1 | 56           |
   | 2021-06-09T00:26:40Z    | Steam         | 1 | 5            |
   | 2021-06-09T00:13:39Z    | Steam         | 1 | 2            |
   | 2020-10-18T00:23:47Z    | Electric      | 1 | 114          |
   | 2020-10-18T00:52:38Z    | Gas           | 1 | 2            |
   | 2020-10-18T00:46:52Z    | Steam         | 1 | 9            |
   | 2021-06-09T00:43:47Z    | Steam         | 1 | 4            |
   | 2020-10-18T00:46:52Z    | Electric      | 1 | 246          |
   | 2021-06-09T00:43:47Z    | Electric      | 1 | 410          |
   | 2020-10-18T00:23:47Z    | Steam         | 1 | 10           |
   | 2021-06-09T00:35:51Z    | Gas           | 1 | 10           |
   | 2020-10-18T00:12:55Z    | Electric      | 1 | 86           |
   | 2021-06-09T00:08:27Z    | Steam         | 1 | 22           |
   | 2020-10-18T00:06:36Z    | Gas           | 1 | 2            |
   | 2021-06-09T00:16:33Z    | Gas           | 1 | 6            |
   | 2021-06-09T00:38:39Z    | Gas           | 1 | 19           |
   | 2021-06-08T23:55:20Z    | Steam         | 1 | 13           |
   | 2020-10-18T00:40:11Z    | Gas           | 1 | 1            |
   | 2021-06-09T00:08:27Z    | Electric      | 1 | 472          |
   | 2021-06-08T23:55:20Z    | Electric      | 1 | 234          |
   | 2021-06-09T00:29:31Z    | Gas           | 1 | 10           |
   | 2021-06-09T00:02:37Z    | Gas           | 1 | 13           |
   | 2020-10-18T00:12:55Z    | Steam         | 1 | 9            |
   | 2021-06-09T00:38:39Z    | Electric      | 1 | 256          |
   | 2020-10-18T00:06:36Z    | Electric      | 1 | 136          |
   | 2020-10-18T00:40:11Z    | Electric      | 1 | 209          |
   | 2021-06-09T00:02:37Z    | Steam         | 1 | 18           |
   | 2020-10-18T00:18:03Z    | Gas           | 1 | 1            |
   | 2021-06-09T00:29:31Z    | Steam         | 1 | 7            |
   | 2021-06-09T00:16:33Z    | Electric      | 1 | 54           |
   | 2020-10-18T00:40:11Z    | Steam         | 1 | 48           |
   | 2021-06-09T00:02:37Z    | Electric      | 1 | 571          |
   | 2020-10-18T00:35:16Z    | Gas           | 1 | 2            |
   | 2021-06-09T00:29:31Z    | Electric      | 1 | 223          |
   | 2020-10-18T00:06:36Z    | Steam         | 1 | 10           |
   | 2021-06-09T00:38:39Z    | Steam         | 1 | 7            |
   | 2021-06-09T00:20:35Z    | Gas           | 1 | 4            |
   | 2021-06-09T00:16:33Z    | Steam         | 1 | 1            |
   +-------------------------+---------------+---+--------------+
   
   Total number of partitions: 72
   ==============================
   
   Collected partitions
   ====================
   
   Partition=0, rows_in_part=6
   Partition=1, rows_in_part=7
   Partition=2, rows_in_part=23
   Partition=3, rows_in_part=0
   Partition=4, rows_in_part=0
   Partition=5, rows_in_part=208
   Partition=6, rows_in_part=8
   Partition=7, rows_in_part=227
   Partition=8, rows_in_part=236
   Partition=9, rows_in_part=0
   Partition=10, rows_in_part=2
   Partition=11, rows_in_part=2
   Partition=12, rows_in_part=0
   Partition=13, rows_in_part=63
   Partition=14, rows_in_part=88
   Partition=15, rows_in_part=0
   Partition=16, rows_in_part=584
   Partition=17, rows_in_part=0
   Partition=18, rows_in_part=10
   Partition=19, rows_in_part=0
   Partition=20, rows_in_part=0
   Partition=21, rows_in_part=0
   Partition=22, rows_in_part=61
   Partition=23, rows_in_part=7
   Partition=24, rows_in_part=32
   Partition=25, rows_in_part=7
   Partition=26, rows_in_part=114
   Partition=27, rows_in_part=29
   Partition=28, rows_in_part=0
   Partition=29, rows_in_part=0
   Partition=30, rows_in_part=6
   Partition=31, rows_in_part=231
   Partition=32, rows_in_part=0
   Partition=33, rows_in_part=18
   Partition=34, rows_in_part=0
   Partition=35, rows_in_part=943
   Partition=36, rows_in_part=0
   Partition=37, rows_in_part=0
   Partition=38, rows_in_part=131
   Partition=39, rows_in_part=260
   Partition=40, rows_in_part=0
   Partition=41, rows_in_part=479
   Partition=42, rows_in_part=0
   Partition=43, rows_in_part=4
   Partition=44, rows_in_part=0
   Partition=45, rows_in_part=203
   Partition=46, rows_in_part=27
   Partition=47, rows_in_part=0
   Partition=48, rows_in_part=10
   Partition=49, rows_in_part=2
   Partition=50, rows_in_part=0
   Partition=51, rows_in_part=304
   Partition=52, rows_in_part=35
   Partition=53, rows_in_part=40
   Partition=54, rows_in_part=0
   Partition=55, rows_in_part=64
   Partition=56, rows_in_part=19
   Partition=57, rows_in_part=0
   Partition=58, rows_in_part=0
   Partition=59, rows_in_part=0
   Partition=60, rows_in_part=89
   Partition=61, rows_in_part=5
   Partition=62, rows_in_part=8
   Partition=63, rows_in_part=137
   Partition=64, rows_in_part=0
   Partition=65, rows_in_part=259
   Partition=66, rows_in_part=656
   Partition=67, rows_in_part=0
   Partition=68, rows_in_part=19
   Partition=69, rows_in_part=0
   Partition=70, rows_in_part=0
   Partition=71, rows_in_part=621
   ```
   
   8. The following **WRONG** things can be observed:
       - some of the collected partitions are empty, they have no rows
       - the number of rows in the collected partitions does not match the source 
data
   
   **Expected behavior**
   
   The collected partitions, after using `Partitioning::Hash`, should match the 
desired partitioning.
   
   **Additional context**
   
   `cargo 1.58.0-nightly (294967c53 2021-11-29)`
   `rustc 1.59.0-nightly (efec54529 2021-12-04)`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to