alamb commented on code in PR #3370: URL: https://github.com/apache/arrow-rs/pull/3370#discussion_r1052472004
########## parquet/src/arrow/async_reader/store.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; + +use object_store::{ObjectMeta, ObjectStore}; + +use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +pub struct ParquetObjectReader { + store: Arc<dyn ObjectStore>, + meta: ObjectMeta, + metadata_size_hint: Option<usize>, +} + +impl ParquetObjectReader { + /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] + /// + /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] Review Comment: 👍 ########## parquet/src/arrow/async_reader/metadata.rs: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::{ParquetError, Result}; +use crate::file::footer::{decode_footer, decode_metadata}; +use crate::file::metadata::ParquetMetaData; +use bytes::{BufMut, Bytes, BytesMut}; +use std::future::Future; +use std::ops::Range; + +/// Fetches parquet metadata using an async function that can fetch byte ranges Review Comment: ```suggestion /// Fetches parquet metadata /// /// Parameters: /// * fetch: an async function that can fetch byte ranges /// * file_size: the total size of the parquet file /// * footer_size_hint: footer prefetch size (see comments below) ``` ########## parquet/src/arrow/async_reader/store.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; + +use object_store::{ObjectMeta, ObjectStore}; + +use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +pub struct ParquetObjectReader { + store: Arc<dyn ObjectStore>, + meta: ObjectMeta, + metadata_size_hint: Option<usize>, +} + +impl ParquetObjectReader { + /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] + /// + /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] + pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self { + Self { + store, + meta, + metadata_size_hint: None, + } + } + + /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata] + pub fn with_footer_size_hint(self, hint: usize) -> Self { + Self { + metadata_size_hint: Some(hint), + ..self + } + } +} + +impl AsyncFileReader for ParquetObjectReader { + fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> { + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e)) + }) + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<usize>>, + ) -> BoxFuture<'_, Result<Vec<Bytes>>> + where + Self: Send, + { + async move { + self.store + .get_ranges(&self.meta.location, &ranges) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_byte_ranges error: {}", Review Comment: ```suggestion "ParquetObjectReader::get_byte_ranges error: {}", ``` ########## parquet/src/arrow/async_reader/metadata.rs: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::{ParquetError, Result}; +use crate::file::footer::{decode_footer, decode_metadata}; +use crate::file::metadata::ParquetMetaData; +use bytes::{BufMut, Bytes, BytesMut}; +use std::future::Future; +use std::ops::Range; + +/// Fetches parquet metadata using an async function that can fetch byte ranges +/// +/// The length of the parquet footer, which contains file metadata, is not +/// known up front. Therefore this function will first issue a request to read +/// the last 8 bytes to determine the footer's precise length, before +/// issuing a second request to fetch the metadata bytes +/// +/// If a hint is set, this method will read the specified number of bytes +/// in the first request, instead of 8, and only issue a second request +/// if additional bytes are needed. This can therefore eliminate a +/// potentially costly additional fetch operation +pub async fn fetch_parquet_metadata<F, Fut>( + fetch: F, + file_size: usize, + footer_size_hint: Option<usize>, +) -> Result<ParquetMetaData> +where + F: Fn(Range<usize>) -> Fut, + Fut: Future<Output = Result<Bytes>>, +{ + if file_size < 8 { + return Err(ParquetError::EOF(format!( + "file size of {} is less than footer", + file_size + ))); + } + + // If a size hint is provided, read more than the minimum size + // to try and avoid a second fetch. + let footer_start = if let Some(size_hint) = footer_size_hint { + file_size.saturating_sub(size_hint) + } else { + file_size - 8 + }; + + let suffix = fetch(footer_start..file_size).await?; + let suffix_len = suffix.len(); + + let mut footer = [0; 8]; + footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]); + + let length = decode_footer(&footer)?; + + if file_size < length + 8 { + return Err(ParquetError::EOF(format!( + "file size of {} is less than footer + metadata {}", + file_size, + length + 8 + ))); + } + + // Did not fetch the entire file metadata in the initial read, need to make a second request + if length > suffix_len - 8 { + let metadata_start = file_size - length - 8; + let remaining_metadata = fetch(metadata_start..footer_start).await?; + + let mut metadata = BytesMut::with_capacity(length); + + metadata.put(remaining_metadata.as_ref()); + metadata.put(&suffix[..suffix_len - 8]); + + Ok(decode_metadata(metadata.as_ref())?) + } else { + let metadata_start = file_size - length - 8; + + Ok(decode_metadata( + &suffix[metadata_start - footer_start..suffix_len - 8], + )?) + } +} Review Comment: I think there should be tests for this code ########## object_store/src/lib.rs: ########## @@ -33,9 +33,18 @@ //! //! # Create an [`ObjectStore`] implementation: //! -//! * [Google Cloud Storage](https://cloud.google.com/storage/): [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder) -//! * [Amazon S3](https://aws.amazon.com/s3/): [`AmazonS3Builder`](aws::AmazonS3Builder) -//! * [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/):: [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder) +#![cfg_attr( Review Comment: ```suggestion // This change is necessary because otherwise the docs build fails complaining about missing intradoc // links when compiling the docs for the parquet crate, as it doesn't specify all features for object_store, // and object_store enables this as an error #![cfg_attr( ``` ########## parquet/src/arrow/async_reader/store.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; + +use object_store::{ObjectMeta, ObjectStore}; + +use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +pub struct ParquetObjectReader { + store: Arc<dyn ObjectStore>, + meta: ObjectMeta, + metadata_size_hint: Option<usize>, +} + +impl ParquetObjectReader { + /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] + /// + /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] + pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self { + Self { + store, + meta, + metadata_size_hint: None, + } + } + + /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata] + pub fn with_footer_size_hint(self, hint: usize) -> Self { + Self { + metadata_size_hint: Some(hint), + ..self + } + } +} + +impl AsyncFileReader for ParquetObjectReader { + fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> { + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e)) + }) + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<usize>>, + ) -> BoxFuture<'_, Result<Vec<Bytes>>> + where + Self: Send, + { + async move { + self.store + .get_ranges(&self.meta.location, &ranges) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_byte_ranges error: {}", + e + )) + }) + } + .boxed() + } + + fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> { + Box::pin(async move { + let metadata = fetch_parquet_metadata( + |range| { + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!( + "ParquetFileReader::get_metadata error: {}", + e + )) + }) + }, + self.meta.size, + self.metadata_size_hint, + ) + .await?; + Ok(Arc::new(metadata)) + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::TryStreamExt; + + use arrow::util::test_util::parquet_test_data; + use object_store::local::LocalFileSystem; + use object_store::path::Path; + use object_store::ObjectStore; + + use crate::arrow::async_reader::ParquetObjectReader; + use crate::arrow::ParquetRecordBatchStreamBuilder; + + #[tokio::test] + async fn test_simple() { + let res = parquet_test_data(); + let store = LocalFileSystem::new_with_prefix(res).unwrap(); + + let meta = store + .head(&Path::from("alltypes_plain.parquet")) + .await + .unwrap(); + + let object_reader = ParquetObjectReader::new(Arc::new(store), meta); + let builder = ParquetRecordBatchStreamBuilder::new(object_reader) + .await + .unwrap(); + let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 8); + } +} Review Comment: I think a test or two for the error conditions would be worthwhile ########## parquet/src/arrow/async_reader/store.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; + +use object_store::{ObjectMeta, ObjectStore}; + +use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +pub struct ParquetObjectReader { + store: Arc<dyn ObjectStore>, + meta: ObjectMeta, + metadata_size_hint: Option<usize>, +} + +impl ParquetObjectReader { + /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`] + /// + /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] + pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self { + Self { + store, + meta, + metadata_size_hint: None, + } + } + + /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata] + pub fn with_footer_size_hint(self, hint: usize) -> Self { + Self { + metadata_size_hint: Some(hint), + ..self + } + } +} + +impl AsyncFileReader for ParquetObjectReader { + fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> { + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e)) + }) + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<usize>>, + ) -> BoxFuture<'_, Result<Vec<Bytes>>> + where + Self: Send, + { + async move { + self.store + .get_ranges(&self.meta.location, &ranges) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_byte_ranges error: {}", + e + )) + }) + } + .boxed() + } + + fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> { + Box::pin(async move { + let metadata = fetch_parquet_metadata( + |range| { + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!( + "ParquetFileReader::get_metadata error: {}", Review Comment: ```suggestion "ParquetObjectReader::get_metadata error: {}", ``` ########## parquet/src/arrow/async_reader/metadata.rs: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::{ParquetError, Result}; +use crate::file::footer::{decode_footer, decode_metadata}; +use crate::file::metadata::ParquetMetaData; +use bytes::{BufMut, Bytes, BytesMut}; +use std::future::Future; +use std::ops::Range; + +/// Fetches parquet metadata using an async function that can fetch byte ranges +/// +/// The length of the parquet footer, which contains file metadata, is not +/// known up front. Therefore this function will first issue a request to read +/// the last 8 bytes to determine the footer's precise length, before +/// issuing a second request to fetch the metadata bytes +/// +/// If a hint is set, this method will read the specified number of bytes +/// in the first request, instead of 8, and only issue a second request +/// if additional bytes are needed. This can therefore eliminate a +/// potentially costly additional fetch operation +pub async fn fetch_parquet_metadata<F, Fut>( + fetch: F, + file_size: usize, + footer_size_hint: Option<usize>, +) -> Result<ParquetMetaData> +where + F: Fn(Range<usize>) -> Fut, + Fut: Future<Output = Result<Bytes>>, +{ + if file_size < 8 { + return Err(ParquetError::EOF(format!( + "file size of {} is less than footer", + file_size + ))); + } + + // If a size hint is provided, read more than the minimum size + // to try and avoid a second fetch. + let footer_start = if let Some(size_hint) = footer_size_hint { + file_size.saturating_sub(size_hint) + } else { + file_size - 8 + }; + + let suffix = fetch(footer_start..file_size).await?; + let suffix_len = suffix.len(); + + let mut footer = [0; 8]; + footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]); + + let length = decode_footer(&footer)?; + + if file_size < length + 8 { + return Err(ParquetError::EOF(format!( + "file size of {} is less than footer + metadata {}", + file_size, + length + 8 + ))); + } + + // Did not fetch the entire file metadata in the initial read, need to make a second request + if length > suffix_len - 8 { + let metadata_start = file_size - length - 8; Review Comment: I wonder if `debug!` logging here would be useful ########## parquet/src/arrow/async_reader/store.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; + +use object_store::{ObjectMeta, ObjectStore}; + +use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +pub struct ParquetObjectReader { Review Comment: I request you put a comment into https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/file_format/parquet.rs#L549 linking to this PR to track that we can remove the DataFusion copy after this is released ########## parquet/src/arrow/async_reader/metadata.rs: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::{ParquetError, Result}; +use crate::file::footer::{decode_footer, decode_metadata}; +use crate::file::metadata::ParquetMetaData; +use bytes::{BufMut, Bytes, BytesMut}; +use std::future::Future; +use std::ops::Range; + +/// Fetches parquet metadata using an async function that can fetch byte ranges +/// +/// The length of the parquet footer, which contains file metadata, is not +/// known up front. Therefore this function will first issue a request to read +/// the last 8 bytes to determine the footer's precise length, before +/// issuing a second request to fetch the metadata bytes +/// +/// If a hint is set, this method will read the specified number of bytes +/// in the first request, instead of 8, and only issue a second request +/// if additional bytes are needed. This can therefore eliminate a +/// potentially costly additional fetch operation +pub async fn fetch_parquet_metadata<F, Fut>( Review Comment: cc @thinkharderdev -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org