This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4b8a3d6969 Parallel parquet schema inference (#6366)
4b8a3d6969 is described below
commit 4b8a3d696986738442d863d3bf0468cfb325c39a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri May 19 11:38:10 2023 +0100
Parallel parquet schema inference (#6366)
---
datafusion/core/src/datasource/file_format/parquet.rs | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index f2780cd469..c2bccb6378 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalExpr;
+use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::parquet_to_arrow_schema;
@@ -151,12 +152,12 @@ impl FileFormat for ParquetFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
- let mut schemas = Vec::with_capacity(objects.len());
- for object in objects {
- let schema =
- fetch_schema(store.as_ref(), object,
self.metadata_size_hint).await?;
- schemas.push(schema)
- }
+ let schemas: Vec<_> = futures::stream::iter(objects)
+ .map(|object| fetch_schema(store.as_ref(), object,
self.metadata_size_hint))
+ .boxed() // Workaround
https://github.com/rust-lang/rust/issues/64552
+ .buffered(32)
+ .try_collect()
+ .await?;
let schema = if self.skip_metadata(state.config_options()) {
Schema::try_merge(clear_metadata(schemas))