Copilot commented on code in PR #20023:
URL: https://github.com/apache/datafusion/pull/20023#discussion_r2731215015
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,84 @@ impl ListingTable {
});
};
// list files (with partitions)
- let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
- pruned_partition_list(
- ctx,
- store.as_ref(),
- table_path,
- filters,
- &self.options.file_extension,
- &self.options.table_partition_cols,
- )
- }))
- .await?;
- let meta_fetch_concurrency =
- ctx.config_options().execution.meta_fetch_concurrency;
- let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+ // For non-WASM targets, use parallel execution with JoinSet
+ #[cfg(not(target_arch = "wasm32"))]
+ let file_list = {
+ let mut join_set = JoinSet::new();
+ let config = ctx.config_options().clone();
+ let runtime_env = Arc::clone(ctx.runtime_env());
+ let file_extension = self.options.file_extension.clone();
+ let partition_cols = self.options.table_partition_cols.clone();
+ let filters = filters.to_vec();
+
+ for table_path in &self.table_paths {
+ let store = Arc::clone(&store);
+ let table_path = table_path.clone();
+ let config = config.clone();
+ let runtime_env = Arc::clone(&runtime_env);
+ let file_extension = file_extension.clone();
+ let partition_cols = partition_cols.clone();
+ let filters = filters.clone();
+
+ join_set.spawn(async move {
+ let stream = pruned_partition_list(
+ &config,
+ &runtime_env,
+ store.as_ref(),
+ &table_path,
+ &filters,
+ &file_extension,
+ &partition_cols,
+ )
+ .await?;
+ stream.try_collect::<Vec<_>>().await
Review Comment:
This collects all files from the stream into memory before processing, which
differs from the original behavior and the WASM version. The original code kept
streams as streams and processed them lazily using `flatten_unordered`. This
change could cause memory issues when scanning tables with a large number of
files.
Ideally, the spawned task would return the stream itself (not the collected
`Vec`) to maintain the streaming behavior. However, JoinSet tasks require a
`'static` lifetime and the stream has lifetime `'a`, so it cannot be returned
from the task as-is: you'll need to either box the stream or collect it. If
collecting is intentional, consider documenting this memory trade-off;
otherwise, consider finding a way to preserve the streaming behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]