jonmmease opened a new issue, #4873: URL: https://github.com/apache/arrow-datafusion/issues/4873
**Describe the bug** [VegaFusion](https://vegafusion.io/) implements the Vega [impute](https://vega.github.io/vega/docs/transforms/impute/) transform with a query that uses a CROSS JOIN to build a table that contains all combinations of the unique values of certain input columns, then a LEFT OUTER JOIN to a subquery that performs a GROUP BY, grouping by these same columns. This LEFT OUTER JOIN includes a USING constraint with the same columns that are used in the GROUP BY subquery. When the order of the columns in the GROUP BY doesn't match the order of the columns in the USING constraint, the query panics. If the columns are specified in the same order, the query completes successfully. **Example** For example, here is an input table with 3 columns. ``` +------+------+------+ | col1 | col2 | col3 | +------+------+------+ | A | 1 | 1 | | A | 1 | 1 | | A | 2 | 1 | | A | 2 | 1 | | A | 3 | 1 | | A | 3 | 1 | | A | 4 | 1 | | A | 4 | 1 | | BB | 5 | 1 | | BB | 5 | 1 | | BB | 6 | 1 | | BB | 6 | 1 | +------+------+------+ ``` Here is a working query: ```sql SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3 FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1" CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2" LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3" USING("col1", "col2") ``` ``` +------+------+----------+ | col1 | col2 | sum_col3 | +------+------+----------+ | BB | 5 | 2 | | BB | 1 | 0 | | BB | 4 | 0 | | BB | 6 | 2 | | BB | 3 | 0 | | A | 3 | 2 | | BB | 2 | 0 | | A | 2 | 2 | | A | 1 | 2 | | A | 5 | 0 | | A | 6 | 0 | | A | 4 | 2 | +------+------+----------+ ``` The result contains one row for every combination of the unique values in `col1` and `col2`. The `sum_col3` column contains the sum of the input `col3` for that combination of `col1` and `col2`, and zeros are filled in for combinations that weren't present in the input table. 
Now modify the query by flipping the order of `col1` and `col2` in the USING constraint (From `USING("col1", "col2")` to `USING("col2", "col1")`). Now the query panics. ```sql SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3 FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1" CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2" LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3" USING("col2", "col1") ``` ``` thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called `Option::unwrap()` on a `None` value', /Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17 stack backtrace: ... ``` If the order of the columns in the GROUP BY query is also flipped (from `GROUP BY "col1", "col2"` to `GROUP BY "col2", "col1"`) then the query works again: ```sql SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3 FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1" CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2" LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col2", "col1") AS "subq3" USING("col2", "col1") ``` ``` +------+------+----------+ | col1 | col2 | sum_col3 | +------+------+----------+ | BB | 5 | 2 | | BB | 1 | 0 | | BB | 4 | 0 | | A | 3 | 2 | | BB | 2 | 0 | | BB | 6 | 2 | | BB | 3 | 0 | | A | 2 | 2 | | A | 6 | 0 | | A | 4 | 2 | | A | 1 | 2 | | A | 5 | 0 | +------+------+----------+ ``` **To Reproduce** Here is a Rust test that implements the two queries above: ```rust #[cfg(test)] mod test_cross_join_bug2 { use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, StringArray, UInt64Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty::pretty_format_batches; use datafusion::datasource::MemTable; use datafusion::prelude::SessionContext; 
#[tokio::test] async fn count_distinct_error() { let col1 = Arc::new(StringArray::from(vec![ "A", "A", "A", "A", "A", "A", "A", "A", "BB", "BB", "BB", "BB", ])) as ArrayRef; let col2 = Arc::new(UInt64Array::from(vec![ 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6 ])) as ArrayRef; let col3 = Arc::new(UInt64Array::from(vec![ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 ])) as ArrayRef; let schema = Arc::new(Schema::new(vec![ Field::new("col1", DataType::Utf8, true), Field::new("col2", DataType::UInt64, true), Field::new("col3", DataType::UInt64, true), ])) as SchemaRef; let batch = RecordBatch::try_new(schema.clone(), vec![ col1, col2, col3, ]).unwrap(); let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap(); // Create context and register table let ctx = SessionContext::new(); ctx.register_table("tbl", Arc::new(mem_table)).unwrap(); // Pretty print the input table let res = ctx .sql("SELECT * from tbl") .await .unwrap() .collect() .await .unwrap(); let formatted = pretty_format_batches(res.as_slice()).unwrap(); println!("{}", formatted); // Perform query where GROUP BY and USING both specify "col1", "col2" (works) let sql1 = r#" SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3 FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1" CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2" LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3" USING("col1", "col2") "#; let res = ctx.sql(sql1).await.unwrap().collect().await.unwrap(); let formatted = pretty_format_batches(res.as_slice()).unwrap(); println!("{}", formatted); // Perform query where GROUP BY specifies "col1", "col2" and USING specifies "col2", "col1" (panic) let sql2 = r#" SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3 FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1" CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2" LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS 
"subq3" USING("col2", "col1") "#; let res = ctx.sql(sql2).await.unwrap().collect().await.unwrap(); let formatted = pretty_format_batches(res.as_slice()).unwrap(); println!("{}", formatted); } } ``` Output ``` +------+------+------+ | col1 | col2 | col3 | +------+------+------+ | A | 1 | 1 | | A | 1 | 1 | | A | 2 | 1 | | A | 2 | 1 | | A | 3 | 1 | | A | 3 | 1 | | A | 4 | 1 | | A | 4 | 1 | | BB | 5 | 1 | | BB | 5 | 1 | | BB | 6 | 1 | | BB | 6 | 1 | +------+------+------+ +------+------+----------+ | col1 | col2 | sum_col3 | +------+------+----------+ | A | 3 | 2 | | BB | 2 | 0 | | BB | 6 | 2 | | BB | 3 | 0 | | BB | 5 | 2 | | BB | 1 | 0 | | BB | 4 | 0 | | A | 4 | 2 | | A | 2 | 2 | | A | 6 | 0 | | A | 1 | 2 | | A | 5 | 0 | +------+------+----------+ thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called `Option::unwrap()` on a `None` value', /Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17 ``` **Expected behavior** These two queries should both complete successfully, regardless of whether the columns are specified in the same order in the GROUP BY and USING constraints. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org