This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 268f648db9 Minor: add parquet page stats for float{16, 32, 64} (#10982)
268f648db9 is described below

commit 268f648db9175cb8a24c46c6e40cb5bc03995b2b
Author: tmi <[email protected]>
AuthorDate: Wed Jun 19 15:46:24 2024 +0200

    Minor: add parquet page stats for float{16, 32, 64} (#10982)
---
 .../datasource/physical_plan/parquet/statistics.rs | 100 +++++++++++++++++++--
 datafusion/core/tests/parquet/arrow_statistics.rs  |  88 ++++++++++++++++++
 datafusion/core/tests/parquet/mod.rs               |  15 ++++
 3 files changed, 195 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6ad78a82b9..2ca47de990 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -33,8 +33,9 @@ use arrow_array::{
 use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
 use datafusion_common::{internal_datafusion_err, internal_err, plan_err, 
Result};
 use half::f16;
+use parquet::data_type::FixedLenByteArray;
 use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, 
RowGroupMetaData};
-use parquet::file::page_index::index::Index;
+use parquet::file::page_index::index::{Index, PageIndex};
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
 use paste::paste;
@@ -495,7 +496,7 @@ macro_rules! get_statistics {
 }
 
 macro_rules! make_data_page_stats_iterator {
-    ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: 
ty) => {
+    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: 
ty) => {
         struct $iterator_type<'a, I>
         where
             I: Iterator<Item = (usize, &'a Index)>,
@@ -526,7 +527,7 @@ macro_rules! make_data_page_stats_iterator {
                             native_index
                                 .indexes
                                 .iter()
-                                .map(|x| x.$func)
+                                .map(|x| $func(x))
                                 .collect::<Vec<_>>(),
                         ),
                         // No matching `Index` found;
@@ -548,11 +549,66 @@ macro_rules! make_data_page_stats_iterator {
     };
 }
 
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, 
Index::INT32, i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, 
Index::INT32, i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, 
Index::INT64, i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, 
Index::INT64, i64);
-
+make_data_page_stats_iterator!(
+    MinInt32DataPageStatsIterator,
+    |x: &PageIndex<i32>| { x.min },
+    Index::INT32,
+    i32
+);
+make_data_page_stats_iterator!(
+    MaxInt32DataPageStatsIterator,
+    |x: &PageIndex<i32>| { x.max },
+    Index::INT32,
+    i32
+);
+make_data_page_stats_iterator!(
+    MinInt64DataPageStatsIterator,
+    |x: &PageIndex<i64>| { x.min },
+    Index::INT64,
+    i64
+);
+make_data_page_stats_iterator!(
+    MaxInt64DataPageStatsIterator,
+    |x: &PageIndex<i64>| { x.max },
+    Index::INT64,
+    i64
+);
+make_data_page_stats_iterator!(
+    MinFloat16DataPageStatsIterator,
+    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
+    Index::FIXED_LEN_BYTE_ARRAY,
+    FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+    MaxFloat16DataPageStatsIterator,
+    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
+    Index::FIXED_LEN_BYTE_ARRAY,
+    FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+    MinFloat32DataPageStatsIterator,
+    |x: &PageIndex<f32>| { x.min },
+    Index::FLOAT,
+    f32
+);
+make_data_page_stats_iterator!(
+    MaxFloat32DataPageStatsIterator,
+    |x: &PageIndex<f32>| { x.max },
+    Index::FLOAT,
+    f32
+);
+make_data_page_stats_iterator!(
+    MinFloat64DataPageStatsIterator,
+    |x: &PageIndex<f64>| { x.min },
+    Index::DOUBLE,
+    f64
+);
+make_data_page_stats_iterator!(
+    MaxFloat64DataPageStatsIterator,
+    |x: &PageIndex<f64>| { x.max },
+    Index::DOUBLE,
+    f64
+);
 macro_rules! get_data_page_statistics {
     ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
         paste! {
@@ -581,6 +637,19 @@ macro_rules! get_data_page_statistics {
                 )),
                 Some(DataType::Int32) => 
Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix 
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                 Some(DataType::Int64) => 
Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix 
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
+                Some(DataType::Float16) => Ok(Arc::new(
+                    Float16Array::from_iter(
+                        [<$stat_type_prefix 
Float16DataPageStatsIterator>]::new($iterator)
+                            .map(|x| {
+                                x.into_iter().filter_map(|x| {
+                                    x.and_then(|x| 
Some(from_bytes_to_f16(x.data())))
+                                })
+                            })
+                            .flatten()
+                    )
+                )),
+                Some(DataType::Float32) => 
Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix 
Float32DataPageStatsIterator>]::new($iterator).flatten()))),
+                Some(DataType::Float64) => 
Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix 
Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                 _ => unimplemented!()
             }
         }
@@ -677,6 +746,21 @@ where
             .iter()
             .map(|x| x.null_count.map(|x| x as u64))
             .collect::<Vec<_>>(),
+        Index::FLOAT(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
+        Index::DOUBLE(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
+        Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
         _ => unimplemented!(),
     });
 
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs 
b/datafusion/core/tests/parquet/arrow_statistics.rs
index 4c68a57333..bdae9f4786 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -614,6 +614,94 @@ async fn test_int_8() {
     .run();
 }
 
+#[tokio::test]
+async fn test_float_16() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float16,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float16Array::from(vec![
+            f16::from_f32(-5.),
+            f16::from_f32(-4.),
+            f16::from_f32(-0.),
+            f16::from_f32(5.),
+        ])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float16Array::from(vec![
+            f16::from_f32(-1.),
+            f16::from_f32(0.),
+            f16::from_f32(4.),
+            f16::from_f32(9.),
+        ])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
+#[tokio::test]
+async fn test_float_32() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float32,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float32Array::from(vec![-5., -4., -0., 5.0])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float32Array::from(vec![-1., 0., 4., 9.])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
+#[tokio::test]
+async fn test_float_64() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float64,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float64Array::from(vec![-5., -4., -0., 5.0])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
 // timestamp
 #[tokio::test]
 async fn test_timestamp() {
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index 0434a271c3..1b68a4aa4e 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -90,6 +90,7 @@ enum Scenario {
     /// -MIN, -100, -1, 0, 1, 100, MAX
     NumericLimits,
     Float16,
+    Float32,
     Float64,
     Decimal,
     Decimal256,
@@ -586,6 +587,12 @@ fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
     RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
 }
 
+fn make_f32_batch(v: Vec<f32>) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32, 
true)]));
+    let array = Arc::new(Float32Array::from(v)) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
 fn make_f16_batch(v: Vec<f16>) -> RecordBatch {
     let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, 
true)]));
     let array = Arc::new(Float16Array::from(v)) as ArrayRef;
@@ -1003,6 +1010,14 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
                 ),
             ]
         }
+        Scenario::Float32 => {
+            vec![
+                make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
+                make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
+                make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
+                make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
+            ]
+        }
         Scenario::Float64 => {
             vec![
                 make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to