paleolimbot opened a new issue, #17422:
URL: https://github.com/apache/datafusion/issues/17422

   ### Describe the bug
   
   I haven't yet figured out exactly which part of this query triggers the issue, but the following query:
   
   ```sql
   SELECT L.id l_id FROM L
   WHERE EXISTS (SELECT 1 FROM R WHERE extension_predicate(L.geometry, 
R.geometry))
   ORDER BY l_id
   ```
   
   ...causes `extension_predicate()` to be called with `(<extension type dropped>, <extension type>)`: the first argument has lost its extension metadata while the second keeps it. I suspect, but have not confirmed, that the subquery is the issue.
   
   We're using that query to check a left semi join; explicit joins that don't involve a subquery work correctly.
   
   ### To Reproduce
   
   Reproducer:
   
   ```rust
   use std::collections::HashMap;
   
   use datafusion::{
       arrow::{
           array::{RecordBatch, create_array},
           datatypes::{DataType, Field, Schema},
       },
       logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility},
       prelude::*,
       scalar::ScalarValue,
   };
   
   /// Reproducer for apache/datafusion#17422.
   ///
   /// Registers two single-batch tables, `l` and `r`, that share one schema whose
   /// `geometry` column carries `ARROW:extension:name = foofy.foofy` field metadata.
   /// It then registers the `extension_predicate` UDF and evaluates that predicate twice:
   /// - as an explicit INNER JOIN condition, which the report says works;
   /// - inside a correlated EXISTS subquery, which the report says reaches the UDF
   ///   with the first argument's metadata dropped, so the UDF panics.
   #[tokio::main]
   async fn main() {
       // Shared schema: only `geometry` carries the extension-name metadata.
       let schema = Schema::new(vec![
           Field::new("id", DataType::Int64, true),
           Field::new("geometry", DataType::Utf8, 
true).with_metadata(HashMap::from([(
               "ARROW:extension:name".to_string(),
               "foofy.foofy".to_string(),
           )])),
       ]);

       // Left-hand table: ids 1 and 2.
       let batch_lhs = RecordBatch::try_new(
           schema.clone().into(),
           vec![
               create_array!(Int64, [1, 2]),
               create_array!(Utf8, [Some("item1"), Some("item2")]),
           ],
       )
       .unwrap();

       // Right-hand table: ids 2 and 3 (overlaps the left table on id 2 / "item2").
       let batch_rhs = RecordBatch::try_new(
           schema.clone().into(),
           vec![
               create_array!(Int64, [2, 3]),
               create_array!(Utf8, [Some("item2"), Some("item3")]),
           ],
       )
       .unwrap();

       let ctx = SessionContext::new();
       ctx.register_batch("l", batch_lhs).unwrap();
       ctx.register_batch("r", batch_rhs).unwrap();
       ctx.register_udf(ExtensionScalarPredicate::default().into());

       // Explicit join: per the report, both UDF arguments keep their metadata here.
       println!("INNER JOIN ---");
       let df = ctx
           .sql(
               "
           SELECT L.id l_id FROM L
           INNER JOIN R ON extension_predicate(L.geometry, R.geometry)
           ORDER BY l_id
           ",
           )
           .await
           .unwrap();
       df.show().await.unwrap();

       // Correlated EXISTS subquery (the reporter's way of exercising a left semi
       // join): per the report, the first UDF argument arrives without metadata.
       println!("With subquery ---");
       let df = ctx
           .sql(
               "
           SELECT L.id l_id FROM L
           WHERE EXISTS (SELECT 1 FROM R WHERE extension_predicate(L.geometry, 
R.geometry))
           ORDER BY l_id
           ",
           )
           .await
           .unwrap();
       df.show().await.unwrap();
   }
   
   /// Scalar UDF used by the reproducer to detect lost extension metadata on its
   /// argument fields.
   #[derive(Debug)]
   struct ExtensionScalarPredicate {
       /// Signature handed back by `ScalarUDFImpl::signature`.
       signature: Signature,
   }

   impl Default for ExtensionScalarPredicate {
       /// Builds the predicate with a user-defined, immutable signature.
       fn default() -> Self {
           let signature = Signature::user_defined(Volatility::Immutable);
           Self { signature }
       }
   }
   
   impl ScalarUDFImpl for ExtensionScalarPredicate {
       fn as_any(&self) -> &dyn std::any::Any {
           self
       }
   
       fn name(&self) -> &str {
           "extension_predicate"
       }
   
       fn signature(&self) -> &Signature {
           &self.signature
       }
   
       fn coerce_types(&self, arg_types: &[DataType]) -> 
datafusion::error::Result<Vec<DataType>> {
           Ok(arg_types.to_vec())
       }
   
       fn return_type(&self, _arg_types: &[DataType]) -> 
datafusion::error::Result<DataType> {
           unreachable!("This shouldn't have been called")
       }
   
       fn return_field_from_args(
           &self,
           args: datafusion::logical_expr::ReturnFieldArgs,
       ) -> datafusion::error::Result<datafusion::arrow::datatypes::FieldRef> {
           let dropped = args
               .arg_fields
               .iter()
               .enumerate()
               .filter(|(_, input_field)| 
!input_field.metadata().contains_key("ARROW:extension:name"))
               .collect::<Vec<_>>();
           if !dropped.is_empty() {
               panic!(
                   "Metadata for {} or more args was dropped 
(return_field_from_args) {dropped:?}",
                   dropped.len()
               );
           }
   
           Ok(Field::new("", DataType::Boolean, true).into())
       }
   
       fn invoke_with_args(
           &self,
           args: datafusion::logical_expr::ScalarFunctionArgs,
       ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
           let dropped = args
               .arg_fields
               .iter()
               .enumerate()
               .filter(|(_, input_field)| 
!input_field.metadata().contains_key("ARROW:extension:name"))
               .collect::<Vec<_>>();
           if !dropped.is_empty() {
               panic!(
                   "Metadata for {} or more args was dropped (invoke_with_args) 
{dropped:?}",
                   dropped.len()
               );
           }
   
           let array = 
ScalarValue::Boolean(Some(true)).to_array_of_size(args.number_rows)?;
           Ok(ColumnarValue::Array(array))
       }
   }
   ```
   
   ### Expected behavior
   
   I would have expected the function to be called with the field metadata of its inputs (i.e. the extension type), rather than with only the storage type.
   
   ### Additional context
   
   cc @timsaucer and @Kontinuation 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to