This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 004a151e8 Cleanup parquet tests (#3116)
004a151e8 is described below
commit 004a151e8df711292062236f8a94c09b6e18ef47
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Nov 22 09:27:32 2022 +0000
Cleanup parquet tests (#3116)
---
parquet/src/file/writer.rs | 117 +++++++++++++++++-----------------------
parquet/tests/boolean_writer.rs | 89 ------------------------------
2 files changed, 49 insertions(+), 157 deletions(-)
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b67bdccfe..2fe0b26e7 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -648,14 +648,15 @@ mod tests {
use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
use crate::column::page::PageReader;
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
- use crate::data_type::Int32Type;
+ use crate::data_type::{BoolType, Int32Type};
+ use crate::file::reader::ChunkReader;
use crate::file::{
properties::{ReaderProperties, WriterProperties, WriterVersion},
reader::{FileReader, SerializedFileReader, SerializedPageReader},
statistics::{from_thrift, to_thrift, Statistics},
};
use crate::format::SortingColumn;
- use crate::record::RowAccessor;
+ use crate::record::{Row, RowAccessor};
use crate::schema::types::{ColumnDescriptor, ColumnPath};
use crate::util::memory::ByteBufferPtr;
@@ -1163,16 +1164,35 @@ mod tests {
assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
}
- /// File write-read roundtrip.
- /// `data` consists of arrays of values for each row group.
- fn test_file_roundtrip(
- file: File,
+ /// Tests roundtrip of i32 data written using `W` and read using `R`
+ fn test_roundtrip_i32<W, R>(
+ file: W,
data: Vec<Vec<i32>>,
- ) -> crate::format::FileMetaData {
+ ) -> crate::format::FileMetaData
+ where
+ W: Write,
+ R: ChunkReader + From<W> + 'static,
+ {
+ test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap())
+ }
+
+ /// Tests roundtrip of data of type `D` written using `W` and read using `R`
+ /// and the provided `values` function
+ fn test_roundtrip<W, R, D, F>(
+ mut file: W,
+ data: Vec<Vec<D::T>>,
+ value: F,
+ ) -> crate::format::FileMetaData
+ where
+ W: Write,
+ R: ChunkReader + From<W> + 'static,
+ D: DataType,
+ F: Fn(Row) -> D::T,
+ {
let schema = Arc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Arc::new(
- types::Type::primitive_type_builder("col1", Type::INT32)
+ types::Type::primitive_type_builder("col1", D::get_physical_type())
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
@@ -1181,16 +1201,15 @@ mod tests {
.unwrap(),
);
let props = Arc::new(WriterProperties::builder().build());
- let mut file_writer = assert_send(
- SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(),
- );
+ let mut file_writer =
+ SerializedFileWriter::new(&mut file, schema, props).unwrap();
let mut rows: i64 = 0;
for (idx, subset) in data.iter().enumerate() {
let mut row_group_writer = file_writer.next_row_group().unwrap();
if let Some(mut writer) = row_group_writer.next_column().unwrap() {
rows += writer
- .typed::<Int32Type>()
+ .typed::<D>()
.write_batch(&subset[..], None, None)
.unwrap() as i64;
writer.close().unwrap();
@@ -1202,7 +1221,7 @@ mod tests {
}
let file_metadata = file_writer.close().unwrap();
- let reader = assert_send(SerializedFileReader::new(file).unwrap());
+ let reader = SerializedFileReader::new(R::from(file)).unwrap();
assert_eq!(reader.num_row_groups(), data.len());
assert_eq!(
reader.metadata().file_metadata().num_rows(),
@@ -1212,16 +1231,19 @@ mod tests {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
- let res = iter
- .map(|elem| elem.get_int(0).unwrap())
- .collect::<Vec<i32>>();
+ let res: Vec<_> = iter.map(&value).collect();
assert_eq!(res, *item);
}
file_metadata
}
- fn assert_send<T: Send>(t: T) -> T {
- t
+ /// File write-read roundtrip.
+ /// `data` consists of arrays of values for each row group.
+ fn test_file_roundtrip(
+ file: File,
+ data: Vec<Vec<i32>>,
+ ) -> crate::format::FileMetaData {
+ test_roundtrip_i32::<File, File>(file, data)
}
#[test]
@@ -1245,58 +1267,17 @@ mod tests {
}
fn test_bytes_roundtrip(data: Vec<Vec<i32>>) {
- let mut buffer = vec![];
-
- let schema = Arc::new(
- types::Type::group_type_builder("schema")
- .with_fields(&mut vec![Arc::new(
- types::Type::primitive_type_builder("col1", Type::INT32)
- .with_repetition(Repetition::REQUIRED)
- .build()
- .unwrap(),
- )])
- .build()
- .unwrap(),
- );
-
- let mut rows: i64 = 0;
- {
- let props = Arc::new(WriterProperties::builder().build());
- let mut writer =
- SerializedFileWriter::new(&mut buffer, schema, props).unwrap();
-
- for subset in &data {
- let mut row_group_writer = writer.next_row_group().unwrap();
- if let Some(mut writer) = row_group_writer.next_column().unwrap() {
- rows += writer
- .typed::<Int32Type>()
- .write_batch(&subset[..], None, None)
- .unwrap() as i64;
-
- writer.close().unwrap();
- }
- row_group_writer.close().unwrap();
- }
- writer.close().unwrap();
- }
-
- let reading_cursor = Bytes::from(buffer);
- let reader = SerializedFileReader::new(reading_cursor).unwrap();
+ test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data);
+ }
- assert_eq!(reader.num_row_groups(), data.len());
- assert_eq!(
- reader.metadata().file_metadata().num_rows(),
- rows,
- "row count in metadata not equal to number of rows written"
+ #[test]
+ fn test_boolean_roundtrip() {
+ let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
+ test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
+ Vec::with_capacity(1024),
+ vec![my_bool_values],
+ |r| r.get_bool(0).unwrap(),
);
- for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
- let row_group_reader = reader.get_row_group(i).unwrap();
- let iter = row_group_reader.get_row_iter(None).unwrap();
- let res = iter
- .map(|elem| elem.get_int(0).unwrap())
- .collect::<Vec<i32>>();
- assert_eq!(res, *item);
- }
}
#[test]
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
deleted file mode 100644
index 8c3d50d8f..000000000
--- a/parquet/tests/boolean_writer.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use parquet::data_type::BoolType;
-use parquet::file::properties::WriterProperties;
-use parquet::file::reader::FileReader;
-use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::file::writer::SerializedFileWriter;
-use parquet::schema::parser::parse_message_type;
-use std::fs;
-use std::path::Path;
-use std::sync::{mpsc, Arc};
-use std::thread;
-use std::time::Duration;
-
-#[test]
-fn it_writes_data_without_hanging() {
- let path = Path::new("it_writes_data_without_hanging.parquet");
-
- let message_type = "
- message BooleanType {
- REQUIRED BOOLEAN DIM0;
- }
-";
- let schema = Arc::new(parse_message_type(message_type).expect("parse schema"));
- let props = Arc::new(WriterProperties::builder().build());
- let file = fs::File::create(path).expect("create file");
- let mut writer =
- SerializedFileWriter::new(file, schema, props).expect("create parquet writer");
- for _group in 0..1 {
- let mut row_group_writer = writer.next_row_group().expect("get row group writer");
- let values: Vec<i64> = vec![0; 2049];
- let my_bool_values: Vec<bool> = values
- .iter()
- .enumerate()
- .map(|(count, _x)| count % 2 == 0)
- .collect();
- while let Some(mut col_writer) =
- row_group_writer.next_column().expect("next column")
- {
- col_writer
- .typed::<BoolType>()
- .write_batch(&my_bool_values, None, None)
- .expect("writing bool column");
-
- col_writer.close().expect("close column");
- }
- let rg_md = row_group_writer.close().expect("close row group");
- println!("total rows written: {}", rg_md.num_rows());
- }
- writer.close().expect("close writer");
-
- let bytes = fs::read(path).expect("read file");
- assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
-
- // Now that we have written our data and are happy with it, make
- // sure we can read it back in < 5 seconds...
- let (sender, receiver) = mpsc::channel();
- let _t = thread::spawn(move || {
- let file = fs::File::open(Path::new("it_writes_data_without_hanging.parquet"))
- .expect("open file");
- let reader = SerializedFileReader::new(file).expect("get serialized reader");
- let iter = reader.get_row_iter(None).expect("get iterator");
- for record in iter {
- println!("reading: {}", record);
- }
- println!("finished reading");
- if let Ok(()) = sender.send(true) {}
- });
- assert_ne!(
- Err(mpsc::RecvTimeoutError::Timeout),
- receiver.recv_timeout(Duration::from_millis(5000))
- );
- fs::remove_file("it_writes_data_without_hanging.parquet").expect("remove file");
-}
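
For reference, the pattern the new generic helpers boil down to is a write/read
roundtrip that is generic over the writer target (anything implementing `Write`)
and the reader source (anything implementing `ChunkReader`), so one test body now
covers the `File` case as well as the in-memory `Vec<u8>`/`Bytes` case that
previously lived in parquet/tests/boolean_writer.rs. Below is a rough standalone
sketch of that in-memory boolean roundtrip; it is not taken verbatim from the
commit and assumes the `bytes` crate plus the parquet API as of this commit.

use std::sync::Arc;

use bytes::Bytes;
use parquet::basic::{Repetition, Type as PhysicalType};
use parquet::data_type::BoolType;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RowAccessor;
use parquet::schema::types::Type;

fn main() {
    // Single REQUIRED BOOLEAN column, mirroring the schema built in the tests.
    let schema = Arc::new(
        Type::group_type_builder("schema")
            .with_fields(&mut vec![Arc::new(
                Type::primitive_type_builder("col1", PhysicalType::BOOLEAN)
                    .with_repetition(Repetition::REQUIRED)
                    .build()
                    .unwrap(),
            )])
            .build()
            .unwrap(),
    );
    let props = Arc::new(WriterProperties::builder().build());
    let values: Vec<bool> = (0..2049).map(|i| i % 2 == 0).collect();

    // Write one row group into an in-memory buffer (`Vec<u8>: Write`).
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buffer, schema, props).unwrap();
    let mut row_group = writer.next_row_group().unwrap();
    if let Some(mut column) = row_group.next_column().unwrap() {
        column
            .typed::<BoolType>()
            .write_batch(&values, None, None)
            .unwrap();
        column.close().unwrap();
    }
    row_group.close().unwrap();
    writer.close().unwrap();

    // Read the buffer back through `Bytes` (`Bytes: ChunkReader`) and check
    // that the values survive the roundtrip.
    let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let read: Vec<bool> = reader
        .get_row_iter(None)
        .unwrap()
        .map(|row| row.get_bool(0).unwrap())
        .collect();
    assert_eq!(read, values);
}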