This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 268f648db9 Minor: add parquet page stats for float{16, 32, 64} (#10982)
268f648db9 is described below
commit 268f648db9175cb8a24c46c6e40cb5bc03995b2b
Author: tmi <[email protected]>
AuthorDate: Wed Jun 19 15:46:24 2024 +0200
Minor: add parquet page stats for float{16, 32, 64} (#10982)
---
.../datasource/physical_plan/parquet/statistics.rs | 100 +++++++++++++++++++--
datafusion/core/tests/parquet/arrow_statistics.rs | 88 ++++++++++++++++++
datafusion/core/tests/parquet/mod.rs | 15 ++++
3 files changed, 195 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6ad78a82b9..2ca47de990 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -33,8 +33,9 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err,
Result};
use half::f16;
+use parquet::data_type::FixedLenByteArray;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex,
RowGroupMetaData};
-use parquet::file::page_index::index::Index;
+use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
@@ -495,7 +496,7 @@ macro_rules! get_statistics {
}
macro_rules! make_data_page_stats_iterator {
- ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type:
ty) => {
+ ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type:
ty) => {
struct $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
@@ -526,7 +527,7 @@ macro_rules! make_data_page_stats_iterator {
native_index
.indexes
.iter()
- .map(|x| x.$func)
+ .map(|x| $func(x))
.collect::<Vec<_>>(),
),
// No matching `Index` found;
@@ -548,11 +549,66 @@ macro_rules! make_data_page_stats_iterator {
};
}
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min,
Index::INT32, i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max,
Index::INT32, i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min,
Index::INT64, i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max,
Index::INT64, i64);
-
+make_data_page_stats_iterator!(
+ MinInt32DataPageStatsIterator,
+ |x: &PageIndex<i32>| { x.min },
+ Index::INT32,
+ i32
+);
+make_data_page_stats_iterator!(
+ MaxInt32DataPageStatsIterator,
+ |x: &PageIndex<i32>| { x.max },
+ Index::INT32,
+ i32
+);
+make_data_page_stats_iterator!(
+ MinInt64DataPageStatsIterator,
+ |x: &PageIndex<i64>| { x.min },
+ Index::INT64,
+ i64
+);
+make_data_page_stats_iterator!(
+ MaxInt64DataPageStatsIterator,
+ |x: &PageIndex<i64>| { x.max },
+ Index::INT64,
+ i64
+);
+make_data_page_stats_iterator!(
+ MinFloat16DataPageStatsIterator,
+ |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
+ Index::FIXED_LEN_BYTE_ARRAY,
+ FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+ MaxFloat16DataPageStatsIterator,
+ |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
+ Index::FIXED_LEN_BYTE_ARRAY,
+ FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+ MinFloat32DataPageStatsIterator,
+ |x: &PageIndex<f32>| { x.min },
+ Index::FLOAT,
+ f32
+);
+make_data_page_stats_iterator!(
+ MaxFloat32DataPageStatsIterator,
+ |x: &PageIndex<f32>| { x.max },
+ Index::FLOAT,
+ f32
+);
+make_data_page_stats_iterator!(
+ MinFloat64DataPageStatsIterator,
+ |x: &PageIndex<f64>| { x.min },
+ Index::DOUBLE,
+ f64
+);
+make_data_page_stats_iterator!(
+ MaxFloat64DataPageStatsIterator,
+ |x: &PageIndex<f64>| { x.max },
+ Index::DOUBLE,
+ f64
+);
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
@@ -581,6 +637,19 @@ macro_rules! get_data_page_statistics {
)),
Some(DataType::Int32) =>
Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) =>
Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
+ Some(DataType::Float16) => Ok(Arc::new(
+ Float16Array::from_iter(
+ [<$stat_type_prefix
Float16DataPageStatsIterator>]::new($iterator)
+ .map(|x| {
+ x.into_iter().filter_map(|x| {
+ x.and_then(|x|
Some(from_bytes_to_f16(x.data())))
+ })
+ })
+ .flatten()
+ )
+ )),
+ Some(DataType::Float32) =>
Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix
Float32DataPageStatsIterator>]::new($iterator).flatten()))),
+ Some(DataType::Float64) =>
Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix
Float64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
}
}
@@ -677,6 +746,21 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
+ Index::FLOAT(native_index) => native_index
+ .indexes
+ .iter()
+ .map(|x| x.null_count.map(|x| x as u64))
+ .collect::<Vec<_>>(),
+ Index::DOUBLE(native_index) => native_index
+ .indexes
+ .iter()
+ .map(|x| x.null_count.map(|x| x as u64))
+ .collect::<Vec<_>>(),
+ Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
+ .indexes
+ .iter()
+ .map(|x| x.null_count.map(|x| x as u64))
+ .collect::<Vec<_>>(),
_ => unimplemented!(),
});
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs
b/datafusion/core/tests/parquet/arrow_statistics.rs
index 4c68a57333..bdae9f4786 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -614,6 +614,94 @@ async fn test_int_8() {
.run();
}
+#[tokio::test]
+async fn test_float_16() {
+ // This creates a parquet file of 1 column named f
+ let reader = TestReader {
+ scenario: Scenario::Float16,
+ row_per_group: 5,
+ }
+ .build()
+ .await;
+
+ Test {
+ reader: &reader,
+ // mins are [-5, -4, 0, 5]
+ expected_min: Arc::new(Float16Array::from(vec![
+ f16::from_f32(-5.),
+ f16::from_f32(-4.),
+ f16::from_f32(-0.),
+ f16::from_f32(5.),
+ ])),
+ // maxes are [-1, 0, 4, 9]
+ expected_max: Arc::new(Float16Array::from(vec![
+ f16::from_f32(-1.),
+ f16::from_f32(0.),
+ f16::from_f32(4.),
+ f16::from_f32(9.),
+ ])),
+ // nulls are [0, 0, 0, 0]
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+ column_name: "f",
+ check: Check::Both,
+ }
+ .run();
+}
+
+#[tokio::test]
+async fn test_float_32() {
+ // This creates a parquet file of 1 column named f
+ let reader = TestReader {
+ scenario: Scenario::Float32,
+ row_per_group: 5,
+ }
+ .build()
+ .await;
+
+ Test {
+ reader: &reader,
+ // mins are [-5, -4, 0, 5]
+ expected_min: Arc::new(Float32Array::from(vec![-5., -4., -0., 5.0])),
+ // maxes are [-1, 0, 4, 9]
+ expected_max: Arc::new(Float32Array::from(vec![-1., 0., 4., 9.])),
+ // nulls are [0, 0, 0, 0]
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+ column_name: "f",
+ check: Check::Both,
+ }
+ .run();
+}
+
+#[tokio::test]
+async fn test_float_64() {
+ // This creates a parquet file of 1 column named f
+ let reader = TestReader {
+ scenario: Scenario::Float64,
+ row_per_group: 5,
+ }
+ .build()
+ .await;
+
+ Test {
+ reader: &reader,
+ // mins are [-5, -4, 0, 5]
+ expected_min: Arc::new(Float64Array::from(vec![-5., -4., -0., 5.0])),
+ // maxes are [-1, 0, 4, 9]
+ expected_max: Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])),
+ // nulls are [0, 0, 0, 0]
+ expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+ column_name: "f",
+ check: Check::Both,
+ }
+ .run();
+}
+
// timestamp
#[tokio::test]
async fn test_timestamp() {
diff --git a/datafusion/core/tests/parquet/mod.rs
b/datafusion/core/tests/parquet/mod.rs
index 0434a271c3..1b68a4aa4e 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -90,6 +90,7 @@ enum Scenario {
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
Float16,
+ Float32,
Float64,
Decimal,
Decimal256,
@@ -586,6 +587,12 @@ fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}
+fn make_f32_batch(v: Vec<f32>) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32,
true)]));
+ let array = Arc::new(Float32Array::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
fn make_f16_batch(v: Vec<f16>) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16,
true)]));
let array = Arc::new(Float16Array::from(v)) as ArrayRef;
@@ -1003,6 +1010,14 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
),
]
}
+ Scenario::Float32 => {
+ vec![
+ make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
+ make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
+ make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
+ make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
+ ]
+ }
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]