This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new f7201cf09 Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175) f7201cf09 is described below commit f7201cf09c3dd238805f37448189fe5ceae5f376 Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Wed Nov 16 07:36:34 2022 -0500 Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175) * Minor: consolidate another parquet integration test into parquet_exec * Remove unecessary level of indent, old workaround --- datafusion/core/tests/custom_parquet_reader.rs | 264 ------------------------- datafusion/core/tests/parquet/custom_reader.rs | 250 +++++++++++++++++++++++ datafusion/core/tests/parquet/mod.rs | 1 + 3 files changed, 251 insertions(+), 264 deletions(-) diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs deleted file mode 100644 index b48061331..000000000 --- a/datafusion/core/tests/custom_parquet_reader.rs +++ /dev/null @@ -1,264 +0,0 @@ -// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081) -#![allow(where_clauses_object_safety)] -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[cfg(test)] -mod tests { - use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; - use arrow::datatypes::{Field, Schema}; - use arrow::record_batch::RecordBatch; - use bytes::Bytes; - use datafusion::assert_batches_sorted_eq; - use datafusion::config::ConfigOptions; - use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; - use datafusion::datasource::listing::PartitionedFile; - use datafusion::datasource::object_store::ObjectStoreUrl; - use datafusion::physical_plan::file_format::{ - FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, - ParquetFileReaderFactory, - }; - use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; - use datafusion::physical_plan::{collect, Statistics}; - use datafusion::prelude::SessionContext; - use datafusion_common::DataFusionError; - use futures::future::BoxFuture; - use futures::{FutureExt, TryFutureExt}; - use object_store::memory::InMemory; - use object_store::path::Path; - use object_store::{ObjectMeta, ObjectStore}; - use parquet::arrow::async_reader::AsyncFileReader; - use parquet::arrow::ArrowWriter; - use parquet::errors::ParquetError; - use parquet::file::metadata::ParquetMetaData; - use std::io::Cursor; - use std::ops::Range; - use std::sync::Arc; - use std::time::SystemTime; - - const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata"; - - #[tokio::test] - async fn route_data_access_ops_to_parquet_file_reader_factory() { - let c1: ArrayRef = - Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); - let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); - - let batch = create_batch(vec![ - ("c1", c1.clone()), - ("c2", c2.clone()), - ("c3", c3.clone()), - ]); - - let file_schema = batch.schema().clone(); - let (in_memory_object_store, parquet_files_meta) = - store_parquet_in_memory(vec![batch]).await; - let file_groups = parquet_files_meta - .into_iter() - .map(|meta| PartitionedFile { - object_meta: meta, - partition_values: vec![], - range: None, - extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), - }) - .collect(); - - // prepare the scan - let parquet_exec = ParquetExec::new( - FileScanConfig { - // just any url that doesn't point to in memory object store - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - file_schema, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), - output_ordering: None, - }, - None, - None, - ) - .with_parquet_file_reader_factory(Arc::new( - InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)), - )); - - let session_ctx = SessionContext::new(); - - let task_ctx = session_ctx.task_ctx(); - let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); - - let expected = vec![ - "+-----+----+----+", - "| c1 | c2 | c3 |", - "+-----+----+----+", - "| Foo | 1 | 10 |", - "| | 2 | 20 |", - "| bar | | |", - "+-----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &read); - } - - #[derive(Debug)] - struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>); - - impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { - fn create_reader( - &self, - partition_index: usize, - file_meta: FileMeta, - metadata_size_hint: Option<usize>, - metrics: &ExecutionPlanMetricsSet, - ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> { - let metadata = file_meta - .extensions - .as_ref() - .expect("has user defined metadata"); - let metadata = metadata - .downcast_ref::<String>() - .expect("has string metadata"); - - assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]); - - let parquet_file_metrics = ParquetFileMetrics::new( - partition_index, - file_meta.location().as_ref(), - metrics, - ); - - Ok(Box::new(ParquetFileReader { - store: Arc::clone(&self.0), - meta: file_meta.object_meta, - metrics: parquet_file_metrics, - metadata_size_hint, - })) - } - } - - fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { - columns.into_iter().fold( - RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), - |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), - ) - } - - fn add_to_batch( - batch: &RecordBatch, - field_name: &str, - array: ArrayRef, - ) -> RecordBatch { - let mut fields = batch.schema().fields().clone(); - fields.push(Field::new(field_name, array.data_type().clone(), true)); - let schema = Arc::new(Schema::new(fields)); - - let mut columns = batch.columns().to_vec(); - columns.push(array); - RecordBatch::try_new(schema, columns).expect("error; creating record batch") - } - - async fn store_parquet_in_memory( - batches: Vec<RecordBatch>, - ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) { - let in_memory = InMemory::new(); - - let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches - .into_iter() - .enumerate() - .map(|(offset, batch)| { - let mut buf = Vec::<u8>::with_capacity(32 * 1024); - let mut output = Cursor::new(&mut buf); - - let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None) - .expect("creating writer"); - - writer.write(&batch).expect("Writing batch"); - writer.close().unwrap(); - - let meta = ObjectMeta { - location: Path::parse(format!("file-{offset}.parquet")) - .expect("creating path"), - last_modified: chrono::DateTime::from(SystemTime::now()), - size: buf.len(), - }; - - (meta, Bytes::from(buf)) - }) - .collect(); - - let mut objects = Vec::with_capacity(parquet_batches.len()); - for (meta, bytes) in parquet_batches { - in_memory - .put(&meta.location, bytes) - .await - .expect("put parquet file into in memory object store"); - objects.push(meta); - } - - (Arc::new(in_memory), objects) - } - - /// Implements [`AsyncFileReader`] for a parquet file in object storage - struct ParquetFileReader { - store: Arc<dyn ObjectStore>, - meta: ObjectMeta, - metrics: ParquetFileMetrics, - metadata_size_hint: Option<usize>, - } - - impl AsyncFileReader for ParquetFileReader { - fn get_bytes( - &mut self, - range: Range<usize>, - ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { - self.metrics.bytes_scanned.add(range.end - range.start); - - self.store - .get_range(&self.meta.location, range) - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_bytes error: {}", - e - )) - }) - .boxed() - } - - fn get_metadata( - &mut self, - ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> { - Box::pin(async move { - let metadata = fetch_parquet_metadata( - self.store.as_ref(), - &self.meta, - self.metadata_size_hint, - ) - .await - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_metadata error: {}", - e - )) - })?; - Ok(Arc::new(metadata)) - }) - } - } -} diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs new file mode 100644 index 000000000..8123badb1 --- /dev/null +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; +use arrow::datatypes::{Field, Schema}; +use arrow::record_batch::RecordBatch; +use bytes::Bytes; +use datafusion::assert_batches_sorted_eq; +use datafusion::config::ConfigOptions; +use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::physical_plan::file_format::{ + FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, +}; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::{collect, Statistics}; +use datafusion::prelude::SessionContext; +use datafusion_common::DataFusionError; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::ArrowWriter; +use parquet::errors::ParquetError; +use parquet::file::metadata::ParquetMetaData; +use std::io::Cursor; +use std::ops::Range; +use std::sync::Arc; +use std::time::SystemTime; + +const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata"; + +#[tokio::test] +async fn route_data_access_ops_to_parquet_file_reader_factory() { + let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + let batch = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + let file_schema = batch.schema().clone(); + let (in_memory_object_store, parquet_files_meta) = + store_parquet_in_memory(vec![batch]).await; + let file_groups = parquet_files_meta + .into_iter() + .map(|meta| PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), + }) + .collect(); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + // just any url that doesn't point to in memory object store + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + config_options: ConfigOptions::new().into_shareable(), + output_ordering: None, + }, + None, + None, + ) + .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory( + Arc::clone(&in_memory_object_store), + ))); + + let session_ctx = SessionContext::new(); + + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &read); +} + +#[derive(Debug)] +struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>); + +impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option<usize>, + metrics: &ExecutionPlanMetricsSet, + ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> { + let metadata = file_meta + .extensions + .as_ref() + .expect("has user defined metadata"); + let metadata = metadata + .downcast_ref::<String>() + .expect("has string metadata"); + + assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]); + + let parquet_file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + + Ok(Box::new(ParquetFileReader { + store: Arc::clone(&self.0), + meta: file_meta.object_meta, + metrics: parquet_file_metrics, + metadata_size_hint, + })) + } +} + +fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { + columns.into_iter().fold( + RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), + ) +} + +fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch { + let mut fields = batch.schema().fields().clone(); + fields.push(Field::new(field_name, array.data_type().clone(), true)); + let schema = Arc::new(Schema::new(fields)); + + let mut columns = batch.columns().to_vec(); + columns.push(array); + RecordBatch::try_new(schema, columns).expect("error; creating record batch") +} + +async fn store_parquet_in_memory( + batches: Vec<RecordBatch>, +) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) { + let in_memory = InMemory::new(); + + let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches + .into_iter() + .enumerate() + .map(|(offset, batch)| { + let mut buf = Vec::<u8>::with_capacity(32 * 1024); + let mut output = Cursor::new(&mut buf); + + let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + + let meta = ObjectMeta { + location: Path::parse(format!("file-{offset}.parquet")) + .expect("creating path"), + last_modified: chrono::DateTime::from(SystemTime::now()), + size: buf.len(), + }; + + (meta, Bytes::from(buf)) + }) + .collect(); + + let mut objects = Vec::with_capacity(parquet_batches.len()); + for (meta, bytes) in parquet_batches { + in_memory + .put(&meta.location, bytes) + .await + .expect("put parquet file into in memory object store"); + objects.push(meta); + } + + (Arc::new(in_memory), objects) +} + +/// Implements [`AsyncFileReader`] for a parquet file in object storage +struct ParquetFileReader { + store: Arc<dyn ObjectStore>, + meta: ObjectMeta, + metrics: ParquetFileMetrics, + metadata_size_hint: Option<usize>, +} + +impl AsyncFileReader for ParquetFileReader { + fn get_bytes( + &mut self, + range: Range<usize>, + ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { + self.metrics.bytes_scanned.add(range.end - range.start); + + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e)) + }) + .boxed() + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> { + Box::pin(async move { + let metadata = fetch_parquet_metadata( + self.store.as_ref(), + &self.meta, + self.metadata_size_hint, + ) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_metadata error: {}", + e + )) + })?; + Ok(Arc::new(metadata)) + }) + } +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 00ca670e3..ab410bd76 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -16,6 +16,7 @@ // under the License. //! Parquet integration tests +mod custom_reader; mod filter_pushdown; mod page_pruning; mod row_group_pruning;