jonmmease opened a new issue, #4873:
URL: https://github.com/apache/arrow-datafusion/issues/4873

   **Describe the bug**
   [VegaFusion](https://vegafusion.io/) implements the Vega 
[impute](https://vega.github.io/vega/docs/transforms/impute/) transform with a 
query that uses a CROSS JOIN to build a table that contains all combinations of 
the unique values of certain input columns, then a LEFT OUTER JOIN to a 
subquery that performs a GROUP BY, grouping by these same columns. This LEFT 
OUTER JOIN includes a USING constraint with the same columns that are used in 
the GROUP BY subquery. 
   
   When the order of the columns in the GROUP BY doesn't match the order of the 
columns in the USING constraint, the query panics. If the columns are specified 
in the same order, the query completes successfully.
   
   **Example**
   For example, here is an input table with 3 columns.
   
   ```
   +------+------+------+
   | col1 | col2 | col3 |
   +------+------+------+
   | A    | 1    | 1    |
   | A    | 1    | 1    |
   | A    | 2    | 1    |
   | A    | 2    | 1    |
   | A    | 3    | 1    |
   | A    | 3    | 1    |
   | A    | 4    | 1    |
   | A    | 4    | 1    |
   | BB   | 5    | 1    |
   | BB   | 5    | 1    |
   | BB   | 6    | 1    |
   | BB   | 6    | 1    |
   +------+------+------+
   ```
   Here is a working query:
   ```sql
   SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
   FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
   CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
   LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" 
GROUP BY "col1", "col2") AS "subq3"
   USING("col1", "col2")
   ```
   ```
   +------+------+----------+
   | col1 | col2 | sum_col3 |
   +------+------+----------+
   | BB   | 5    | 2        |
   | BB   | 1    | 0        |
   | BB   | 4    | 0        |
   | BB   | 6    | 2        |
   | BB   | 3    | 0        |
   | A    | 3    | 2        |
   | BB   | 2    | 0        |
   | A    | 2    | 2        |
   | A    | 1    | 2        |
   | A    | 5    | 0        |
   | A    | 6    | 0        |
   | A    | 4    | 2        |
   +------+------+----------+
   ```
   
   The result contains one row for every combination of the unique values in 
`col1` and `col2`.  The `sum_col3` column contains the sum of the input `col3` 
for that combination of `col1` and `col2`, and zeros are filled in for 
combinations that weren't present in the input table.
   
   Now modify the query by flipping the order of `col1` and `col2` in the USING 
constraint (From `USING("col1", "col2")` to `USING("col2", "col1")`).  Now the 
query panics.
   
   ```sql
   SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
   FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
   CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
   LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" 
GROUP BY "col1", "col2") AS "subq3"
   USING("col2", "col1")
   ```
   ```
   thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called 
`Option::unwrap()` on a `None` value', 
/Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17
   stack backtrace:
   ...
   ```
   
   If the order of the columns in the GROUP BY query is also flipped (from 
`GROUP BY "col1", "col2"` to `GROUP BY "col2", "col1"`) then the query works 
again:
   ```sql
   SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
   FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
   CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
   LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" 
GROUP BY "col2", "col1") AS "subq3"
   USING("col2", "col1")
   ```
   ```
   +------+------+----------+
   | col1 | col2 | sum_col3 |
   +------+------+----------+
   | BB   | 5    | 2        |
   | BB   | 1    | 0        |
   | BB   | 4    | 0        |
   | A    | 3    | 2        |
   | BB   | 2    | 0        |
   | BB   | 6    | 2        |
   | BB   | 3    | 0        |
   | A    | 2    | 2        |
   | A    | 6    | 0        |
   | A    | 4    | 2        |
   | A    | 1    | 2        |
   | A    | 5    | 0        |
   +------+------+----------+
   ```
   
   **To Reproduce**
   Here is a Rust test that runs the first two queries above (the working one and the panicking one):
   
   ```rust
   #[cfg(test)]
   mod test_cross_join_bug2 {
       use std::sync::Arc;
       use datafusion::arrow::array::{ArrayRef, StringArray, UInt64Array};
       use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
       use datafusion::arrow::record_batch::RecordBatch;
       use datafusion::arrow::util::pretty::pretty_format_batches;
       use datafusion::datasource::MemTable;
       use datafusion::prelude::SessionContext;
   
       /// Reproduces the panic described in this issue.
       ///
       /// Builds a 12-row in-memory table `tbl` with columns `col1`, `col2`
       /// and `col3`, then runs two queries that differ only in the column
       /// order of the `USING` constraint. The first query prints its result;
       /// the second is the one reported to panic with
       /// "called `Option::unwrap()` on a `None` value" in `hash_join.rs`.
       #[tokio::test]
       async fn count_distinct_error() {
           let col1 = Arc::new(StringArray::from(vec![
               "A", "A", "A", "A",
               "A", "A", "A", "A",
               "BB", "BB", "BB", "BB",
           ])) as ArrayRef;
   
           let col2 = Arc::new(UInt64Array::from(vec![
               1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6
           ])) as ArrayRef;
   
           let col3 = Arc::new(UInt64Array::from(vec![
               1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
           ])) as ArrayRef;
   
           let schema = Arc::new(Schema::new(vec![
               Field::new("col1", DataType::Utf8, true),
               Field::new("col2", DataType::UInt64, true),
               Field::new("col3", DataType::UInt64, true),
           ])) as SchemaRef;
   
           let batch =
               RecordBatch::try_new(schema.clone(), vec![
                   col1, col2, col3,
               ]).unwrap();
           let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
   
           // Create context and register table
           let ctx = SessionContext::new();
           ctx.register_table("tbl", Arc::new(mem_table)).unwrap();
   
           // Pretty print the input table
           let res = ctx
               .sql("SELECT * from tbl")
               .await
               .unwrap()
               .collect()
               .await
               .unwrap();
           let formatted = pretty_format_batches(res.as_slice()).unwrap();
           println!("{}", formatted);
   
           // Perform query where GROUP BY and USING both specify
           // "col1", "col2" (works)
           let sql1 = r#"
   SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
   FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
   CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
   LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" 
GROUP BY "col1", "col2") AS "subq3"
   USING("col1", "col2")
       "#;
   
           let res = ctx.sql(sql1).await.unwrap().collect().await.unwrap();
           let formatted = pretty_format_batches(res.as_slice()).unwrap();
           println!("{}", formatted);
   
           // Perform query where GROUP BY specifies "col1", "col2" and USING
           // specifies "col2", "col1" (panic)
           let sql2 = r#"
   SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
   FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
   CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
   LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" 
GROUP BY "col1", "col2") AS "subq3"
   USING("col2", "col1")
       "#;
   
           let res = ctx.sql(sql2).await.unwrap().collect().await.unwrap();
           let formatted = pretty_format_batches(res.as_slice()).unwrap();
           println!("{}", formatted);
       }
   }
   ```
   
   Output
   ```
   +------+------+------+
   | col1 | col2 | col3 |
   +------+------+------+
   | A    | 1    | 1    |
   | A    | 1    | 1    |
   | A    | 2    | 1    |
   | A    | 2    | 1    |
   | A    | 3    | 1    |
   | A    | 3    | 1    |
   | A    | 4    | 1    |
   | A    | 4    | 1    |
   | BB   | 5    | 1    |
   | BB   | 5    | 1    |
   | BB   | 6    | 1    |
   | BB   | 6    | 1    |
   +------+------+------+
   +------+------+----------+
   | col1 | col2 | sum_col3 |
   +------+------+----------+
   | A    | 3    | 2        |
   | BB   | 2    | 0        |
   | BB   | 6    | 2        |
   | BB   | 3    | 0        |
   | BB   | 5    | 2        |
   | BB   | 1    | 0        |
   | BB   | 4    | 0        |
   | A    | 4    | 2        |
   | A    | 2    | 2        |
   | A    | 6    | 0        |
   | A    | 1    | 2        |
   | A    | 5    | 0        |
   +------+------+----------+
   
   thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called 
`Option::unwrap()` on a `None` value', 
/Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17
   ```
   
   **Expected behavior**
   These two queries should both complete successfully, regardless of whether 
the columns are specified in the same order in the GROUP BY and USING 
constraints.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to