alamb commented on code in PR #16846: URL: https://github.com/apache/datafusion/pull/16846#discussion_r2222306119
##########
datafusion-examples/examples/async_udf.rs:
##########
@@ -15,104 +15,104 @@
 // specific language governing permissions and limitations
 // under the License.
-use arrow::array::{ArrayIter, ArrayRef, AsArray, Int64Array, RecordBatch, StringArray};
-use arrow::compute::kernels::cmp::eq;
+//! This example shows how to create and use "Async UDFs" in DataFusion.
+//!
+//! Async UDFs allow you to perform asynchronous operations, such as
+//! making network requests. This can be used for tasks like fetching
+//! data from an external API such as a LLM service or an external database.
+
+use arrow::array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
+use datafusion::assert_batches_eq;
+use datafusion::common::cast::as_string_view_array;
 use datafusion::common::error::Result;
-use datafusion::common::types::{logical_int64, logical_string};
+use datafusion::common::not_impl_err;
 use datafusion::common::utils::take_function_args;
-use datafusion::common::{internal_err, not_impl_err};
 use datafusion::config::ConfigOptions;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
 use datafusion::logical_expr::{
-    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
-    TypeSignatureClass, Volatility,
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use datafusion::logical_expr_common::signature::Coercion;
-use datafusion::physical_expr_common::datum::apply_cmp;
-use datafusion::prelude::SessionContext;
-use log::trace;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use std::any::Any;
 use std::sync::Arc;
 #[tokio::main]
 async fn main() -> Result<()> {
-    let ctx: SessionContext = SessionContext::new();
-
-    let async_upper = AsyncUpper::new();
-    let udf = AsyncScalarUDF::new(Arc::new(async_upper));
-    ctx.register_udf(udf.into_scalar_udf());
-    let async_equal = AsyncEqual::new();
+    // Use a hard coded parallelism level of 4 so the explain plan
+    // is consistent across machines.
+    let config = SessionConfig::new().with_target_partitions(4);
+    let ctx =
+        SessionContext::from(SessionStateBuilder::new().with_config(config).build());
+
+    // Similarly to regular UDFs, you create an AsyncScalarUDF by implementing
+    // `AsyncScalarUDFImpl` and creating an instance of `AsyncScalarUDF`.
+    let async_equal = AskLLM::new();

Review Comment:
   💯 I am not above using the latest shiny tech trend to advertise DataFusion :)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org