This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1bf7112171 Test for reading statistics from parquet files without statistics and boolean & struct data type (#10608)
1bf7112171 is described below
commit 1bf7112171fd820c101e325822dc4d44dd65b2ff
Author: Nga Tran <[email protected]>
AuthorDate: Wed May 22 15:21:19 2024 -0400
Test for reading statistics from parquet files without statistics and boolean & struct data type (#10608)
* read statistics from parquet without statistics
* tests for boolean and struct array
* link to the struct array bug
* Make builder style API
* Port to builder style
* simplify
* Add column name to test
---------
Co-authored-by: Andrew Lamb <[email protected]>
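The tests now use a builder-style pattern: TestReader describes the parquet
file to create and open, and Test bundles the expected statistics together
with the column they are read from. A minimal sketch of the pattern, using
the values from the test_int_64 case in the diff below:

    // Sketch of the builder-style test API this commit ports the tests to.
    // TestReader::build writes a file for the Scenario and opens a reader;
    // Test::run extracts min/max/null-count/row-count statistics for
    // `column_name` and asserts they match the expected arrays.
    let reader = TestReader {
        scenario: Scenario::Int,
        row_per_group: 5,
    };
    Test {
        reader: reader.build().await,
        expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])),
        expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])),
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
        column_name: "i64",
    }
    .run();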
---
datafusion/core/tests/parquet/arrow_statistics.rs | 430 +++++++++++++++-------
datafusion/core/tests/parquet/mod.rs | 71 +++-
2 files changed, 358 insertions(+), 143 deletions(-)
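A note on the writer configuration the new helper exercises: the
parquet_file_one_column_stats function below threads an EnabledStatistics
value into the writer properties, so tests can produce files with no
statistics at all. A minimal sketch of that setup, mirroring the props
builder in the diff (the row group size of 5 is just an example value):

    // Sketch: writer properties with statistics disabled, as
    // test_missing_statistics does via parquet_file_one_column_stats.
    use parquet::file::properties::{EnabledStatistics, WriterProperties};

    let props = WriterProperties::builder()
        .set_max_row_group_size(5)
        .set_statistics_enabled(EnabledStatistics::None)
        .build();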
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 432d109b22..36fffe5ac4 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,8 +22,9 @@ use std::fs::File;
use std::sync::Arc;
use arrow_array::{
- make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array,
- Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
+ make_array, Array, ArrayRef, BooleanArray, Decimal128Array, FixedSizeBinaryArray,
+ Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
+ StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
@@ -33,7 +34,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderB
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{EnabledStatistics, WriterProperties};
-use crate::parquet::Scenario;
+use crate::parquet::{struct_array, Scenario};
use super::make_test_file_rg;
@@ -73,6 +74,28 @@ pub fn parquet_file_one_column(
no_null_values_start: i64,
no_null_values_end: i64,
row_per_group: usize,
+) -> ParquetRecordBatchReaderBuilder<File> {
+ parquet_file_one_column_stats(
+ num_null,
+ no_null_values_start,
+ no_null_values_end,
+ row_per_group,
+ EnabledStatistics::Chunk,
+ )
+}
+
+// Create a parquet file with one column for data type i64
+// The data in the file includes:
+// . The number of null rows is the given num_null
+// . There are non-null values in the range [no_null_values_start, no_null_values_end], one value per row
+// . The file is divided into row groups of size row_per_group
+// . Statistics are enabled/disabled based on the given enable_stats
+pub fn parquet_file_one_column_stats(
+ num_null: usize,
+ no_null_values_start: i64,
+ no_null_values_end: i64,
+ row_per_group: usize,
+ enable_stats: EnabledStatistics,
) -> ParquetRecordBatchReaderBuilder<File> {
let mut output_file = tempfile::Builder::new()
.prefix("parquert_statistics_test")
@@ -82,7 +105,7 @@ pub fn parquet_file_one_column(
let props = WriterProperties::builder()
.set_max_row_group_size(row_per_group)
- .set_statistics_enabled(EnabledStatistics::Chunk)
+ .set_statistics_enabled(enable_stats)
.build();
let batches = vec![make_int64_batches_with_null(
@@ -107,40 +130,56 @@ pub fn parquet_file_one_column(
ArrowReaderBuilder::try_new(file).unwrap()
}
-// Create a parquet file with many columns each has different data type
-// - Data types are specified by the given scenario
-// - Row group sizes are withe the same or different depending on the provided row_per_group & data created in the scenario
-pub async fn parquet_file_many_columns(
- scenario: super::Scenario,
+/// Defines what data to create in a parquet file
+#[derive(Debug, Clone, Copy)]
+struct TestReader {
+ /// What data to create in the parquet file
+ scenario: Scenario,
+ /// Number of rows per row group
row_per_group: usize,
-) -> ParquetRecordBatchReaderBuilder<File> {
- let file = make_test_file_rg(scenario, row_per_group).await;
+}
- // open the file & get the reader
- let file = file.reopen().unwrap();
- ArrowReaderBuilder::try_new(file).unwrap()
+impl TestReader {
+ /// Create a parquet file with the specified data, and return a
+ /// ParquetRecordBatchReaderBuilder opened to that file.
+ async fn build(self) -> ParquetRecordBatchReaderBuilder<File> {
+ let TestReader {
+ scenario,
+ row_per_group,
+ } = self;
+ let file = make_test_file_rg(scenario, row_per_group).await;
+
+ // open the file & get the reader
+ let file = file.reopen().unwrap();
+ ArrowReaderBuilder::try_new(file).unwrap()
+ }
}
+/// Defines a test case for statistics extraction
struct Test {
+ /// The parquet file reader
reader: ParquetRecordBatchReaderBuilder<File>,
expected_min: ArrayRef,
expected_max: ArrayRef,
expected_null_counts: UInt64Array,
expected_row_counts: UInt64Array,
+ /// Which column to extract statistics from
+ column_name: &'static str,
}
impl Test {
- fn run(self, col_name: &str) {
+ fn run(self) {
let Self {
reader,
expected_min,
expected_max,
expected_null_counts,
expected_row_counts,
+ column_name,
} = self;
let min = StatisticsConverter::try_new(
- col_name,
+ column_name,
RequestedStatistics::Min,
reader.schema(),
)
@@ -151,7 +190,7 @@ impl Test {
assert_eq!(&min, &expected_min, "Mismatch with expected minimums");
let max = StatisticsConverter::try_new(
- col_name,
+ column_name,
RequestedStatistics::Max,
reader.schema(),
)
@@ -161,7 +200,7 @@ impl Test {
assert_eq!(&max, &expected_max, "Mismatch with expected maximum");
let null_counts = StatisticsConverter::try_new(
- col_name,
+ column_name,
RequestedStatistics::NullCount,
reader.schema(),
)
@@ -181,17 +220,19 @@ impl Test {
);
}
- fn run_col_not_found(self, col_name: &str) {
+ /// Run the test and expect a column not found error
+ fn run_col_not_found(self) {
let Self {
reader,
expected_min: _,
expected_max: _,
expected_null_counts: _,
expected_row_counts: _,
+ column_name,
} = self;
let min = StatisticsConverter::try_new(
- col_name,
+ column_name,
RequestedStatistics::Min,
reader.schema(),
);
@@ -203,9 +244,7 @@ impl Test {
// TESTS
//
// Remaining cases
-// - Create parquet files / metadata with missing statistic values
-// - Create parquet files / metadata with different data types -- included but not all data types yet
-// - Create parquet files / metadata with different row group sizes -- done
+// f64::NAN
// - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact)
#[tokio::test]
@@ -222,8 +261,9 @@ async fn test_one_row_group_without_null() {
expected_null_counts: UInt64Array::from(vec![0]),
// 3 rows
expected_row_counts: UInt64Array::from(vec![3]),
+ column_name: "i64",
}
- .run("i64")
+ .run()
}
#[tokio::test]
@@ -241,8 +281,9 @@ async fn test_one_row_group_with_null_and_negative() {
expected_null_counts: UInt64Array::from(vec![2]),
// 8 rows
expected_row_counts: UInt64Array::from(vec![8]),
+ column_name: "i64",
}
- .run("i64")
+ .run()
}
#[tokio::test]
@@ -260,8 +301,9 @@ async fn test_two_row_group_with_null() {
expected_null_counts: UInt64Array::from(vec![0, 2]),
// row counts are [10, 5]
expected_row_counts: UInt64Array::from(vec![10, 5]),
+ column_name: "i64",
}
- .run("i64")
+ .run()
}
#[tokio::test]
@@ -279,8 +321,9 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
expected_null_counts: UInt64Array::from(vec![1, 3]),
// row counts are [5, 3]
expected_row_counts: UInt64Array::from(vec![5, 3]),
+ column_name: "i64",
}
- .run("i64")
+ .run()
}
/////////////// MORE GENERAL TESTS //////////////////////
@@ -291,12 +334,14 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
// Four different integer types
#[tokio::test]
async fn test_int_64() {
- let row_per_group = 5;
// This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
- let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Int,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [-5, -4, 0, 5]
expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])),
// maxes are [-1, 0, 4, 9]
@@ -305,18 +350,21 @@ async fn test_int_64() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "i64",
}
- .run("i64");
+ .run();
}
#[tokio::test]
async fn test_int_32() {
- let row_per_group = 5;
// This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
- let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Int,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [-5, -4, 0, 5]
expected_min: Arc::new(Int32Array::from(vec![-5, -4, 0, 5])),
// maxes are [-1, 0, 4, 9]
@@ -325,8 +373,9 @@ async fn test_int_32() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "i32",
}
- .run("i32");
+ .run();
}
// BUG: ignore this test for now
@@ -337,12 +386,14 @@ async fn test_int_32() {
#[ignore]
#[tokio::test]
async fn test_int_16() {
- let row_per_group = 5;
// This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
- let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Int,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [-5, -4, 0, 5]
// BUG: not sure why this returns the same data but as Int32Array even though I debugged and the column name is "i16" and its data is Int16
// My debugging tells me the bug is either at:
@@ -361,8 +412,9 @@ async fn test_int_16() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "i16",
}
- .run("i16");
+ .run();
}
// BUG (same as above): ignore this test for now
@@ -370,12 +422,14 @@ async fn test_int_16() {
#[ignore]
#[tokio::test]
async fn test_int_8() {
- let row_per_group = 5;
// This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
- let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Int,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [-5, -4, 0, 5]
// BUG: not sure why this returns the same data but as Int32Array even though I debugged and the column name is "i8" and its data is Int8
expected_min: Arc::new(Int8Array::from(vec![-5, -4, 0, 5])), // panics here because the actual data is Int32Array
@@ -385,15 +439,14 @@ async fn test_int_8() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "i8",
}
- .run("i8");
+ .run();
}
// timestamp
#[tokio::test]
async fn test_timestamp() {
- let row_per_group = 5;
-
// This creates a parquet file with 5 columns named "nanos", "micros", "millis", "seconds", "names"
// "nanos" --> TimestampNanosecondArray
// "micros" --> TimestampMicrosecondArray
@@ -403,10 +456,14 @@ async fn test_timestamp() {
//
// The file is created from 4 record batches, each with 5 rows.
// Since the row group size is set to 5, those 4 batches will go into 4 row groups
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Timestamps,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
expected_min: Arc::new(Int64Array::from(vec![
1577840461000000000,
@@ -425,13 +482,14 @@ async fn test_timestamp() {
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "nanos",
}
- .run("nanos");
+ .run();
// micros
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461000000,
1577840471000000,
@@ -446,13 +504,13 @@ async fn test_timestamp() {
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "micros",
}
- .run("micros");
+ .run();
// millis
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461000,
1577840471000,
@@ -467,13 +525,13 @@ async fn test_timestamp() {
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "millis",
}
- .run("millis");
+ .run();
// seconds
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461, 1577840471, 1577841061, 1578704461,
])),
@@ -482,15 +540,14 @@ async fn test_timestamp() {
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "seconds",
}
- .run("seconds");
+ .run();
}
// timestamp with different row group sizes
#[tokio::test]
async fn test_timestamp_diff_rg_sizes() {
- let row_per_group = 8;
-
// This creates a parquet file with 5 columns named "nanos", "micros", "millis", "seconds", "names"
// "nanos" --> TimestampNanosecondArray
// "micros" --> TimestampMicrosecondArray
@@ -499,10 +556,13 @@ async fn test_timestamp_diff_rg_sizes() {
// "names" --> StringArray
//
// The file is created from 4 record batches (each containing a null row) of 5 rows each, which are then split into 3 row groups of sizes 8, 8, and 4
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Timestamps,
+ row_per_group: 8, // note that the row group size is 8
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [1577840461000000000, 1577841061000000000, 1578704521000000000]
expected_min: Arc::new(Int64Array::from(vec![
1577840461000000000,
@@ -519,13 +579,13 @@ async fn test_timestamp_diff_rg_sizes() {
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
// row counts are [8, 8, 4]
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "nanos",
}
- .run("nanos");
+ .run();
// micros
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461000000,
1577841061000000,
@@ -538,13 +598,13 @@ async fn test_timestamp_diff_rg_sizes() {
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "micros",
}
- .run("micros");
+ .run();
// millis
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461000,
1577841061000,
@@ -557,13 +617,13 @@ async fn test_timestamp_diff_rg_sizes() {
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "millis",
}
- .run("millis");
+ .run();
// seconds
- let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![
1577840461, 1577841061, 1578704521,
])),
@@ -572,8 +632,9 @@ async fn test_timestamp_diff_rg_sizes() {
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "seconds",
}
- .run("seconds");
+ .run();
}
// date with different row group sizes
@@ -581,18 +642,18 @@ async fn test_timestamp_diff_rg_sizes() {
// https://github.com/apache/datafusion/issues/10587
#[tokio::test]
async fn test_dates_32_diff_rg_sizes() {
- let row_per_group = 13;
-
// This creates a parquet file with 3 columns named "date32", "date64", "names"
// "date32" --> Date32Array
// "date64" --> Date64Array
// "names" --> StringArray
//
// The file is created from 4 record batches (each containing a null row) of 5 rows each, which are then split into 2 row groups of sizes 13 and 7
- let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
-
+ let reader = TestReader {
+ scenario: Scenario::Dates,
+ row_per_group: 13,
+ };
Test {
- reader,
+ reader: reader.build().await,
// mins are [18262, 18565,]
expected_min: Arc::new(Int32Array::from(vec![18262, 18565])),
// maxes are [18564, 21865,]
@@ -601,8 +662,9 @@ async fn test_dates_32_diff_rg_sizes() {
expected_null_counts: UInt64Array::from(vec![2, 2]),
// row counts are [13, 7]
expected_row_counts: UInt64Array::from(vec![13, 7]),
+ column_name: "date32",
}
- .run("date32");
+ .run();
}
// BUG: same as above. Expect to return Date64Array but returns Int32Array
@@ -611,25 +673,26 @@ async fn test_dates_32_diff_rg_sizes() {
#[ignore]
#[tokio::test]
async fn test_dates_64_diff_rg_sizes() {
- let row_per_group = 13;
// The file is created from 4 record batches (each containing a null row) of 5 rows each, which are then split into 2 row groups of sizes 13 and 7
- let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Dates,
+ row_per_group: 13,
+ };
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panics here because the actual data is Int32Array
expected_max: Arc::new(Int64Array::from(vec![18564, 21865])),
expected_null_counts: UInt64Array::from(vec![2, 2]),
expected_row_counts: UInt64Array::from(vec![13, 7]),
+ column_name: "date64",
}
- .run("date64");
+ .run();
}
// BUG:
// https://github.com/apache/datafusion/issues/10604
#[tokio::test]
async fn test_uint() {
- let row_per_group = 4;
-
// This creates a parquet file with 4 columns named "u8", "u16", "u32", "u64"
// "u8" --> UInt8Array
// "u16" --> UInt16Array
@@ -637,118 +700,133 @@ async fn test_uint() {
// "u64" --> UInt64Array
// The file is created from 4 record batches (each containing a null row) of 5 rows each, which are then split into 5 row groups of size 4
- let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::UInt,
+ row_per_group: 4,
+ };
// u8
// BUG: expect UInt8Array but returns Int32Array
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt8Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt8Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+ column_name: "u8",
}
- .run("u8");
+ .run();
// u16
// BUG: expect UInt16Array but returns Int32Array
- let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt16Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt16Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+ column_name: "u16",
}
- .run("u16");
+ .run();
// u32
// BUG: expect UInt32Array but returns Int32Array
- let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt32Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+ column_name: "u32",
}
- .run("u32");
+ .run();
// u64
// BUG: expect UInt64Array but returns Int64Array
- let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // should be UInt64Array
expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // should be UInt64Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+ column_name: "u64",
}
- .run("u64");
+ .run();
}
#[tokio::test]
async fn test_int32_range() {
- let row_per_group = 5;
// This creates a parquet file of 1 column "i"
// file has 2 record batches, each has 2 rows. They will be saved into one
row group
- let reader = parquet_file_many_columns(Scenario::Int32Range,
row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Int32Range,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![0])),
expected_max: Arc::new(Int32Array::from(vec![300000])),
expected_null_counts: UInt64Array::from(vec![0]),
expected_row_counts: UInt64Array::from(vec![4]),
+ column_name: "i",
}
- .run("i");
+ .run();
}
// BUG: returns Int32Array instead of UInt32Array
// https://github.com/apache/datafusion/issues/10604
#[tokio::test]
async fn test_uint32_range() {
- let row_per_group = 5;
// This creates a parquet file of 1 column "u"
// file has 2 record batches, each has 2 rows. They will be saved into one
row group
- let reader = parquet_file_many_columns(Scenario::UInt32Range,
row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::UInt32Range,
+ row_per_group: 5,
+ };
Test {
- reader,
- expected_min: Arc::new(Int32Array::from(vec![0])), // shoudld be UInt32Array
- expected_max: Arc::new(Int32Array::from(vec![300000])), // shoudld be UInt32Array
+ reader: reader.build().await,
+ expected_min: Arc::new(Int32Array::from(vec![0])), // should be UInt32Array
+ expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array
expected_null_counts: UInt64Array::from(vec![0]),
expected_row_counts: UInt64Array::from(vec![4]),
+ column_name: "u",
}
- .run("u");
+ .run();
}
#[tokio::test]
async fn test_float64() {
- let row_per_group = 5;
// This creates a parquet file of 1 column "f"
// file has 4 record batches, each has 5 rows. They will be saved into 4
row groups
- let reader = parquet_file_many_columns(Scenario::Float64,
row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Float64,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])),
expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "f",
}
- .run("f");
+ .run();
}
#[tokio::test]
async fn test_decimal() {
- let row_per_group = 5;
// This creates a parquet file of 1 column "decimal_col" with decimal data
type and precicion 9, scale 2
// file has 3 record batches, each has 5 rows. They will be saved into 3
row groups
- let reader = parquet_file_many_columns(Scenario::Decimal,
row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Decimal,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(
Decimal128Array::from(vec![100, -500, 2000])
.with_precision_and_scale(9, 2)
@@ -761,16 +839,15 @@ async fn test_decimal() {
),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "decimal_col",
}
- .run("decimal_col");
+ .run();
}
// BUG: returns StringArray instead of BinaryArray
// https://github.com/apache/datafusion/issues/10605
#[tokio::test]
async fn test_byte() {
- let row_per_group = 5;
-
// This creates a parquet file with 4 columns
// "name"
// "service_string"
@@ -778,11 +855,14 @@ async fn test_byte() {
// "service_fixedsize"
// file has 3 record batches, each with 5 rows; they will be saved into 3 row groups
- let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::ByteArray,
+ row_per_group: 5,
+ };
// column "name"
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(StringArray::from(vec![
"all frontends",
"mixed",
@@ -795,13 +875,13 @@ async fn test_byte() {
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "name",
}
- .run("name");
+ .run();
// column "service_string"
- let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(StringArray::from(vec![
"frontend five",
"backend one",
@@ -814,13 +894,13 @@ async fn test_byte() {
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "service_string",
}
- .run("service_string");
+ .run();
// column "service_binary"
- let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(StringArray::from(vec![
"frontend five",
"backend one",
@@ -833,17 +913,18 @@ async fn test_byte() {
])), // Should be BinaryArray
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "service_binary",
}
- .run("service_binary");
+ .run();
// column "service_fixedsize"
// b"fe1", b"be1", b"be4"
let min_input = vec![vec![102, 101, 49], vec![98, 101, 49], vec![98, 101, 52]];
// b"fe5", b"fe6", b"be8"
let max_input = vec![vec![102, 101, 55], vec![102, 101, 54], vec![98, 101, 56]];
- let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(
FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap(),
),
@@ -852,22 +933,24 @@ async fn test_byte() {
),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "service_fixedsize",
}
- .run("service_fixedsize");
+ .run();
}
// PeriodsInColumnNames
#[tokio::test]
async fn test_period_in_column_names() {
- let row_per_group = 5;
// This creates a parquet file with 2 columns "name" and "service.name"
// file has 3 record batches, each with 5 rows; they will be saved into 3 row groups
- let reader =
- parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::PeriodsInColumnNames,
+ row_per_group: 5,
+ };
// column "name"
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(StringArray::from(vec![
"HTTP GET / DISPATCH",
"HTTP PUT / DISPATCH",
@@ -880,39 +963,102 @@ async fn test_period_in_column_names() {
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "name",
}
- .run("name");
+ .run();
// column "service.name"
- let reader =
- parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(StringArray::from(vec!["frontend", "backend",
"backend"])),
expected_max: Arc::new(StringArray::from(vec![
"frontend", "frontend", "backend",
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+ column_name: "service.name",
}
- .run("service.name");
+ .run();
}
-// TODO:
-// WITHOUT Stats
+// Boolean
+#[tokio::test]
+async fn test_boolean() {
+ // This creates a parquet files of 1 column named "bool"
+ // The file is created by 2 record batches each has 5 rows --> 2 row groups
+ let reader = TestReader {
+ scenario: Scenario::Boolean,
+ row_per_group: 5,
+ };
+
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(BooleanArray::from(vec![false, false])),
+ expected_max: Arc::new(BooleanArray::from(vec![true, false])),
+ expected_null_counts: UInt64Array::from(vec![1, 0]),
+ expected_row_counts: UInt64Array::from(vec![5, 5]),
+ column_name: "bool",
+ }
+ .run();
+}
+
+// struct array
+// BUG
+// https://github.com/apache/datafusion/issues/10609
+// Note: since I have not worked with struct arrays before, the bug may be in the test code rather than in the statistics code
+#[ignore]
+#[tokio::test]
+async fn test_struct() {
+ // This creates a parquet files of 1 column named "struct"
+ // The file is created by 1 record batch with 3 rows in the struct array
+ let reader = TestReader {
+ scenario: Scenario::StructArray,
+ row_per_group: 5,
+ };
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(struct_array(vec![(Some(1), Some(6.0), Some(12.0))])),
+ expected_max: Arc::new(struct_array(vec![(Some(2), Some(8.5), Some(14.0))])),
+ expected_null_counts: UInt64Array::from(vec![0]),
+ expected_row_counts: UInt64Array::from(vec![3]),
+ column_name: "struct",
+ }
+ .run();
+}
+////// Files with missing statistics ///////
+
+#[tokio::test]
+async fn test_missing_statistics() {
+ let row_per_group = 5;
+ let reader =
+ parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None);
+
+ Test {
+ reader,
+ expected_min: Arc::new(Int64Array::from(vec![None])),
+ expected_max: Arc::new(Int64Array::from(vec![None])),
+ expected_null_counts: UInt64Array::from(vec![None]),
+ expected_row_counts: UInt64Array::from(vec![3]), // still has row count statistics
+ column_name: "i64",
+ }
+ .run();
+}
/////// NEGATIVE TESTS ///////
// column not found
#[tokio::test]
async fn test_column_not_found() {
- let row_per_group = 5;
- let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
+ let reader = TestReader {
+ scenario: Scenario::Dates,
+ row_per_group: 5,
+ };
Test {
- reader,
+ reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![18262, 18565])),
expected_max: Arc::new(Int64Array::from(vec![18564, 21865])),
expected_null_counts: UInt64Array::from(vec![2, 2]),
expected_row_counts: UInt64Array::from(vec![13, 7]),
+ column_name: "not_a_column",
}
- .run_col_not_found("not_a_column");
+ .run_col_not_found();
}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bdc39c269d..c5d0ad60bc 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
-use arrow_array::make_array;
+use arrow_array::{make_array, BooleanArray, Float32Array, StructArray};
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -64,7 +64,9 @@ fn init() {
// ----------------------
/// What data to use
+#[derive(Debug, Clone, Copy)]
enum Scenario {
+ Boolean,
Timestamps,
Dates,
Int,
@@ -81,6 +83,7 @@ enum Scenario {
PeriodsInColumnNames,
WithNullValues,
WithNullValuesPageLevel,
+ StructArray,
}
enum Unit {
@@ -312,6 +315,16 @@ impl ContextWithParquet {
}
}
+fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "bool",
+ DataType::Boolean,
+ true,
+ )]));
+ let array = Arc::new(BooleanArray::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
/// Return record batch with a few rows of data for all of the supported timestamp types
/// values with the specified offset
///
@@ -699,6 +712,24 @@ fn make_int_batches_with_null(
fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
+ Scenario::Boolean => {
+ vec![
+ make_boolean_batch(vec![
+ Some(true),
+ Some(false),
+ Some(true),
+ Some(false),
+ None,
+ ]),
+ make_boolean_batch(vec![
+ Some(false),
+ Some(false),
+ Some(false),
+ Some(false),
+ Some(false),
+ ]),
+ ]
+ }
Scenario::Timestamps => {
vec![
make_timestamp_batch(TimeDelta::try_seconds(0).unwrap()),
@@ -881,6 +912,20 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_int_batches_with_null(5, 1, 6),
]
}
+ Scenario::StructArray => {
+ let struct_array_data = struct_array(vec![
+ (Some(1), Some(6.0), Some(12.0)),
+ (Some(2), Some(8.5), None),
+ (None, Some(8.5), Some(14.0)),
+ ]);
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ struct_array_data.data_type().clone(),
+ true,
+ )]));
+ vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
+ }
}
}
@@ -936,3 +981,27 @@ async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTe
writer.close().unwrap();
output_file
}
+
+// returns a struct array with columns "int32_col", "float32_col" and
"float64_col" with the specified values
+fn struct_array(input: Vec<(Option<i32>, Option<f32>, Option<f64>)>) ->
ArrayRef {
+ let int_32: Int32Array = input.iter().map(|(i, _, _)| i).collect();
+ let float_32: Float32Array = input.iter().map(|(_, f, _)| f).collect();
+ let float_64: Float64Array = input.iter().map(|(_, _, f)| f).collect();
+
+ let nullable = true;
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("int32_col", DataType::Int32, nullable)),
+ Arc::new(int_32) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("float32_col", DataType::Float32, nullable)),
+ Arc::new(float_32) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("float64_col", DataType::Float64, nullable)),
+ Arc::new(float_64) as ArrayRef,
+ ),
+ ]);
+ Arc::new(struct_array)
+}
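For reference, a hedged usage sketch of the struct_array helper above, fed the
same three rows that Scenario::StructArray uses in this diff:

    // Three rows: the second has a null float64_col, the third a null int32_col.
    let arr = struct_array(vec![
        (Some(1), Some(6.0), Some(12.0)),
        (Some(2), Some(8.5), None),
        (None, Some(8.5), Some(14.0)),
    ]);
    assert_eq!(arr.len(), 3);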
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]