This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 10948ca2f0 Support Decimal and Decimal256 Parquet Data Page Statistics (#11138)
10948ca2f0 is described below

commit 10948ca2f08ce2f28b8b24c38e4c45190d23ca20
Author: Lordworms <[email protected]>
AuthorDate: Thu Jun 27 13:53:52 2024 -0700

    Support Decimal and Decimal256 Parquet Data Page Statistics (#11138)
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../datasource/physical_plan/parquet/statistics.rs | 118 +++++++++++++++++++++
 datafusion/core/tests/parquet/arrow_statistics.rs  |   4 +-
 2 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 64b0c2c4b1..67c517ddbc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -600,6 +600,119 @@ make_data_page_stats_iterator!(
     Index::DOUBLE,
     f64
 );
+
+macro_rules! get_decimal_page_stats_iterator {
+    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
+        struct $iterator_type<'a, I>
+        where
+            I: Iterator<Item = (usize, &'a Index)>,
+        {
+            iter: I,
+        }
+
+        impl<'a, I> $iterator_type<'a, I>
+        where
+            I: Iterator<Item = (usize, &'a Index)>,
+        {
+            fn new(iter: I) -> Self {
+                Self { iter }
+            }
+        }
+
+        impl<'a, I> Iterator for $iterator_type<'a, I>
+        where
+            I: Iterator<Item = (usize, &'a Index)>,
+        {
+            type Item = Vec<Option<$stat_value_type>>;
+
+            fn next(&mut self) -> Option<Self::Item> {
+                let next = self.iter.next();
+                match next {
+                    Some((len, index)) => match index {
+                        Index::INT32(native_index) => Some(
+                            native_index
+                                .indexes
+                                .iter()
+                                .map(|x| {
+                                    Some($stat_value_type::from(
+                                        x.$func.unwrap_or_default(),
+                                    ))
+                                })
+                                .collect::<Vec<_>>(),
+                        ),
+                        Index::INT64(native_index) => Some(
+                            native_index
+                                .indexes
+                                .iter()
+                                .map(|x| {
+                                    Some($stat_value_type::from(
+                                        x.$func.unwrap_or_default(),
+                                    ))
+                                })
+                                .collect::<Vec<_>>(),
+                        ),
+                        Index::BYTE_ARRAY(native_index) => Some(
+                            native_index
+                                .indexes
+                                .iter()
+                                .map(|x| {
+                                    Some($convert_func(
+                                        x.clone().$func.unwrap_or_default().data(),
+                                    ))
+                                })
+                                .collect::<Vec<_>>(),
+                        ),
+                        Index::FIXED_LEN_BYTE_ARRAY(native_index) => Some(
+                            native_index
+                                .indexes
+                                .iter()
+                                .map(|x| {
+                                    Some($convert_func(
+                                        x.clone().$func.unwrap_or_default().data(),
+                                    ))
+                                })
+                                .collect::<Vec<_>>(),
+                        ),
+                        _ => Some(vec![None; len]),
+                    },
+                    _ => None,
+                }
+            }
+
+            fn size_hint(&self) -> (usize, Option<usize>) {
+                self.iter.size_hint()
+            }
+        }
+    };
+}
+
+get_decimal_page_stats_iterator!(
+    MinDecimal128DataPageStatsIterator,
+    min,
+    i128,
+    from_bytes_to_i128
+);
+
+get_decimal_page_stats_iterator!(
+    MaxDecimal128DataPageStatsIterator,
+    max,
+    i128,
+    from_bytes_to_i128
+);
+
+get_decimal_page_stats_iterator!(
+    MinDecimal256DataPageStatsIterator,
+    min,
+    i256,
+    from_bytes_to_i256
+);
+
+get_decimal_page_stats_iterator!(
+    MaxDecimal256DataPageStatsIterator,
+    max,
+    i256,
+    from_bytes_to_i256
+);
 make_data_page_stats_iterator!(
     MinByteArrayDataPageStatsIterator,
     |x: &PageIndex<ByteArray>| { x.min.clone() },
@@ -612,6 +725,7 @@ make_data_page_stats_iterator!(
     Index::BYTE_ARRAY,
     ByteArray
 );
+
 macro_rules! get_data_page_statistics {
     ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
         paste! {
@@ -755,6 +869,10 @@ macro_rules! get_data_page_statistics {
                         )
                     )
                 ),
+                Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new(
+                    Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
+                Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new(
+                    Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                 _ => unimplemented!()
             }
         }
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index eba9687f04..47f079063d 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -1683,7 +1683,7 @@ async fn test_decimal() {
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
         column_name: "decimal_col",
-        check: Check::RowGroup,
+        check: Check::Both,
     }
     .run();
 }
@@ -1721,7 +1721,7 @@ async fn test_decimal_256() {
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
         column_name: "decimal256_col",
-        check: Check::RowGroup,
+        check: Check::Both,
     }
     .run();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to