Copilot commented on code in PR #20039:
URL: https://github.com/apache/datafusion/pull/20039#discussion_r2735052735
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
});
};
// list files (with partitions)
-        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path rather than
+        // streaming lazily. This is a trade-off required because JoinSet tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For most use cases,
+        // the parallelization benefit outweighs the temporary memory overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
Review Comment:
This PR is described as fixing nested async UDF execution, but it also
introduces a fairly broad refactor of listing-table file discovery (API changes
in `ListingTableUrl::list_*` / `pruned_partition_list`) and changes listing
concurrency behavior (JoinSet).
If these changes aren’t strictly required for the async UDF fix, please
consider splitting them into a separate PR (or documenting why they’re coupled)
to keep the review surface and risk contained.
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
});
};
// list files (with partitions)
-        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path rather than
+        // streaming lazily. This is a trade-off required because JoinSet tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For most use cases,
+        // the parallelization benefit outweighs the temporary memory overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
+            let partition_cols = self.options.table_partition_cols.clone();
+            let filters = filters.to_vec();
+
+            for table_path in &self.table_paths {
+                let store = Arc::clone(&store);
+                let table_path = table_path.clone();
+                let config = config.clone();
+                let runtime_env = Arc::clone(&runtime_env);
+                let file_extension = file_extension.clone();
+                let partition_cols = partition_cols.clone();
+                let filters = filters.clone();
Review Comment:
The JoinSet branch clones the entire `ConfigOptions` (`let config =
ctx.config_options().clone()`) and then clones it again for every spawned task.
`ConfigOptions` can be fairly large, so this can add unnecessary
allocation/copy overhead.
Consider wrapping the cloned `ConfigOptions` in an `Arc` once and cloning
the `Arc` into tasks (passing `&*arc_config`), or otherwise sharing immutable
config across tasks without repeated deep clones.
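    A minimal sketch of the Arc-sharing pattern follows; `Settings` and
`list_one_path` are hypothetical stand-ins for `ConfigOptions` and
`pruned_partition_list`, not the PR's actual types:
```rust
use std::sync::Arc;
use tokio::task::JoinSet;

// Stand-in for a large configuration struct that is expensive to deep-clone.
struct Settings {
    meta_fetch_concurrency: usize,
    // ... many more fields in the real ConfigOptions ...
}

// Stand-in for pruned_partition_list: takes config by reference.
async fn list_one_path(settings: &Settings, path: String) -> Vec<String> {
    vec![format!("{path} (concurrency {})", settings.meta_fetch_concurrency)]
}

#[tokio::main]
async fn main() {
    let paths = vec!["a/".to_string(), "b/".to_string()];

    // Deep-clone the settings once, then share them behind an Arc.
    let settings = Arc::new(Settings { meta_fetch_concurrency: 32 });

    let mut join_set = JoinSet::new();
    for path in paths {
        // Cloning the Arc is a cheap refcount bump, not a deep copy per task.
        let settings = Arc::clone(&settings);
        join_set.spawn(async move { list_one_path(&settings, path).await });
    }

    while let Some(files) = join_set.join_next().await {
        println!("{:?}", files.unwrap());
    }
}
```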
##########
datafusion/physical-plan/src/async_func.rs:
##########
@@ -216,14 +240,32 @@ impl ExecutionPlan for AsyncFuncExec {
            async move {
                let batch = batch?;
                // append the result of evaluating the async expressions to the output
-                let mut output_arrays = batch.columns().to_vec();
-                for async_expr in async_exprs_captured.iter() {
+                // We must evaluate them in order, adding the results to the batch
+                // so that subsequent async expressions can access the results of previous ones
+                let mut output_arrays = Vec::with_capacity(async_exprs_captured.len());
+                let input_columns = batch.columns().len();
+
+                for (i, async_expr) in async_exprs_captured.iter().enumerate() {
+                    // Create a batch with the input columns and the async columns evaluated so far
+                    // We need to construct a schema for this intermediate batch
+                    let current_schema_fields: Vec<_> = schema_captured.fields().iter().take(input_columns + i).cloned().collect();
+                    let current_schema = Arc::new(Schema::new(current_schema_fields));
+
+                    let mut current_columns = batch.columns().to_vec();
+                    current_columns.extend_from_slice(&output_arrays);
+
+                    let current_batch = RecordBatch::try_new(current_schema, current_columns)?;
Review Comment:
`AsyncFuncExec::execute` now rebuilds an intermediate `Schema` and re-clones
the input columns vector on every async expression (`current_schema_fields` +
`current_columns`). This makes per-batch work grow roughly O(#async_exprs^2)
and can be a noticeable regression for projections with multiple async UDFs.
Consider maintaining an incremental `current_fields` / `current_columns`
(starting from the input batch) and appending each evaluated async column
in-place, creating the next intermediate `RecordBatch` from those
incrementally-updated structures. This avoids repeatedly copying the input
columns and re-collecting schema fields each iteration.
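    A minimal sketch of the incremental approach, using a synchronous
`eval_async_expr` placeholder in place of the real async UDF evaluation (all
names below are illustrative, not the PR's API):
```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use arrow::record_batch::RecordBatch;

// Placeholder: real code would await the async UDF and return its output column.
fn eval_async_expr(batch: &RecordBatch, name: &str) -> (FieldRef, ArrayRef) {
    let n = batch.num_columns() as i64;
    let col: ArrayRef = Arc::new(Int64Array::from(vec![n; batch.num_rows()]));
    (Arc::new(Field::new(name, DataType::Int64, false)), col)
}

fn main() -> Result<(), arrow::error::ArrowError> {
    let input = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Start from the input batch and grow fields/columns in place; cloning the
    // Vecs below only bumps Arc refcounts, so the input columns and schema
    // fields are never re-collected from scratch per expression.
    let mut current_fields: Vec<FieldRef> = input.schema().fields().to_vec();
    let mut current_columns: Vec<ArrayRef> = input.columns().to_vec();

    for name in ["async_1", "async_2", "async_3"] {
        // Intermediate batch exposing the input plus all async columns so far.
        let current_batch = RecordBatch::try_new(
            Arc::new(Schema::new(current_fields.clone())),
            current_columns.clone(),
        )?;
        let (field, column) = eval_async_expr(&current_batch, name);
        current_fields.push(field);
        current_columns.push(column);
    }

    println!("final width: {}", current_columns.len());
    Ok(())
}
```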
##########
datafusion/physical-plan/src/async_func.rs:
##########
@@ -74,10 +80,28 @@ impl AsyncFuncExec {
            .collect();
        let schema = Arc::new(Schema::new(fields));
-        let tuples = async_exprs
-            .iter()
-            .map(|expr| (Arc::clone(&expr.func), expr.name().to_string()))
-            .collect::<Vec<_>>();
+
+        // Only include expressions that map to input columns in the ProjectionMapping
+        // Expressions referencing newly created async columns cannot be verified against input schema
+        let input_len = input.schema().fields().len();
+        let mut tuples = Vec::new();
+        for expr in &async_exprs {
+            let mut refers_to_new_cols = false;
+            expr.func.apply(&mut |e: &Arc<dyn PhysicalExpr>| {
+                if let Some(col) = e.as_any().downcast_ref::<Column>() {
+
+                    if col.index() >= input_len {
+                        refers_to_new_cols = true;
+                    }
+                }
+                Ok(TreeNodeRecursion::Continue)
+            })?;
+
Review Comment:
The `ProjectionMapping` construction scans each `expr.func` tree to detect
references to newly-created async columns, but it never short-circuits once
such a column is found. Since this runs for every async expression, it can add
unnecessary traversal overhead.
You can return `TreeNodeRecursion::Stop` once `refers_to_new_cols` becomes
`true` (or check the flag early in the visitor) to avoid walking the rest of
the subtree.
```suggestion
                // Short-circuit traversal once we know this expression
                // refers to newly created async columns
                if refers_to_new_cols {
                    return Ok(TreeNodeRecursion::Stop);
                }
                if let Some(col) = e.as_any().downcast_ref::<Column>() {
                    if col.index() >= input_len {
                        refers_to_new_cols = true;
                        return Ok(TreeNodeRecursion::Stop);
                    }
                }
                Ok(TreeNodeRecursion::Continue)
            })?;
```
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,89 @@ impl ListingTable {
});
};
// list files (with partitions)
-        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet.
+        // Note: This implementation collects files into memory per table_path rather than
+        // streaming lazily. This is a trade-off required because JoinSet tasks need 'static
+        // lifetime, which prevents returning borrowed streams directly. For most use cases,
+        // the parallelization benefit outweighs the temporary memory overhead. The WASM
+        // fallback below preserves streaming behavior for memory-constrained environments.
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
+            let partition_cols = self.options.table_partition_cols.clone();
+            let filters = filters.to_vec();
+
+            for table_path in &self.table_paths {
+                let store = Arc::clone(&store);
+                let table_path = table_path.clone();
+                let config = config.clone();
+                let runtime_env = Arc::clone(&runtime_env);
+                let file_extension = file_extension.clone();
+                let partition_cols = partition_cols.clone();
+                let filters = filters.clone();
+
+                join_set.spawn(async move {
+                    let stream = pruned_partition_list(
+                        &config,
+                        &runtime_env,
+                        store.as_ref(),
+                        &table_path,
+                        &filters,
+                        &file_extension,
+                        &partition_cols,
+                    )
+                    .await?;
+                    stream.try_collect::<Vec<_>>().await
+                });
+            }
Review Comment:
The new JoinSet-based implementation spawns one task per `table_path` with
no explicit bound. With many table paths this can oversubscribe the runtime
and/or hammer the object store.
   Consider bounding the number of concurrent `pruned_partition_list` tasks
(e.g., using a semaphore / `buffer_unordered`, or reusing an existing
concurrency setting) so listing parallelism remains controlled.
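   A minimal sketch of bounding the spawned tasks with a
`tokio::sync::Semaphore`; the `list_one_path` helper and the concurrency value
are placeholders (the real bound could come from an existing setting such as
`meta_fetch_concurrency`):
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Placeholder for pruned_partition_list(..).await? + try_collect().
async fn list_one_path(path: String) -> Vec<String> {
    vec![format!("{path}part-0.parquet")]
}

#[tokio::main]
async fn main() {
    let table_paths: Vec<String> = (0..100).map(|i| format!("table_{i}/")).collect();
    let max_listing_concurrency = 8;

    // A semaphore caps how many listing tasks touch the object store at once,
    // even though every table path still gets its own spawned task.
    let limiter = Arc::new(Semaphore::new(max_listing_concurrency));
    let mut join_set = JoinSet::new();

    for path in table_paths {
        let limiter = Arc::clone(&limiter);
        join_set.spawn(async move {
            // Wait for a slot before hitting the object store; the permit is
            // held (and the slot occupied) until this task finishes.
            let _permit = limiter.acquire_owned().await.expect("semaphore closed");
            list_one_path(path).await
        });
    }

    let mut total = 0;
    while let Some(files) = join_set.join_next().await {
        total += files.unwrap().len();
    }
    println!("listed {total} files");
}
```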
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]