Copilot commented on code in PR #20039:
URL: https://github.com/apache/datafusion/pull/20039#discussion_r2735052735


##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
             });
         };
         // list files (with partitions)
-        let file_list = 
future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = 
stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path 
rather than
+        // streaming lazily. This is a trade-off required because JoinSet 
tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For 
most use cases,
+        // the parallelization benefit outweighs the temporary memory 
overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained 
environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();

Review Comment:
   This PR is described as fixing nested async UDF execution, but it also 
introduces a fairly broad refactor of listing-table file discovery (API changes 
in `ListingTableUrl::list_*` / `pruned_partition_list`) and changes listing 
concurrency behavior (JoinSet).
   
   If these changes aren’t strictly required for the async UDF fix, please 
consider splitting them into a separate PR (or documenting why they’re coupled) 
to keep the review surface and risk contained.



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
             });
         };
         // list files (with partitions)
-        let file_list = 
future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = 
stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path 
rather than
+        // streaming lazily. This is a trade-off required because JoinSet 
tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For 
most use cases,
+        // the parallelization benefit outweighs the temporary memory 
overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained 
environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
+            let partition_cols = self.options.table_partition_cols.clone();
+            let filters = filters.to_vec();
+
+            for table_path in &self.table_paths {
+                let store = Arc::clone(&store);
+                let table_path = table_path.clone();
+                let config = config.clone();
+                let runtime_env = Arc::clone(&runtime_env);
+                let file_extension = file_extension.clone();
+                let partition_cols = partition_cols.clone();
+                let filters = filters.clone();

Review Comment:
   The JoinSet branch clones the entire `ConfigOptions` (`let config = 
ctx.config_options().clone()`) and then clones it again for every spawned task. 
`ConfigOptions` can be fairly large, so this can add unnecessary 
allocation/copy overhead.
   
   Consider wrapping the cloned `ConfigOptions` in an `Arc` once and cloning 
the `Arc` into tasks (passing `&*arc_config`), or otherwise sharing immutable 
config across tasks without repeated deep clones.



##########
datafusion/physical-plan/src/async_func.rs:
##########
@@ -216,14 +240,32 @@ impl ExecutionPlan for AsyncFuncExec {
             async move {
                 let batch = batch?;
                 // append the result of evaluating the async expressions to 
the output
-                let mut output_arrays = batch.columns().to_vec();
-                for async_expr in async_exprs_captured.iter() {
+                // We must evaluate them in order, adding the results to the 
batch
+                // so that subsequent async expressions can access the results 
of previous ones
+                let mut output_arrays = 
Vec::with_capacity(async_exprs_captured.len());
+                let input_columns = batch.columns().len();
+                
+                for (i, async_expr) in async_exprs_captured.iter().enumerate() 
{
+                    // Create a batch with the input columns and the async 
columns evaluated so far
+                    // We need to construct a schema for this intermediate 
batch
+                    let current_schema_fields: Vec<_> = 
schema_captured.fields().iter().take(input_columns + i).cloned().collect();
+                    let current_schema = 
Arc::new(Schema::new(current_schema_fields));
+                    
+                    let mut current_columns = batch.columns().to_vec();
+                    current_columns.extend_from_slice(&output_arrays);
+                    
+                    let current_batch = RecordBatch::try_new(current_schema, 
current_columns)?;

Review Comment:
   `AsyncFuncExec::execute` now rebuilds an intermediate `Schema` and re-clones 
the input columns vector for every async expression (`current_schema_fields` + 
`current_columns`). Per-batch overhead therefore grows roughly as 
O(#async_exprs * (#input_columns + #async_exprs)), which can be a noticeable 
regression for projections with multiple async UDFs.
   
   Consider maintaining an incremental `current_fields` / `current_columns` 
(starting from the input batch) and appending each evaluated async column 
in-place, creating the next intermediate `RecordBatch` from those 
incrementally-updated structures. This avoids repeatedly copying the input 
columns and re-collecting schema fields each iteration.



##########
datafusion/physical-plan/src/async_func.rs:
##########
@@ -74,10 +80,28 @@ impl AsyncFuncExec {
             .collect();
 
         let schema = Arc::new(Schema::new(fields));
-        let tuples = async_exprs
-            .iter()
-            .map(|expr| (Arc::clone(&expr.func), expr.name().to_string()))
-            .collect::<Vec<_>>();
+        
+        // Only include expressions that map to input columns in the 
ProjectionMapping
+        // Expressions referencing newly created async columns cannot be 
verified against input schema
+        let input_len = input.schema().fields().len();
+        let mut tuples = Vec::new();
+        for expr in &async_exprs {
+             let mut refers_to_new_cols = false;
+             expr.func.apply(&mut |e: &Arc<dyn PhysicalExpr>| {
+                 if let Some(col) = e.as_any().downcast_ref::<Column>() {
+
+                     if col.index() >= input_len {
+                         refers_to_new_cols = true;
+                     }
+                 }
+                 Ok(TreeNodeRecursion::Continue)
+             })?;
+             

Review Comment:
   The `ProjectionMapping` construction scans each `expr.func` tree to detect 
references to newly-created async columns, but it never short-circuits once 
such a column is found. Since this runs for every async expression, it can add 
unnecessary traversal overhead.
   
   You can return `TreeNodeRecursion::Stop` once `refers_to_new_cols` becomes 
`true` (or check the flag early in the visitor) to avoid walking the rest of 
the subtree.
   ```suggestion
                    // Short-circuit traversal once we know this expression
                    // refers to newly created async columns
                    if refers_to_new_cols {
                        return Ok(TreeNodeRecursion::Stop);
                    }
   
                    if let Some(col) = e.as_any().downcast_ref::<Column>() {
                        if col.index() >= input_len {
                            refers_to_new_cols = true;
                            return Ok(TreeNodeRecursion::Stop);
                        }
                    }
   
                    Ok(TreeNodeRecursion::Continue)
                })?;
   ```



##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
             });
         };
         // list files (with partitions)
-        let file_list = 
future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = 
stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path 
rather than
+        // streaming lazily. This is a trade-off required because JoinSet 
tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For 
most use cases,
+        // the parallelization benefit outweighs the temporary memory 
overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained 
environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
+            let partition_cols = self.options.table_partition_cols.clone();
+            let filters = filters.to_vec();
+
+            for table_path in &self.table_paths {
+                let store = Arc::clone(&store);
+                let table_path = table_path.clone();
+                let config = config.clone();
+                let runtime_env = Arc::clone(&runtime_env);
+                let file_extension = file_extension.clone();
+                let partition_cols = partition_cols.clone();
+                let filters = filters.clone();
+
+                join_set.spawn(async move {
+                    let stream = pruned_partition_list(
+                        &config,
+                        &runtime_env,
+                        store.as_ref(),
+                        &table_path,
+                        &filters,
+                        &file_extension,
+                        &partition_cols,
+                    )
+                    .await?;
+                    stream.try_collect::<Vec<_>>().await
+                });
+            }

Review Comment:
   The new JoinSet-based implementation spawns one task per `table_path` with 
no explicit bound. With many table paths this can oversubscribe the runtime 
and/or hammer the object store.
   
   Consider bounding the number of concurrent `pruned_partition_list` tasks 
(e.g., using a semaphore / `buffer_unordered`, or reusing an existing 
concurrency setting such as `execution.meta_fetch_concurrency`, which the 
removed code passed to `flatten_unordered`) so listing parallelism remains 
controlled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to