Copilot commented on code in PR #20023:
URL: https://github.com/apache/datafusion/pull/20023#discussion_r2731215015


##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -712,20 +715,84 @@ impl ListingTable {
             });
         };
         // list files (with partitions)
-        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
-            pruned_partition_list(
-                ctx,
-                store.as_ref(),
-                table_path,
-                filters,
-                &self.options.file_extension,
-                &self.options.table_partition_cols,
-            )
-        }))
-        .await?;
-        let meta_fetch_concurrency =
-            ctx.config_options().execution.meta_fetch_concurrency;
-        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
+        // For non-WASM targets, use parallel execution with JoinSet
+        #[cfg(not(target_arch = "wasm32"))]
+        let file_list = {
+            let mut join_set = JoinSet::new();
+            let config = ctx.config_options().clone();
+            let runtime_env = Arc::clone(ctx.runtime_env());
+            let file_extension = self.options.file_extension.clone();
+            let partition_cols = self.options.table_partition_cols.clone();
+            let filters = filters.to_vec();
+
+            for table_path in &self.table_paths {
+                let store = Arc::clone(&store);
+                let table_path = table_path.clone();
+                let config = config.clone();
+                let runtime_env = Arc::clone(&runtime_env);
+                let file_extension = file_extension.clone();
+                let partition_cols = partition_cols.clone();
+                let filters = filters.clone();
+
+                join_set.spawn(async move {
+                    let stream = pruned_partition_list(
+                        &config,
+                        &runtime_env,
+                        store.as_ref(),
+                        &table_path,
+                        &filters,
+                        &file_extension,
+                        &partition_cols,
+                    )
+                    .await?;
+                    stream.try_collect::<Vec<_>>().await

Review Comment:
   This collects all files from the stream into memory before processing, which 
differs from the original behavior and the WASM version. The original code kept 
streams as streams and processed them lazily using `flatten_unordered`. This 
change could cause memory issues when scanning tables with a large number of 
files.
   
   Ideally, the spawned task would return the stream itself (not the collected 
`Vec`) to maintain the streaming behavior. However, `JoinSet` tasks require a 
`'static` lifetime and the stream has lifetime `'a`, so it cannot be returned 
as-is: you'll need to either box the stream or collect it. If collecting is 
intentional, consider documenting this memory trade-off; otherwise, consider 
finding a way to preserve the streaming behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to