This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b8a3d6969 Parallel parquet schema inference (#6366)
4b8a3d6969 is described below

commit 4b8a3d696986738442d863d3bf0468cfb325c39a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri May 19 11:38:10 2023 +0100

    Parallel parquet schema inference (#6366)
---
 datafusion/core/src/datasource/file_format/parquet.rs | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index f2780cd469..c2bccb6378 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::PhysicalExpr;
+use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::parquet_to_arrow_schema;
@@ -151,12 +152,12 @@ impl FileFormat for ParquetFormat {
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
-        let mut schemas = Vec::with_capacity(objects.len());
-        for object in objects {
-            let schema =
-                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
-            schemas.push(schema)
-        }
+        let schemas: Vec<_> = futures::stream::iter(objects)
+            .map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
+            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
+            .buffered(32)
+            .try_collect()
+            .await?;
 
         let schema = if self.skip_metadata(state.config_options()) {
             Schema::try_merge(clear_metadata(schemas))

Reply via email to