findepi commented on code in PR #16846:
URL: https://github.com/apache/datafusion/pull/16846#discussion_r2223254701


##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -15,104 +15,104 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{ArrayIter, ArrayRef, AsArray, Int64Array, RecordBatch, 
StringArray};
-use arrow::compute::kernels::cmp::eq;
+//! This example shows how to create and use "Async UDFs" in DataFusion.
+//!
+//! Async UDFs allow you to perform asynchronous operations, such as
+//! making network requests. This can be used for tasks like fetching
+//! data from an external API such as a LLM service or an external database.
+
+use arrow::array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, 
StringArray};
 use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
+use datafusion::assert_batches_eq;
+use datafusion::common::cast::as_string_view_array;
 use datafusion::common::error::Result;
-use datafusion::common::types::{logical_int64, logical_string};
+use datafusion::common::not_impl_err;
 use datafusion::common::utils::take_function_args;
-use datafusion::common::{internal_err, not_impl_err};
 use datafusion::config::ConfigOptions;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
 use datafusion::logical_expr::{
-    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
-    TypeSignatureClass, Volatility,
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use datafusion::logical_expr_common::signature::Coercion;
-use datafusion::physical_expr_common::datum::apply_cmp;
-use datafusion::prelude::SessionContext;
-use log::trace;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use std::any::Any;
 use std::sync::Arc;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let ctx: SessionContext = SessionContext::new();
-
-    let async_upper = AsyncUpper::new();
-    let udf = AsyncScalarUDF::new(Arc::new(async_upper));
-    ctx.register_udf(udf.into_scalar_udf());
-    let async_equal = AsyncEqual::new();
+    // Use a hard coded parallelism level of 4 so the explain plan
+    // is consistent across machines.
+    let config = SessionConfig::new().with_target_partitions(4);
+    let ctx =
+        
SessionContext::from(SessionStateBuilder::new().with_config(config).build());
+
+    // Similarly to regular UDFs, you create an AsyncScalarUDF by implementing
+    // `AsyncScalarUDFImpl` and creating an instance of `AsyncScalarUDF`.
+    let async_equal = AskLLM::new();

Review Comment:
   I think this (external service calls) is actually a nice use-case for async 
functions. 
   It's good they allow for batching, since they are invoked on a batch of data 
(like everything in DF).
   Do async functions also allow interleaved execution of batches? I.e., can 
the next call to an async UDF start before the previous one has completed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to