Jefffrey opened a new issue, #18149:
URL: https://github.com/apache/datafusion/issues/18149

   ### Describe the bug
   
   When an async UDF is used as the input to certain UDAFs, query execution can fail with a schema mismatch error.
   
   ### To Reproduce
   
   ```rust
   use arrow::compute::cast_with_options;
   use datafusion::arrow::datatypes::DataType;
   use datafusion::error::Result;
   use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
   use datafusion::logical_expr::{
       ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
   };
   use datafusion::prelude::*;
   use std::any::Any;
   use std::sync::Arc;
   use tonic::async_trait;
   
   #[derive(Debug, PartialEq, Eq, Hash)]
   struct CustomUDF {
       signature: Signature,
   }
   
   impl CustomUDF {
       fn new() -> Self {
           CustomUDF {
               signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
           }
       }
   }
   
   impl ScalarUDFImpl for CustomUDF {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn name(&self) -> &str {
           "custom_udf"
       }
   
       fn signature(&self) -> &Signature {
           &self.signature
       }
   
       fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
           Ok(DataType::Utf8)
       }
   
       fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
           fn_impl(args)
       }
   }
   
   #[async_trait]
   impl AsyncScalarUDFImpl for CustomUDF {
       async fn invoke_async_with_args(
           &self,
           args: ScalarFunctionArgs,
       ) -> Result<ColumnarValue> {
           fn_impl(args)
       }
   }
   
   // Shared by the sync and async paths: casts the Int64 argument to Utf8.
   fn fn_impl(args: ScalarFunctionArgs) -> Result<ColumnarValue> {
       let arg = &args.args[0];
       let array = arg.to_array(args.number_rows)?;
       let array = cast_with_options(&array, &DataType::Utf8, &Default::default())?;
       Ok(ColumnarValue::Array(array))
   }
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let ctx = SessionContext::new_with_config(
           SessionConfig::new().set_bool("datafusion.explain.show_schema", true),
       );
       ctx.sql("create table data(x int) as values (-10), (2)")
           .await?
           .collect()
           .await?;
   
       // Sync UDF: both EXPLAIN and execution succeed
       ctx.register_udf(ScalarUDF::new_from_impl(CustomUDF::new()));
       ctx.sql("select approx_distinct(custom_udf(x)) from data")
           .await?
           .explain(false, false)?
           .show()
           .await?;
       ctx.sql("select approx_distinct(custom_udf(x)) from data")
           .await?
           .show()
           .await?;
   
       // Async UDF (registered under the same name): EXPLAIN succeeds, but executing the query fails
       ctx.register_udf(AsyncScalarUDF::new(Arc::new(CustomUDF::new())).into_scalar_udf());
       ctx.sql("select approx_distinct(custom_udf(x)) from data")
           .await?
           .explain(false, false)?
           .show()
           .await?;
       ctx.sql("select approx_distinct(custom_udf(x)) from data")
           .await?
           .show()
           .await?;
   
       Ok(())
   }
   ```
   
   - Here we run the same query with a sync version and an async version of the same UDF; we expect the same result from both.
   
   Output:
   
   ```sh
   datafusion (main)$ cargo run --example dataframe
      Compiling datafusion-ffi v50.2.0 (/Users/jeffrey/Code/datafusion/datafusion/ffi)
      Compiling datafusion-examples v50.2.0 (/Users/jeffrey/Code/datafusion/datafusion-examples)
       Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.60s
        Running `/Users/jeffrey/.cargo_target_cache/debug/examples/dataframe`
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                  |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Aggregate: groupBy=[[]], aggr=[[approx_distinct(custom_udf(CAST(data.x AS Int64)))]]                                                  |
   |               |   TableScan: data projection=[x]                                                                                                      |
   | physical_plan | AggregateExec: mode=Single, gby=[], aggr=[approx_distinct(custom_udf(data.x))], schema=[approx_distinct(custom_udf(data.x)):UInt64;N] |
   |               |   DataSourceExec: partitions=1, partition_sizes=[1], schema=[x:Int32;N]                                                               |
   |               |                                                                                                                                       |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------+
   +-------------------------------------+
   | approx_distinct(custom_udf(data.x)) |
   +-------------------------------------+
   | 2                                   |
   +-------------------------------------+
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                    |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Aggregate: groupBy=[[]], aggr=[[approx_distinct(custom_udf(CAST(data.x AS Int64)))]]                                                                    |
   |               |   TableScan: data projection=[x]                                                                                                                        |
   | physical_plan | AggregateExec: mode=Final, gby=[], aggr=[approx_distinct(custom_udf(data.x))], schema=[approx_distinct(custom_udf(data.x)):UInt64;N]                    |
   |               |   CoalescePartitionsExec, schema=[approx_distinct(custom_udf(data.x))[hll_registers]:Binary]                                                            |
   |               |     AggregateExec: mode=Partial, gby=[], aggr=[approx_distinct(custom_udf(data.x))], schema=[approx_distinct(custom_udf(data.x))[hll_registers]:Binary] |
   |               |       RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, schema=[x:Int32;N, __async_fn_0:Utf8;N]                                    |
   |               |         AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=custom_udf(CAST(x@0 AS Int64)))], schema=[x:Int32;N, __async_fn_0:Utf8;N]         |
   |               |           CoalesceBatchesExec: target_batch_size=8192, schema=[x:Int32;N]                                                                               |
   |               |             DataSourceExec: partitions=1, partition_sizes=[1], schema=[x:Int32;N]                                                                       |
   |               |                                                                                                                                                         |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
   Error: Internal("PhysicalExpr Column references column '__async_fn_0' at index 1 (zero-based) but input schema only has 1 columns: [\"x\"]")
   ```
   
   The error is raised here:
   
   https://github.com/apache/datafusion/blob/e323357b1e245d8651183e42747cb92709cb1998/datafusion/functions-aggregate/src/approx_distinct.rs#L363-L366
   
   - specifically, at line 364
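   
   The following is a minimal, self-contained model of the kind of lookup that fails. It deliberately avoids DataFusion's types. `Field`, `ColumnRef`, `field` and `data_type` are hypothetical stand-ins. The model also assumes, without confirmation from the linked code, that line 364 resolves the type of the UDAF's input expression against the schema stored on the aggregate. A column expression carries a name and a zero-based index. Resolving it against a schema with too few fields therefore fails with an error shaped like the one in the output above.
   
   ```rust
   /// Toy stand-in for an Arrow field: a column name and a type name (hypothetical).
   #[derive(Debug, Clone, PartialEq)]
   pub struct Field {
       pub name: String,
       pub data_type: String,
   }
   
   /// Toy stand-in for a physical column expression: a name plus a zero-based index.
   #[derive(Debug, Clone, PartialEq)]
   pub struct ColumnRef {
       pub name: String,
       pub index: usize,
   }
   
   impl ColumnRef {
       /// Resolve this column's type by position in `schema`, failing when the
       /// index is out of bounds. The message is modeled on the error above.
       pub fn data_type(&self, schema: &[Field]) -> Result<String, String> {
           match schema.get(self.index) {
               Some(field) => Ok(field.data_type.clone()),
               None => {
                   let names: Vec<&str> = schema.iter().map(|f| f.name.as_str()).collect();
                   Err(format!(
                       "PhysicalExpr Column references column '{}' at index {} (zero-based) \
                        but input schema only has {} columns: {:?}",
                       self.name,
                       self.index,
                       schema.len(),
                       names
                   ))
               }
           }
       }
   }
   
   /// Hypothetical helper to build a `Field`.
   pub fn field(name: &str, data_type: &str) -> Field {
       Field { name: name.to_string(), data_type: data_type.to_string() }
   }
   ```
   
   Against the stale schema `[x]`, a reference to `__async_fn_0` at index 1 produces that error. Against `[x, __async_fn_0]`, which is the `AsyncFuncExec` output schema shown in the plan, the same reference resolves to `Utf8`.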
   
   ### Expected behavior
   
   The async version should produce the same result as the sync version instead of an error.
   
   ### Additional context
   
   I want to emphasize that this is not an issue with the UDAF itself (in this case `approx_distinct`). The cause is that, during physical planning, an aggregate whose inputs contain async UDF calls is rewritten with new expressions:
   
   https://github.com/apache/datafusion/blob/e323357b1e245d8651183e42747cb92709cb1998/datafusion/core/src/physical_planner.rs#L745-L785
   
   - Introduced by #17619
   
   `agg_func` has new expressions written in to replace the async function call (in the plan above, a reference to the `__async_fn_0` column produced by `AsyncFuncExec`). However, its schema is not updated. Later in the pipeline (see `approx_distinct` above), the schema and the input expressions therefore no longer match.
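   
   The following is a rough model of the rewrite. It is inferred from the plan output above, not taken from the linked planner code. Each async UDF call in an aggregate input is hoisted into an `AsyncFuncExec` stage, which appends a new output column (`__async_fn_0`, `__async_fn_1`, ...). The call is then replaced by a reference to that column. The `Expr` enum and `hoist_async_calls` function are hypothetical. Nested async calls inside an async call's arguments are out of scope for this sketch.
   
   ```rust
   /// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
   #[derive(Debug, Clone, PartialEq)]
   pub enum Expr {
       /// Reference to an input column by name and zero-based index.
       Column { name: String, index: usize },
       /// Call to an async scalar UDF.
       AsyncCall { func: String, args: Vec<Expr> },
       /// Call to an ordinary (sync) function.
       Call { func: String, args: Vec<Expr> },
   }
   
   /// Replace every async call in `expr` with a column that is appended after
   /// the existing `num_input_columns` input columns. Each hoisted call is
   /// recorded in `hoisted` as `(new_column_name, original_call)`.
   pub fn hoist_async_calls(
       expr: Expr,
       num_input_columns: usize,
       hoisted: &mut Vec<(String, Expr)>,
   ) -> Expr {
       match expr {
           call @ Expr::AsyncCall { .. } => {
               // The new column lands after the inputs and any earlier hoisted calls.
               let name = format!("__async_fn_{}", hoisted.len());
               let index = num_input_columns + hoisted.len();
               hoisted.push((name.clone(), call));
               Expr::Column { name, index }
           }
           // Sync calls are kept, but their arguments are rewritten recursively.
           Expr::Call { func, args } => Expr::Call {
               func,
               args: args
                   .into_iter()
                   .map(|arg| hoist_async_calls(arg, num_input_columns, hoisted))
                   .collect(),
           },
           col @ Expr::Column { .. } => col,
       }
   }
   ```
   
   With one input column `x`, the `custom_udf(...)` argument becomes a reference to `__async_fn_0` at index 1. That matches the `AsyncFuncExec` output `[x, __async_fn_0]`, but not the `[x]` schema that `agg_func` still carries.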
   
   Overall, I question this API:
   
   https://github.com/apache/datafusion/blob/e323357b1e245d8651183e42747cb92709cb1998/datafusion/physical-expr/src/aggregate.rs#L613-L620
   
   It allows the expressions to be rewritten, but it does not enforce or check that the new expressions conform to the schema it already has.
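   
   The following sketches one possible shape for a stricter API. `with_new_exprs_checked` is a hypothetical method that takes the replacement expressions together with the schema they will be evaluated against. It rejects any column reference that does not exist in that schema, and it stores the new schema alongside the new expressions so the two cannot drift apart. The `AggregateFunctionExpr` struct below is a toy stand-in, not DataFusion's type, and the method name is invented for illustration. For brevity, each input expression is reduced to a bare column reference.
   
   ```rust
   /// Toy stand-in for a schema field (name only).
   #[derive(Debug, Clone, PartialEq)]
   pub struct Field {
       pub name: String,
   }
   
   /// Toy input expression: a bare column reference by name and zero-based index.
   #[derive(Debug, Clone, PartialEq)]
   pub struct ColumnRef {
       pub name: String,
       pub index: usize,
   }
   
   /// Toy aggregate: input expressions plus the schema they are resolved against.
   #[derive(Debug, Clone, PartialEq)]
   pub struct AggregateFunctionExpr {
       pub exprs: Vec<ColumnRef>,
       pub schema: Vec<Field>,
   }
   
   impl AggregateFunctionExpr {
       /// Hypothetical checked rewrite: the caller supplies the new expressions
       /// and the schema they will be evaluated against. Every column reference
       /// must point at a field with the same name in that schema.
       pub fn with_new_exprs_checked(
           &self,
           exprs: Vec<ColumnRef>,
           schema: Vec<Field>,
       ) -> Result<Self, String> {
           for col in &exprs {
               match schema.get(col.index) {
                   Some(f) if f.name == col.name => {}
                   _ => {
                       return Err(format!(
                           "column '{}' at index {} does not match the new schema ({} fields)",
                           col.name,
                           col.index,
                           schema.len()
                       ));
                   }
               }
           }
           // A real implementation would also carry over the other fields of `self`.
           Ok(Self { exprs, schema })
       }
   }
   ```
   
   Taking the schema as an argument, rather than only validating against the existing one, lets a planner that legitimately changes the input (as the async rewrite does) update both at once. It also turns a silent mismatch into an error at rewrite time.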

