findepi commented on code in PR #16846:
URL: https://github.com/apache/datafusion/pull/16846#discussion_r2221418135


##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -127,118 +123,45 @@ fn animal() -> Result<RecordBatch> {
     Ok(RecordBatch::try_new(schema, vec![id_array, name_array])?)
 }
 
+/// An async UDF that simulates asking a large language model (LLM) service a
+/// question based on the content of two columns. The UDF will return a boolean
+/// indicating whether the LLM thinks the first argument matches the question in
+/// the second argument.
+///
+/// Since this is a simplified example, it does not call an LLM service, but
+/// could be extended to do so in a real-world scenario.
 #[derive(Debug)]
-pub struct AsyncUpper {
+struct AskLLM {
     signature: Signature,
 }
 
-impl Default for AsyncUpper {
+impl Default for AskLLM {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl AsyncUpper {
+impl AskLLM {

Review Comment:
   :) 



##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -249,19 +172,62 @@ impl ScalarUDFImpl for AsyncEqual {
         Ok(DataType::Boolean)
     }
 
+    /// Since this is an async UDF, the `invoke_with_args` method will not be
+    /// called directly.
     fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        not_impl_err!("AsyncEqual can only be called from async contexts")
+        not_impl_err!("AskLLM can only be called from async contexts")
     }
 }
 
+/// In addition to [`ScalarUDFImpl`], we also need to implement the
+/// [`AsyncScalarUDFImpl`] trait.
 #[async_trait]
-impl AsyncScalarUDFImpl for AsyncEqual {
+impl AsyncScalarUDFImpl for AskLLM {
+    /// The `invoke_async_with_args` method is similar to `invoke_with_args`,
+    /// but it returns a `Future` that resolves to the result.
+    ///
+    /// Since this signature is `async`, it can do any `async` operations, such
+    /// as network requests. This method is run on the same tokio `Runtime` that
+    /// is processing the query, so you may wish to make actual network requests
+    /// on a different `Runtime`, as explained in the `thread_pools.rs` example
+    /// in this directory.
     async fn invoke_async_with_args(
         &self,
         args: ScalarFunctionArgs,
         _option: &ConfigOptions,
     ) -> Result<ArrayRef> {
-        let [arg1, arg2] = take_function_args(self.name(), &args.args)?;
-        apply_cmp(arg1, arg2, eq)?.to_array(args.number_rows)
+        // in a real UDF you would likely want to special case constant
+        // arguments to improve performance, but this example converts the
+        // arguments to arrays for simplicity.
+        let args = ColumnarValue::values_to_arrays(&args.args)?;
+        let [arg1, arg2] = take_function_args(self.name(), args)?;
+
+        // In a real function, you would use a library such as `request` here to

Review Comment:
   `request` -> `reqwest`?



##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -249,19 +172,62 @@ impl ScalarUDFImpl for AsyncEqual {
         Ok(DataType::Boolean)
     }
 
+    /// Since this is an async UDF, the `invoke_with_args` method will not be
+    /// called directly.
     fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        not_impl_err!("AsyncEqual can only be called from async contexts")
+        not_impl_err!("AskLLM can only be called from async contexts")
     }
 }
 
+/// In addition to [`ScalarUDFImpl`], we also need to implement the
+/// [`AsyncScalarUDFImpl`] trait.
 #[async_trait]
-impl AsyncScalarUDFImpl for AsyncEqual {
+impl AsyncScalarUDFImpl for AskLLM {
+    /// The `invoke_async_with_args` method is similar to `invoke_with_args`,
+    /// but it returns a `Future` that resolves to the result.
+    ///
+    /// Since this signature is `async`, it can do any `async` operations, such
+    /// as network requests. This method is run on the same tokio `Runtime` that
+    /// is processing the query, so you may wish to make actual network requests
+    /// on a different `Runtime`, as explained in the `thread_pools.rs` example
+    /// in this directory.
     async fn invoke_async_with_args(
         &self,
         args: ScalarFunctionArgs,
         _option: &ConfigOptions,
     ) -> Result<ArrayRef> {
-        let [arg1, arg2] = take_function_args(self.name(), &args.args)?;
-        apply_cmp(arg1, arg2, eq)?.to_array(args.number_rows)
+        // in a real UDF you would likely want to special case constant
+        // arguments to improve performance, but this example converts the
+        // arguments to arrays for simplicity.
+        let args = ColumnarValue::values_to_arrays(&args.args)?;
+        let [arg1, arg2] = take_function_args(self.name(), args)?;

Review Comment:
   Can we give `arg1` and `arg2` better names?



##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -345,12 +354,17 @@ async fn main() {
 }
 ```
 
-## Adding a Scalar Async UDF
+## Adding a Async Scalar UDF
 
-A Scalar Async UDF allows you to implement user-defined functions that support
+An `Async` Scalar UDF allows you to implement user-defined functions that support
 asynchronous execution, such as performing network or I/O operations within the
 UDF.
 
+:::{note}
+DataFusion `49.0.0`
+: This feature requires DataFusion version `49.0.0` or later.

Review Comment:
   uh oh, no! This note might be useful today, but useless in the future.
   Docs are always for the current version.
   Release changelog is where version chaos is sorted out.
   
   (I like it when API docs document "since version" for every API element, but I assume that's always automated and applied consistently to all features and sub-components, like new methods added later.)



##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -345,12 +354,17 @@ async fn main() {
 }
 ```
 
-## Adding a Scalar Async UDF
+## Adding a Async Scalar UDF
 
-A Scalar Async UDF allows you to implement user-defined functions that support
+An `Async` Scalar UDF allows you to implement user-defined functions that support

Review Comment:
   I don't think Async in this context should be in monospace.
   It does not refer to the Rust `async` keyword or to any name exported by the DataFusion library.
   It's just part of "Async Scalar UDF" - a feature name.



##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -434,13 +448,17 @@ impl AsyncScalarUDFImpl for AsyncUpper {
         Some(10)
     }
 
+    /// This method is called to execute the async UDF and is similar
+    /// to the normal `invoke_with_args` except it returns an `ArrayRef`
+    /// instead of `ColumnarValue` and is `async`.

Review Comment:
   It's obviously async, but why `ArrayRef` instead of `ColumnarValue`?
   Is this inconsistency warranted?



##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -419,6 +432,7 @@ impl ScalarUDFImpl for AsyncUpper {
         Ok(DataType::Utf8)
     }
 
+    // Note the normal invoke_with_args method is not called for Async UDFs

Review Comment:
   I wish it didn't exist :( 



##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -249,19 +172,62 @@ impl ScalarUDFImpl for AsyncEqual {
         Ok(DataType::Boolean)
     }
 
+    /// Since this is an async UDF, the `invoke_with_args` method will not be
+    /// called directly.
     fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        not_impl_err!("AsyncEqual can only be called from async contexts")
+        not_impl_err!("AskLLM can only be called from async contexts")
     }
 }
 
+/// In addition to [`ScalarUDFImpl`], we also need to implement the
+/// [`AsyncScalarUDFImpl`] trait.
 #[async_trait]
-impl AsyncScalarUDFImpl for AsyncEqual {
+impl AsyncScalarUDFImpl for AskLLM {
+    /// The `invoke_async_with_args` method is similar to `invoke_with_args`,
+    /// but it returns a `Future` that resolves to the result.
+    ///
+    /// Since this signature is `async`, it can do any `async` operations, such
+    /// as network requests. This method is run on the same tokio `Runtime` that
+    /// is processing the query, so you may wish to make actual network requests
+    /// on a different `Runtime`, as explained in the `thread_pools.rs` example
+    /// in this directory.
     async fn invoke_async_with_args(
         &self,
         args: ScalarFunctionArgs,
         _option: &ConfigOptions,
     ) -> Result<ArrayRef> {
-        let [arg1, arg2] = take_function_args(self.name(), &args.args)?;
-        apply_cmp(arg1, arg2, eq)?.to_array(args.number_rows)
+        // in a real UDF you would likely want to special case constant
+        // arguments to improve performance, but this example converts the
+        // arguments to arrays for simplicity.
+        let args = ColumnarValue::values_to_arrays(&args.args)?;
+        let [arg1, arg2] = take_function_args(self.name(), args)?;
+
+        // In a real function, you would use a library such as `request` here to
+        // make an async HTTP request. Credentials and other configurations can
+        // be supplied via the `ConfigOptions` parameter.
+
+        // In this example, we will simulate the LLM response by comparing the two
+        // input arguments using some static strings
+        let results: BooleanArray = arg1
+            .as_string_view()
+            .iter()
+            .zip(arg2.as_string_view().iter())

Review Comment:
   Here and above -- should use `datafusion::common::cast::as_string_view_array` instead of panicking on type mismatch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to