shivbhatia10 opened a new issue, #18517:
URL: https://github.com/apache/datafusion/issues/18517

   ### Describe the bug
   
   Hi folks, we have a fairly complex pipeline that ends in a `drop duplicates` transform, implemented as an aggregation with a GROUP BY on the column we want to deduplicate. We know that without the `drop duplicates` step the primary-key column contains a unique value for every row, so we expect the transform to be a no-op. In reality, however, it reduces the output from 8.7 million rows to 28 thousand, a massive loss of data.
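
   For context, the transform boils down to a query of roughly this shape (a sketch with placeholder table and column names, not our real schema):

    ```rust
    // Sketch of the "drop duplicates" transform: group on the (supposedly
    // unique) primary-key column and keep the first value of every other
    // column. `input`, `pk`, `col_a`, and `col_b` are placeholders.
    let dedup_sql = "
        SELECT pk,
               first_value(col_a) AS col_a,
               first_value(col_b) AS col_b
        FROM input
        GROUP BY pk
    ";
    ```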
   
   After investigating for a few weeks, we believe partitions are being handled incorrectly at the physical execution stage: setting `target_partitions` to 1 made the pipeline behave correctly. I wrote a script that reproduces the pipeline's logic and compared the physical plans produced for different values of `target_partitions`. When `target_partitions` is 1 the plan contains no instances of `UnKnownColumn`, but for any other value I see 8 instances of it.
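
   In essence, the check sweeps `target_partitions` and counts `UnKnownColumn` occurrences in the rendered physical plan. A minimal sketch (table registration elided; the full script is under "To Reproduce"):

    ```rust
    // Sketch: compare UnKnownColumn counts across target_partitions settings.
    // Assumes `query` and the registered tables from the full script below.
    for partitions in [1, 2, 4, 16] {
        let config = SessionConfig::new().with_target_partitions(partitions);
        let ctx = SessionContext::new_with_config(config);
        // ... register the same four batches as in the full script ...
        let plan = ctx.sql(query).await?.create_physical_plan().await?;
        let count = format!("{:#?}", plan).matches("UnKnownColumn").count();
        println!("target_partitions={partitions}: {count} UnKnownColumn instances");
    }
    ```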
   
   I believe this may be what is causing the `drop duplicates` pipeline to lose rows. Unfortunately I was not able to generate synthetic data that reproduces the row loss, and I can't share the original data where we observe it because it's customer data.

   Please let me know if there's any other context I can provide; we'd appreciate any insight you folks can offer here!
   
   ### To Reproduce
   
   This is the script I used to find this behaviour:
   
    ```rust
   use datafusion::arrow::array::StringArray;
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::error::Result;
   use datafusion::functions_aggregate::first_last::FirstValue;
   use datafusion::logical_expr::AggregateUDF;
   use datafusion::prelude::*;
   use std::sync::Arc;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let config = SessionConfig::new().with_target_partitions(16);
       let ctx = SessionContext::new_with_config(config);
   
       let first_value_udf = AggregateUDF::from(FirstValue::new());
       ctx.register_udaf(first_value_udf);
   
       let num_rows = 5_000_000;
   
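        // Four synthetic in-memory tables: two 5M-row aggregation sources that
        // share join_key, and two union sources with an identical
        // key2/key1/value schema (all columns are nullable Utf8).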
       let left_agg_source = RecordBatch::try_new(
           Arc::new(Schema::new(vec![
               Field::new("join_key", DataType::Utf8, true),
               Field::new("url_field", DataType::Utf8, true),
               Field::new("data_field", DataType::Utf8, true),
           ])),
           vec![
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("jk_{}", i % 100000)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("proto://id_{}", i)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("data_{}", i)),
               )),
           ],
       )?;
   
       let right_agg_source = RecordBatch::try_new(
           Arc::new(Schema::new(vec![
               Field::new("join_key", DataType::Utf8, true),
               Field::new("key1", DataType::Utf8, true),
               Field::new("attr1", DataType::Utf8, true),
           ])),
           vec![
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("jk_{}", i % 100000)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("k1_{}", i % 50000)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("attr_{}", i % 100)),
               )),
           ],
       )?;
   
       let left_union_source = RecordBatch::try_new(
           Arc::new(Schema::new(vec![
               Field::new("key2", DataType::Utf8, true),
               Field::new("key1", DataType::Utf8, true),
               Field::new("value", DataType::Utf8, true),
           ])),
           vec![
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("id_{}", i)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("k1_{}", i % 50000)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows).map(|i| format!("val_{}", i)),
               )),
           ],
       )?;
   
       let right_union_source = RecordBatch::try_new(
           Arc::new(Schema::new(vec![
               Field::new("key2", DataType::Utf8, true),
               Field::new("key1", DataType::Utf8, true),
               Field::new("value", DataType::Utf8, true),
           ])),
           vec![
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows / 2).map(|i| format!("id_{}", i + num_rows)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows / 2).map(|i| format!("k1_{}", i % 50000)),
               )),
               Arc::new(StringArray::from_iter_values(
                   (0..num_rows / 2).map(|i| format!("val_{}", i + num_rows)),
               )),
           ],
       )?;
   
       ctx.register_batch("left_agg_source", left_agg_source)?;
       ctx.register_batch("right_agg_source", right_agg_source)?;
       ctx.register_batch("left_union_source", left_union_source)?;
       ctx.register_batch("right_union_source", right_union_source)?;
   
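        // The query mirrors the real pipeline: two aggregations, an inner join,
        // a UNION ALL, a left join, and a final "drop duplicates" GROUP BY.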
       let query = "
           WITH 
           agg1 AS (
               SELECT 
                    SUBSTRING(url_field FROM POSITION('://' IN url_field) + 3) as extracted_id,
                   join_key,
                   first_value(data_field) as agg_data
               FROM left_agg_source
               WHERE url_field LIKE 'proto://%'
                GROUP BY SUBSTRING(url_field FROM POSITION('://' IN url_field) + 3), join_key, url_field
           ),
           agg2 AS (
               SELECT join_key, key1, attr1
               FROM right_agg_source
               GROUP BY join_key, key1, attr1
           ),
           joined_aggs AS (
               SELECT 
                   a1.extracted_id,
                   a2.attr1,
                   a2.key1,
                   first_value(a1.agg_data) as final_data
               FROM agg1 a1
               INNER JOIN agg2 a2 ON a1.join_key = a2.join_key
               GROUP BY a1.extracted_id, a2.attr1, a2.key1
           ),
           unioned AS (
               SELECT key2, key1, value FROM left_union_source
               UNION ALL
               SELECT key2, key1, value FROM right_union_source
           ),
           final_joined AS (
               SELECT 
                   u.key2,
                   u.key1,
                   u.value,
                   j.attr1,
                   j.final_data
               FROM unioned u
               LEFT JOIN joined_aggs j 
                   ON u.key1 = j.key1 
                   AND u.key2 = j.extracted_id
           )
           SELECT 
                CONCAT(key1, '_', COALESCE(attr1, 'NULL'), '_', key2) as composite_key,
               first_value(key2) as key2,
               first_value(value) as value,
               first_value(final_data) as final_data,
               first_value(attr1) as attr1
           FROM final_joined
           GROUP BY CONCAT(key1, '_', COALESCE(attr1, 'NULL'), '_', key2)
       ";
   
       println!("Running query with {} rows...", num_rows);
       let df = ctx.sql(query).await?;
   
       println!("\nChecking for UnknownColumn in physical plan...");
       let plan = df.clone().create_physical_plan().await?;
       let plan_str = format!("{:#?}", plan);
   
       println!("{}", plan_str);
   
       let unknown_col_count = plan_str.matches("UnKnownColumn").count();
       println!("Found {} instances of UnKnownColumn", unknown_col_count);
   
       Ok(())
   }
   ```
   
   ### Expected behavior
   
   With `target_partitions` set to 16 (or any value greater than 1 that I've tried so far), the script prints:
   
   `Found 8 instances of UnKnownColumn`
   
   but with `target_partitions` set to 1 it prints:
   
   `Found 0 instances of UnKnownColumn`
   
   I believe this is the source of the rows being dropped in our pipeline. I apologize for not being able to provide a repro of that behavior, but the `UnKnownColumn` instances look like an issue in their own right.
   
   ### Additional context
   
   _No response_

