This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this 
push:
     new 3dbd42e516 [thrift-remodel] Use new Thrift encoder/decoder for Parquet 
page headers (#8376)
3dbd42e516 is described below

commit 3dbd42e516ffea475b16033c05d9492a6d2d849e
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Sep 23 11:22:07 2025 -0700

    [thrift-remodel] Use new Thrift encoder/decoder for Parquet page headers 
(#8376)
    
    # Which issue does this PR close?
    **Note: this targets a feature branch, not main**
    
    - Part of #5854.
    
    # Rationale for this change
    
    This continues the remodel, moving on to `PageHeader` support.
    
    # What changes are included in this PR?
    
    Swaps out old `format` page header structs for new ones. This also adds
    a `Read` based implementation of the thrift compact protocol reader (the
    sizes of the Thrift encoded page headers are not knowable in advance, so
    we need a way to read them from the thrift input stream used by the page
    decoder).
    
    This PR also makes decoding of the `Statistics` in the page header
    optional (defaults to `false`). We do not use them, and the decode takes
    a good chunk of time.
    
    # Are these changes tested?
    
    These changes should be covered by existing tests.
    
    # Are there any user-facing changes?
    
    Yes, page-level statistics are no longer decoded by default.
---
 parquet/benches/metadata.rs                     |  20 ++
 parquet/src/arrow/arrow_writer/mod.rs           |  27 +-
 parquet/src/column/page.rs                      |  40 ++-
 parquet/src/column/page_encryption.rs           |   8 +-
 parquet/src/column/page_encryption_disabled.rs  |   2 +-
 parquet/src/column/writer/mod.rs                |   1 +
 parquet/src/encryption/encrypt.rs               |  28 ++
 parquet/src/errors.rs                           |   8 +
 parquet/src/file/metadata/thrift_gen.rs         | 443 ++++++++++++++++++------
 parquet/src/file/page_encoding_stats.rs         |   2 +-
 parquet/src/file/properties.rs                  |  24 ++
 parquet/src/file/serialized_reader.rs           |  67 ++--
 parquet/src/file/statistics.rs                  | 201 +++++++++++
 parquet/src/file/writer.rs                      |  11 +-
 parquet/src/parquet_macros.rs                   |  76 +++-
 parquet/src/parquet_thrift.rs                   |  61 +++-
 parquet/src/thrift.rs                           |  15 +-
 parquet/tests/arrow_reader/bad_data.rs          |   2 +-
 parquet/tests/encryption/encryption_agnostic.rs |   4 +-
 19 files changed, 861 insertions(+), 179 deletions(-)

diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index 8c886e4d5e..ced0175da8 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -258,6 +258,26 @@ fn criterion_benchmark(c: &mut Criterion) {
             });
         })
     });
+
+    #[cfg(feature = "arrow")]
+    c.bench_function("page headers (no stats)", |b| {
+        b.iter(|| {
+            metadata.row_groups.iter().for_each(|rg| {
+                rg.columns.iter().for_each(|col| {
+                    if let Some(col_meta) = &col.meta_data {
+                        if let Some(dict_offset) = 
col_meta.dictionary_page_offset {
+                            parquet::thrift::bench_page_header_no_stats(
+                                &file_bytes.slice(dict_offset as usize..),
+                            );
+                        }
+                        parquet::thrift::bench_page_header_no_stats(
+                            &file_bytes.slice(col_meta.data_page_offset as 
usize..),
+                        );
+                    }
+                });
+            });
+        })
+    });
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 5ccab5b24b..e863b74a95 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -23,7 +23,6 @@ use std::iter::Peekable;
 use std::slice::Iter;
 use std::sync::{Arc, Mutex};
 use std::vec::IntoIter;
-use thrift::protocol::TCompactOutputProtocol;
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::*;
@@ -48,8 +47,8 @@ use crate::file::metadata::{KeyValue, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
 use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use crate::thrift::TSerializable;
 use levels::{calculate_array_levels, ArrayLevels};
 
 mod byte_array;
@@ -583,8 +582,8 @@ impl PageWriter for ArrowPageWriter {
                     }
                 }
                 None => {
-                    let mut protocol = TCompactOutputProtocol::new(&mut 
header);
-                    page_header.write_to_out_protocol(&mut protocol)?;
+                    let mut protocol = ThriftCompactOutputProtocol::new(&mut 
header);
+                    page_header.write_thrift(&mut protocol)?;
                 }
             };
 
@@ -1487,12 +1486,12 @@ mod tests {
     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, 
ParquetRecordBatchReaderBuilder};
     use crate::arrow::ARROW_SCHEMA_META_KEY;
     use crate::column::page::{Page, PageReader};
+    use crate::file::metadata::thrift_gen::PageHeader;
     use crate::file::page_encoding_stats::PageEncodingStats;
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
-    use crate::format::PageHeader;
+    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
     use crate::schema::types::ColumnPath;
-    use crate::thrift::TCompactSliceInputProtocol;
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Schema};
     use arrow::error::Result as ArrowResult;
@@ -4191,8 +4190,8 @@ mod tests {
 
         // decode first page header
         let first_page = &buf[4..];
-        let mut prot = TCompactSliceInputProtocol::new(first_page);
-        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+        let mut prot = ThriftSliceInputProtocol::new(first_page);
+        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
         let stats = hdr.data_page_header.unwrap().statistics;
 
         assert!(stats.is_none());
@@ -4225,8 +4224,8 @@ mod tests {
 
         // decode first page header
         let first_page = &buf[4..];
-        let mut prot = TCompactSliceInputProtocol::new(first_page);
-        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+        let mut prot = ThriftSliceInputProtocol::new(first_page);
+        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
         let stats = hdr.data_page_header.unwrap().statistics;
 
         let stats = stats.unwrap();
@@ -4277,8 +4276,8 @@ mod tests {
 
         // decode first page header
         let first_page = &buf[4..];
-        let mut prot = TCompactSliceInputProtocol::new(first_page);
-        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+        let mut prot = ThriftSliceInputProtocol::new(first_page);
+        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
         let stats = hdr.data_page_header.unwrap().statistics;
         assert!(stats.is_some());
         let stats = stats.unwrap();
@@ -4290,8 +4289,8 @@ mod tests {
 
         // check second page now
         let second_page = &prot.as_slice()[hdr.compressed_page_size as 
usize..];
-        let mut prot = TCompactSliceInputProtocol::new(second_page);
-        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+        let mut prot = ThriftSliceInputProtocol::new(second_page);
+        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
         let stats = hdr.data_page_header.unwrap().statistics;
         assert!(stats.is_some());
         let stats = stats.unwrap();
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index a2f683d71f..23bf4548fb 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -21,8 +21,10 @@ use bytes::Bytes;
 
 use crate::basic::{Encoding, PageType};
 use crate::errors::{ParquetError, Result};
-use crate::file::statistics::Statistics;
-use crate::format::PageHeader;
+use crate::file::metadata::thrift_gen::{
+    DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
+};
+use crate::file::statistics::{page_stats_to_thrift, Statistics};
 
 /// Parquet Page definition.
 ///
@@ -216,7 +218,7 @@ impl CompressedPage {
         let page_type = self.page_type();
 
         let mut page_header = PageHeader {
-            type_: page_type.into(),
+            type_: page_type,
             uncompressed_page_size: uncompressed_size as i32,
             compressed_page_size: compressed_size as i32,
             // TODO: Add support for crc checksum
@@ -234,12 +236,12 @@ impl CompressedPage {
                 ref statistics,
                 ..
             } => {
-                let data_page_header = crate::format::DataPageHeader {
+                let data_page_header = DataPageHeader {
                     num_values: num_values as i32,
-                    encoding: encoding.into(),
-                    definition_level_encoding: def_level_encoding.into(),
-                    repetition_level_encoding: rep_level_encoding.into(),
-                    statistics: 
crate::file::statistics::to_thrift(statistics.as_ref()),
+                    encoding,
+                    definition_level_encoding: def_level_encoding,
+                    repetition_level_encoding: rep_level_encoding,
+                    statistics: page_stats_to_thrift(statistics.as_ref()),
                 };
                 page_header.data_page_header = Some(data_page_header);
             }
@@ -252,22 +254,22 @@ impl CompressedPage {
                 ref statistics,
                 ..
             } => {
-                let data_page_header_v2 = crate::format::DataPageHeaderV2 {
+                let data_page_header_v2 = DataPageHeaderV2 {
                     num_values: num_values as i32,
                     num_nulls: num_nulls as i32,
                     num_rows: num_rows as i32,
-                    encoding: encoding.into(),
+                    encoding,
                     definition_levels_byte_length: def_levels_byte_len as i32,
                     repetition_levels_byte_length: rep_levels_byte_len as i32,
                     is_compressed: Some(is_compressed),
-                    statistics: 
crate::file::statistics::to_thrift(statistics.as_ref()),
+                    statistics: page_stats_to_thrift(statistics.as_ref()),
                 };
                 page_header.data_page_header_v2 = Some(data_page_header_v2);
             }
             Page::DictionaryPage { is_sorted, .. } => {
-                let dictionary_page_header = 
crate::format::DictionaryPageHeader {
+                let dictionary_page_header = DictionaryPageHeader {
                     num_values: num_values as i32,
-                    encoding: encoding.into(),
+                    encoding,
                     is_sorted: Some(is_sorted),
                 };
                 page_header.dictionary_page_header = 
Some(dictionary_page_header);
@@ -343,12 +345,14 @@ pub struct PageMetadata {
     pub is_dict: bool,
 }
 
-impl TryFrom<&PageHeader> for PageMetadata {
+impl TryFrom<&crate::file::metadata::thrift_gen::PageHeader> for PageMetadata {
     type Error = ParquetError;
 
-    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
+    fn try_from(
+        value: &crate::file::metadata::thrift_gen::PageHeader,
+    ) -> std::result::Result<Self, Self::Error> {
         match value.type_ {
-            crate::format::PageType::DATA_PAGE => {
+            PageType::DATA_PAGE => {
                 let header = value.data_page_header.as_ref().unwrap();
                 Ok(PageMetadata {
                     num_rows: None,
@@ -356,12 +360,12 @@ impl TryFrom<&PageHeader> for PageMetadata {
                     is_dict: false,
                 })
             }
-            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
+            PageType::DICTIONARY_PAGE => Ok(PageMetadata {
                 num_rows: None,
                 num_levels: None,
                 is_dict: true,
             }),
-            crate::format::PageType::DATA_PAGE_V2 => {
+            PageType::DATA_PAGE_V2 => {
                 let header = value.data_page_header_v2.as_ref().unwrap();
                 Ok(PageMetadata {
                     num_rows: Some(header.num_rows as _),
diff --git a/parquet/src/column/page_encryption.rs 
b/parquet/src/column/page_encryption.rs
index 0fb7c89426..7ee367a289 100644
--- a/parquet/src/column/page_encryption.rs
+++ b/parquet/src/column/page_encryption.rs
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::basic::PageType;
 use crate::column::page::CompressedPage;
 use crate::encryption::ciphers::BlockEncryptor;
-use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
+use crate::encryption::encrypt::{encrypt_thrift_object, FileEncryptor};
 use crate::encryption::modules::{create_module_aad, ModuleType};
 use crate::errors::ParquetError;
 use crate::errors::Result;
-use crate::format::PageHeader;
-use crate::format::PageType;
+use crate::file::metadata::thrift_gen::PageHeader;
 use bytes::Bytes;
 use std::io::Write;
 use std::sync::Arc;
@@ -114,6 +114,6 @@ impl PageEncryptor {
             Some(self.page_index),
         )?;
 
-        encrypt_object(page_header, &mut self.block_encryptor, sink, &aad)
+        encrypt_thrift_object(page_header, &mut self.block_encryptor, sink, 
&aad)
     }
 }
diff --git a/parquet/src/column/page_encryption_disabled.rs 
b/parquet/src/column/page_encryption_disabled.rs
index e85b028116..347024f7f2 100644
--- a/parquet/src/column/page_encryption_disabled.rs
+++ b/parquet/src/column/page_encryption_disabled.rs
@@ -17,7 +17,7 @@
 
 use crate::column::page::CompressedPage;
 use crate::errors::Result;
-use crate::format::PageHeader;
+use crate::file::metadata::thrift_gen::PageHeader;
 use std::io::Write;
 
 #[derive(Debug)]
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index e5a9139fb7..059287011b 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2264,6 +2264,7 @@ mod tests {
 
         let props = ReaderProperties::builder()
             .set_backward_compatible_lz4(false)
+            .set_read_page_statistics(true)
             .build();
         let reader = SerializedPageReader::new_with_properties(
             Arc::new(Bytes::from(buf)),
diff --git a/parquet/src/encryption/encrypt.rs 
b/parquet/src/encryption/encrypt.rs
index 1a241bf7b1..9789302169 100644
--- a/parquet/src/encryption/encrypt.rs
+++ b/parquet/src/encryption/encrypt.rs
@@ -22,6 +22,7 @@ use crate::encryption::ciphers::{
 };
 use crate::errors::{ParquetError, Result};
 use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, 
EncryptionWithColumnKey};
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
 use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
 use crate::thrift::TSerializable;
 use ring::rand::{SecureRandom, SystemRandom};
@@ -376,6 +377,18 @@ pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
     Ok(())
 }
 
+/// Write an encrypted Thrift serializable object
+pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
+    object: &T,
+    encryptor: &mut Box<dyn BlockEncryptor>,
+    sink: &mut W,
+    module_aad: &[u8],
+) -> Result<()> {
+    let encrypted_buffer = encrypt_thrift_object_to_vec(object, encryptor, 
module_aad)?;
+    sink.write_all(&encrypted_buffer)?;
+    Ok(())
+}
+
 pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
     object: &T,
     encryptor: &mut Box<dyn BlockEncryptor>,
@@ -414,6 +427,21 @@ pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
     encryptor.encrypt(buffer.as_ref(), module_aad)
 }
 
+/// Encrypt a Thrift serializable object to a byte vector
+pub(crate) fn encrypt_thrift_object_to_vec<T: WriteThrift>(
+    object: &T,
+    encryptor: &mut Box<dyn BlockEncryptor>,
+    module_aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut buffer: Vec<u8> = vec![];
+    {
+        let mut unencrypted_protocol = ThriftCompactOutputProtocol::new(&mut 
buffer);
+        object.write_thrift(&mut unencrypted_protocol)?;
+    }
+
+    encryptor.encrypt(buffer.as_ref(), module_aad)
+}
+
 /// Get the crypto metadata for a column from the file encryption properties
 pub(crate) fn get_column_crypto_metadata(
     properties: &FileEncryptionProperties,
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index be08245e95..dab444a28f 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -19,6 +19,7 @@
 
 use core::num::TryFromIntError;
 use std::error::Error;
+use std::string::FromUtf8Error;
 use std::{cell, io, result, str};
 
 #[cfg(feature = "arrow")]
@@ -124,6 +125,13 @@ impl From<str::Utf8Error> for ParquetError {
         ParquetError::External(Box::new(e))
     }
 }
+
+impl From<FromUtf8Error> for ParquetError {
+    fn from(e: FromUtf8Error) -> ParquetError {
+        ParquetError::External(Box::new(e))
+    }
+}
+
 #[cfg(feature = "arrow")]
 impl From<ArrowError> for ParquetError {
     fn from(e: ArrowError) -> ParquetError {
diff --git a/parquet/src/file/metadata/thrift_gen.rs 
b/parquet/src/file/metadata/thrift_gen.rs
index b656bacc8c..7515a70a63 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -67,103 +67,6 @@ pub(crate) struct SchemaElement<'a> {
 }
 );
 
-thrift_struct!(
-pub(crate) struct DataPageHeader {
-  /// Number of values, including NULLs, in this data page.
-  ///
-  /// If a OffsetIndex is present, a page must begin at a row
-  /// boundary (repetition_level = 0). Otherwise, pages may begin
-  /// within a row (repetition_level > 0).
-  1: required i32 num_values
-
-  /// Encoding used for this data page
-  2: required Encoding encoding
-
-  /// Encoding used for definition levels
-  3: required Encoding definition_level_encoding;
-
-  /// Encoding used for repetition levels
-  4: required Encoding repetition_level_encoding;
-
-  // Optional statistics for the data in this page
-  // page stats are pretty useless...lets ignore them
-  //5: optional Statistics statistics;
-}
-);
-
-thrift_struct!(
-    pub(crate) struct IndexPageHeader {}
-);
-
-thrift_struct!(
-pub(crate) struct DictionaryPageHeader {
-  /// Number of values in the dictionary
-  1: required i32 num_values;
-
-  /// Encoding using this dictionary page
-  2: required Encoding encoding
-
-  /// If true, the entries in the dictionary are sorted in ascending order
-  3: optional bool is_sorted;
-}
-);
-
-thrift_struct!(
-pub(crate) struct DataPageHeaderV2 {
-  /// Number of values, including NULLs, in this data page.
-  1: required i32 num_values
-  /// Number of NULL values, in this data page.
-  /// Number of non-null = num_values - num_nulls which is also the number of 
values in the data section
-  2: required i32 num_nulls
-  /// Number of rows in this data page. Every page must begin at a
-  /// row boundary (repetition_level = 0): rows must **not** be
-  /// split across page boundaries when using V2 data pages.
-  3: required i32 num_rows
-  /// Encoding used for data in this page
-  4: required Encoding encoding
-
-  // repetition levels and definition levels are always using RLE (without 
size in it)
-
-  /// Length of the definition levels
-  5: required i32 definition_levels_byte_length;
-  /// Length of the repetition levels
-  6: required i32 repetition_levels_byte_length;
-
-  /// Whether the values are compressed.
-  /// Which means the section of the page between
-  /// definition_levels_byte_length + repetition_levels_byte_length + 1 and 
compressed_page_size (included)
-  /// is compressed with the compression_codec.
-  /// If missing it is considered compressed
-  7: optional bool is_compressed = true;
-
-  // Optional statistics for the data in this page
-  //8: optional Statistics statistics;
-}
-);
-
-thrift_struct!(
-#[allow(dead_code)]
-pub(crate) struct PageHeader {
-  /// the type of the page: indicates which of the *_header fields is set
-  1: required PageType type_
-
-  /// Uncompressed page size in bytes (not including this header)
-  2: required i32 uncompressed_page_size
-
-  /// Compressed (and potentially encrypted) page size in bytes, not including 
this header
-  3: required i32 compressed_page_size
-
-  /// The 32-bit CRC checksum for the page, to be be calculated as follows:
-  4: optional i32 crc
-
-  // Headers for page specific data.  One only will be set.
-  5: optional DataPageHeader data_page_header;
-  6: optional IndexPageHeader index_page_header;
-  7: optional DictionaryPageHeader dictionary_page_header;
-  8: optional DataPageHeaderV2 data_page_header_v2;
-}
-);
-
 thrift_struct!(
 pub(crate) struct AesGcmV1<'a> {
   /// AAD prefix
@@ -214,6 +117,13 @@ pub(crate) struct FileCryptoMetaData<'a> {
 }
 );
 
+// expose for benchmarking
+pub(crate) fn bench_file_metadata(bytes: &bytes::Bytes) {
+    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
+    let mut prot = ThriftSliceInputProtocol::new(bytes);
+    crate::file::metadata::thrift_gen::FileMetaData::read_thrift(&mut 
prot).unwrap();
+}
+
 // the following are only used internally so are private
 thrift_struct!(
 struct FileMetaData<'a> {
@@ -909,6 +819,345 @@ impl<'a, R: ThriftCompactInputProtocol<'a>> 
ReadThrift<'a, R> for ParquetMetaDat
     }
 }
 
+thrift_struct!(
+    pub(crate) struct IndexPageHeader {}
+);
+
+thrift_struct!(
+pub(crate) struct DictionaryPageHeader {
+  /// Number of values in the dictionary
+  1: required i32 num_values;
+
+  /// Encoding using this dictionary page
+  2: required Encoding encoding
+
+  /// If true, the entries in the dictionary are sorted in ascending order
+  3: optional bool is_sorted;
+}
+);
+
+thrift_struct!(
+/// Statistics for the page header.
+///
+/// This is a duplicate of the [`Statistics`] struct above. Because the page 
reader uses
+/// the [`Read`] API, we cannot read the min/max values as slices. This should 
not be
+/// a huge problem since this crate no longer reads the page header statistics 
by default.
+///
+/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
+pub(crate) struct PageStatistics {
+   1: optional binary max;
+   2: optional binary min;
+   3: optional i64 null_count;
+   4: optional i64 distinct_count;
+   5: optional binary max_value;
+   6: optional binary min_value;
+   7: optional bool is_max_value_exact;
+   8: optional bool is_min_value_exact;
+}
+);
+
+thrift_struct!(
+pub(crate) struct DataPageHeader {
+  1: required i32 num_values
+  2: required Encoding encoding
+  3: required Encoding definition_level_encoding;
+  4: required Encoding repetition_level_encoding;
+  5: optional PageStatistics statistics;
+}
+);
+
+impl DataPageHeader {
+    // reader that skips decoding page statistics
+    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
+    where
+        R: ThriftCompactInputProtocol<'a>,
+    {
+        let mut num_values: Option<i32> = None;
+        let mut encoding: Option<Encoding> = None;
+        let mut definition_level_encoding: Option<Encoding> = None;
+        let mut repetition_level_encoding: Option<Encoding> = None;
+        let statistics: Option<PageStatistics> = None;
+        let mut last_field_id = 0i16;
+        loop {
+            let field_ident = prot.read_field_begin(last_field_id)?;
+            if field_ident.field_type == FieldType::Stop {
+                break;
+            }
+            match field_ident.id {
+                1 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    num_values = Some(val);
+                }
+                2 => {
+                    let val = Encoding::read_thrift(&mut *prot)?;
+                    encoding = Some(val);
+                }
+                3 => {
+                    let val = Encoding::read_thrift(&mut *prot)?;
+                    definition_level_encoding = Some(val);
+                }
+                4 => {
+                    let val = Encoding::read_thrift(&mut *prot)?;
+                    repetition_level_encoding = Some(val);
+                }
+                _ => {
+                    prot.skip(field_ident.field_type)?;
+                }
+            };
+            last_field_id = field_ident.id;
+        }
+        let Some(num_values) = num_values else {
+            return Err(ParquetError::General(
+                "Required field num_values is missing".to_owned(),
+            ));
+        };
+        let Some(encoding) = encoding else {
+            return Err(ParquetError::General(
+                "Required field encoding is missing".to_owned(),
+            ));
+        };
+        let Some(definition_level_encoding) = definition_level_encoding else {
+            return Err(ParquetError::General(
+                "Required field definition_level_encoding is 
missing".to_owned(),
+            ));
+        };
+        let Some(repetition_level_encoding) = repetition_level_encoding else {
+            return Err(ParquetError::General(
+                "Required field repetition_level_encoding is 
missing".to_owned(),
+            ));
+        };
+        Ok(Self {
+            num_values,
+            encoding,
+            definition_level_encoding,
+            repetition_level_encoding,
+            statistics,
+        })
+    }
+}
+
+thrift_struct!(
+pub(crate) struct DataPageHeaderV2 {
+  1: required i32 num_values
+  2: required i32 num_nulls
+  3: required i32 num_rows
+  4: required Encoding encoding
+  5: required i32 definition_levels_byte_length;
+  6: required i32 repetition_levels_byte_length;
+  7: optional bool is_compressed = true;
+  8: optional PageStatistics statistics;
+}
+);
+
+impl DataPageHeaderV2 {
+    // reader that skips decoding page statistics
+    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
+    where
+        R: ThriftCompactInputProtocol<'a>,
+    {
+        let mut num_values: Option<i32> = None;
+        let mut num_nulls: Option<i32> = None;
+        let mut num_rows: Option<i32> = None;
+        let mut encoding: Option<Encoding> = None;
+        let mut definition_levels_byte_length: Option<i32> = None;
+        let mut repetition_levels_byte_length: Option<i32> = None;
+        let mut is_compressed: Option<bool> = None;
+        let statistics: Option<PageStatistics> = None;
+        let mut last_field_id = 0i16;
+        loop {
+            let field_ident = prot.read_field_begin(last_field_id)?;
+            if field_ident.field_type == FieldType::Stop {
+                break;
+            }
+            match field_ident.id {
+                1 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    num_values = Some(val);
+                }
+                2 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    num_nulls = Some(val);
+                }
+                3 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    num_rows = Some(val);
+                }
+                4 => {
+                    let val = Encoding::read_thrift(&mut *prot)?;
+                    encoding = Some(val);
+                }
+                5 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    definition_levels_byte_length = Some(val);
+                }
+                6 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    repetition_levels_byte_length = Some(val);
+                }
+                7 => {
+                    let val = field_ident.bool_val.unwrap();
+                    is_compressed = Some(val);
+                }
+                _ => {
+                    prot.skip(field_ident.field_type)?;
+                }
+            };
+            last_field_id = field_ident.id;
+        }
+        let Some(num_values) = num_values else {
+            return Err(ParquetError::General(
+                "Required field num_values is missing".to_owned(),
+            ));
+        };
+        let Some(num_nulls) = num_nulls else {
+            return Err(ParquetError::General(
+                "Required field num_nulls is missing".to_owned(),
+            ));
+        };
+        let Some(num_rows) = num_rows else {
+            return Err(ParquetError::General(
+                "Required field num_rows is missing".to_owned(),
+            ));
+        };
+        let Some(encoding) = encoding else {
+            return Err(ParquetError::General(
+                "Required field encoding is missing".to_owned(),
+            ));
+        };
+        let Some(definition_levels_byte_length) = 
definition_levels_byte_length else {
+            return Err(ParquetError::General(
+                "Required field definition_levels_byte_length is 
missing".to_owned(),
+            ));
+        };
+        let Some(repetition_levels_byte_length) = 
repetition_levels_byte_length else {
+            return Err(ParquetError::General(
+                "Required field repetition_levels_byte_length is 
missing".to_owned(),
+            ));
+        };
+        Ok(Self {
+            num_values,
+            num_nulls,
+            num_rows,
+            encoding,
+            definition_levels_byte_length,
+            repetition_levels_byte_length,
+            is_compressed,
+            statistics,
+        })
+    }
+}
+
+thrift_struct!(
+pub(crate) struct PageHeader {
+  /// the type of the page: indicates which of the *_header fields is set
+  1: required PageType type_
+
+  /// Uncompressed page size in bytes (not including this header)
+  2: required i32 uncompressed_page_size
+
+  /// Compressed (and potentially encrypted) page size in bytes, not including 
this header
+  3: required i32 compressed_page_size
+
+  /// The 32-bit CRC checksum for the page, to be calculated as follows:
+  4: optional i32 crc
+
+  // Headers for page specific data.  One only will be set.
+  5: optional DataPageHeader data_page_header;
+  6: optional IndexPageHeader index_page_header;
+  7: optional DictionaryPageHeader dictionary_page_header;
+  8: optional DataPageHeaderV2 data_page_header_v2;
+}
+);
+
+impl PageHeader {
+    // reader that skips reading page statistics. obtained by running
+    // `cargo expand -p parquet --all-features --lib 
file::metadata::thrift_gen`
+    // and modifying the impl of `read_thrift`
+    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> 
Result<Self>
+    where
+        R: ThriftCompactInputProtocol<'a>,
+    {
+        let mut type_: Option<PageType> = None;
+        let mut uncompressed_page_size: Option<i32> = None;
+        let mut compressed_page_size: Option<i32> = None;
+        let mut crc: Option<i32> = None;
+        let mut data_page_header: Option<DataPageHeader> = None;
+        let mut index_page_header: Option<IndexPageHeader> = None;
+        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
+        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
+        let mut last_field_id = 0i16;
+        loop {
+            let field_ident = prot.read_field_begin(last_field_id)?;
+            if field_ident.field_type == FieldType::Stop {
+                break;
+            }
+            match field_ident.id {
+                1 => {
+                    let val = PageType::read_thrift(&mut *prot)?;
+                    type_ = Some(val);
+                }
+                2 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    uncompressed_page_size = Some(val);
+                }
+                3 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    compressed_page_size = Some(val);
+                }
+                4 => {
+                    let val = i32::read_thrift(&mut *prot)?;
+                    crc = Some(val);
+                }
+                5 => {
+                    let val = DataPageHeader::read_thrift_without_stats(&mut 
*prot)?;
+                    data_page_header = Some(val);
+                }
+                6 => {
+                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
+                    index_page_header = Some(val);
+                }
+                7 => {
+                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
+                    dictionary_page_header = Some(val);
+                }
+                8 => {
+                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut 
*prot)?;
+                    data_page_header_v2 = Some(val);
+                }
+                _ => {
+                    prot.skip(field_ident.field_type)?;
+                }
+            };
+            last_field_id = field_ident.id;
+        }
+        let Some(type_) = type_ else {
+            return Err(ParquetError::General(
+                "Required field type_ is missing".to_owned(),
+            ));
+        };
+        let Some(uncompressed_page_size) = uncompressed_page_size else {
+            return Err(ParquetError::General(
+                "Required field uncompressed_page_size is missing".to_owned(),
+            ));
+        };
+        let Some(compressed_page_size) = compressed_page_size else {
+            return Err(ParquetError::General(
+                "Required field compressed_page_size is missing".to_owned(),
+            ));
+        };
+        Ok(Self {
+            type_,
+            uncompressed_page_size,
+            compressed_page_size,
+            crc,
+            data_page_header,
+            index_page_header,
+            dictionary_page_header,
+            data_page_header_v2,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::file::metadata::thrift_gen::BoundingBox;
diff --git a/parquet/src/file/page_encoding_stats.rs 
b/parquet/src/file/page_encoding_stats.rs
index 934e177de0..3f81353e28 100644
--- a/parquet/src/file/page_encoding_stats.rs
+++ b/parquet/src/file/page_encoding_stats.rs
@@ -20,7 +20,7 @@
 use std::io::Write;
 
 use crate::basic::{Encoding, PageType};
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::parquet_thrift::{
     ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, 
ThriftCompactOutputProtocol,
     WriteThrift, WriteThriftField,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index cb6b5167c8..b6003dc4d9 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -1191,6 +1191,7 @@ impl ColumnProperties {
 pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
 
 const DEFAULT_READ_BLOOM_FILTER: bool = false;
+const DEFAULT_READ_PAGE_STATS: bool = false;
 
 /// Configuration settings for reading parquet files.
 ///
@@ -1213,6 +1214,7 @@ const DEFAULT_READ_BLOOM_FILTER: bool = false;
 pub struct ReaderProperties {
     codec_options: CodecOptions,
     read_bloom_filter: bool,
+    read_page_stats: bool,
 }
 
 impl ReaderProperties {
@@ -1230,6 +1232,11 @@ impl ReaderProperties {
     pub(crate) fn read_bloom_filter(&self) -> bool {
         self.read_bloom_filter
     }
+
+    /// Returns whether to read page level statistics
+    pub(crate) fn read_page_stats(&self) -> bool {
+        self.read_page_stats
+    }
 }
 
 /// Builder for parquet file reader configuration. See example on
@@ -1237,6 +1244,7 @@ impl ReaderProperties {
 pub struct ReaderPropertiesBuilder {
     codec_options_builder: CodecOptionsBuilder,
     read_bloom_filter: Option<bool>,
+    read_page_stats: Option<bool>,
 }
 
 /// Reader properties builder.
@@ -1246,6 +1254,7 @@ impl ReaderPropertiesBuilder {
         Self {
             codec_options_builder: CodecOptionsBuilder::default(),
             read_bloom_filter: None,
+            read_page_stats: None,
         }
     }
 
@@ -1254,6 +1263,7 @@ impl ReaderPropertiesBuilder {
         ReaderProperties {
             codec_options: self.codec_options_builder.build(),
             read_bloom_filter: 
self.read_bloom_filter.unwrap_or(DEFAULT_READ_BLOOM_FILTER),
+            read_page_stats: 
self.read_page_stats.unwrap_or(DEFAULT_READ_PAGE_STATS),
         }
     }
 
@@ -1282,6 +1292,20 @@ impl ReaderPropertiesBuilder {
         self.read_bloom_filter = Some(value);
         self
     }
+
+    /// Enable/disable reading page-level statistics
+    ///
+    /// If set to `true`, then the reader will decode and populate the 
[`Statistics`] for
+    /// each page, if present.
+    /// If set to `false`, then the reader will skip decoding the statistics.
+    ///
+    /// By default statistics will not be decoded.
+    ///
+    /// [`Statistics`]: crate::file::statistics::Statistics
+    pub fn set_read_page_statistics(mut self, value: bool) -> Self {
+        self.read_page_stats = Some(value);
+        self
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index 7285980453..1442f0f67c 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,31 +18,30 @@
 //! Contains implementations of the reader traits FileReader, RowGroupReader 
and PageReader
 //! Also contains implementations of the ChunkReader for files (with 
buffering) and byte arrays (RAM)
 
-use crate::basic::{Encoding, Type};
+use crate::basic::{PageType, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::thrift_gen::PageHeader;
 use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
+use crate::file::statistics;
 use crate::file::{
     metadata::*,
     properties::{ReaderProperties, ReaderPropertiesPtr},
     reader::*,
-    statistics,
 };
-use crate::format::{PageHeader, PageType};
+#[cfg(feature = "encryption")]
+use crate::parquet_thrift::ThriftSliceInputProtocol;
+use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
-#[cfg(feature = "encryption")]
-use crate::thrift::TCompactSliceInputProtocol;
-use crate::thrift::TSerializable;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::{fs::File, io::Read, path::Path, sync::Arc};
-use thrift::protocol::TCompactInputProtocol;
 
 impl TryFrom<File> for SerializedFileReader<File> {
     type Error = ParquetError;
@@ -423,7 +422,7 @@ pub(crate) fn decode_page(
             Page::DictionaryPage {
                 buf: buffer,
                 num_values: dict_header.num_values.try_into()?,
-                encoding: Encoding::try_from(dict_header.encoding)?,
+                encoding: dict_header.encoding,
                 is_sorted,
             }
         }
@@ -434,10 +433,10 @@ pub(crate) fn decode_page(
             Page::DataPage {
                 buf: buffer,
                 num_values: header.num_values.try_into()?,
-                encoding: Encoding::try_from(header.encoding)?,
-                def_level_encoding: 
Encoding::try_from(header.definition_level_encoding)?,
-                rep_level_encoding: 
Encoding::try_from(header.repetition_level_encoding)?,
-                statistics: statistics::from_thrift(physical_type, 
header.statistics)?,
+                encoding: header.encoding,
+                def_level_encoding: header.definition_level_encoding,
+                rep_level_encoding: header.repetition_level_encoding,
+                statistics: statistics::from_thrift_page_stats(physical_type, 
header.statistics)?,
             }
         }
         PageType::DATA_PAGE_V2 => {
@@ -448,13 +447,13 @@ pub(crate) fn decode_page(
             Page::DataPageV2 {
                 buf: buffer,
                 num_values: header.num_values.try_into()?,
-                encoding: Encoding::try_from(header.encoding)?,
+                encoding: header.encoding,
                 num_nulls: header.num_nulls.try_into()?,
                 num_rows: header.num_rows.try_into()?,
                 def_levels_byte_len: 
header.definition_levels_byte_length.try_into()?,
                 rep_levels_byte_len: 
header.repetition_levels_byte_length.try_into()?,
                 is_compressed,
-                statistics: statistics::from_thrift(physical_type, 
header.statistics)?,
+                statistics: statistics::from_thrift_page_stats(physical_type, 
header.statistics)?,
             }
         }
         _ => {
@@ -499,6 +498,8 @@ enum SerializedPageReaderState {
 
 #[derive(Default)]
 struct SerializedPageReaderContext {
+    /// Controls decoding of page-level statistics
+    read_stats: bool,
     /// Crypto context carrying objects required for decryption
     #[cfg(feature = "encryption")]
     crypto_context: Option<Arc<CryptoContext>>,
@@ -610,12 +611,16 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                 require_dictionary: meta.dictionary_page_offset().is_some(),
             },
         };
+        let mut context = SerializedPageReaderContext::default();
+        if props.read_page_stats() {
+            context.read_stats = true;
+        }
         Ok(Self {
             reader,
             decompressor,
             state,
             physical_type: meta.column_type(),
-            context: Default::default(),
+            context,
         })
     }
 
@@ -732,8 +737,12 @@ impl SerializedPageReaderContext {
         _page_index: usize,
         _dictionary_page: bool,
     ) -> Result<PageHeader> {
-        let mut prot = TCompactInputProtocol::new(input);
-        Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+        let mut prot = ThriftReadInputProtocol::new(input);
+        if self.read_stats {
+            Ok(PageHeader::read_thrift(&mut prot)?)
+        } else {
+            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+        }
     }
 
     fn decrypt_page_data<T>(
@@ -756,8 +765,14 @@ impl SerializedPageReaderContext {
     ) -> Result<PageHeader> {
         match self.page_crypto_context(page_index, dictionary_page) {
             None => {
-                let mut prot = TCompactInputProtocol::new(input);
-                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+                let mut prot = ThriftReadInputProtocol::new(input);
+                if self.read_stats {
+                    Ok(PageHeader::read_thrift(&mut prot)?)
+                } else {
+                    use crate::file::metadata::thrift_gen::PageHeader;
+
+                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+                }
             }
             Some(page_crypto_context) => {
                 let data_decryptor = page_crypto_context.data_decryptor();
@@ -770,8 +785,12 @@ impl SerializedPageReaderContext {
                     ))
                 })?;
 
-                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
-                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
+                if self.read_stats {
+                    Ok(PageHeader::read_thrift(&mut prot)?)
+                } else {
+                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+                }
             }
         }
     }
@@ -1107,7 +1126,7 @@ mod tests {
     };
     use crate::file::properties::{EnabledStatistics, WriterProperties};
 
-    use crate::basic::{self, BoundaryOrder, ColumnOrder, SortOrder};
+    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
     use crate::column::reader::ColumnReader;
     use crate::data_type::private::ParquetValueType;
     use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
@@ -1396,7 +1415,7 @@ mod tests {
                     assert_eq!(def_levels_byte_len, 2);
                     assert_eq!(rep_levels_byte_len, 0);
                     assert!(is_compressed);
-                    assert!(statistics.is_some());
+                    assert!(statistics.is_none()); // page stats are not read 
by default
                     true
                 }
                 _ => false,
@@ -1498,7 +1517,7 @@ mod tests {
                     assert_eq!(def_levels_byte_len, 2);
                     assert_eq!(rep_levels_byte_len, 0);
                     assert!(is_compressed);
-                    assert!(statistics.is_some());
+                    assert!(statistics.is_none()); // page stats are not read 
by default
                     true
                 }
                 _ => false,
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index d4501830ac..e51f445b7e 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -45,6 +45,7 @@ use crate::basic::Type;
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::thrift_gen::PageStatistics;
 use crate::util::bit_util::FromBytes;
 
 pub(crate) mod private {
@@ -117,6 +118,7 @@ macro_rules! statistics_enum_func {
     }};
 }
 
+// FIXME(ets): remove this when done with format changes
 /// Converts Thrift definition into `Statistics`.
 pub fn from_thrift(
     physical_type: Type,
@@ -266,6 +268,156 @@ pub fn from_thrift(
     })
 }
 
+/// Converts Thrift `PageStatistics` into `Statistics`.
+pub(crate) fn from_thrift_page_stats(
+    physical_type: Type,
+    thrift_stats: Option<PageStatistics>,
+) -> Result<Option<Statistics>> {
+    Ok(match thrift_stats {
+        Some(stats) => {
+            // Number of nulls recorded; when it is not available, we just 
mark it as 0.
+            // TODO this should be `None` if there is no information about 
NULLS.
+            // see https://github.com/apache/arrow-rs/pull/6216/files
+            let null_count = stats.null_count.unwrap_or(0);
+
+            if null_count < 0 {
+                return Err(ParquetError::General(format!(
+                    "Statistics null count is negative {null_count}",
+                )));
+            }
+
+            // Generic null count.
+            let null_count = Some(null_count as u64);
+            // Generic distinct count (count of distinct values occurring)
+            let distinct_count = stats.distinct_count.map(|value| value as 
u64);
+            // Whether or not statistics use deprecated min/max fields.
+            let old_format = stats.min_value.is_none() && 
stats.max_value.is_none();
+            // Generic min value as bytes.
+            let min = if old_format {
+                stats.min
+            } else {
+                stats.min_value
+            };
+            // Generic max value as bytes.
+            let max = if old_format {
+                stats.max
+            } else {
+                stats.max_value
+            };
+
+            fn check_len(min: &Option<Vec<u8>>, max: &Option<Vec<u8>>, len: 
usize) -> Result<()> {
+                if let Some(min) = min {
+                    if min.len() < len {
+                        return Err(ParquetError::General(
+                            "Insufficient bytes to parse min 
statistic".to_string(),
+                        ));
+                    }
+                }
+                if let Some(max) = max {
+                    if max.len() < len {
+                        return Err(ParquetError::General(
+                            "Insufficient bytes to parse max 
statistic".to_string(),
+                        ));
+                    }
+                }
+                Ok(())
+            }
+
+            match physical_type {
+                Type::BOOLEAN => check_len(&min, &max, 1),
+                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
+                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
+                Type::INT96 => check_len(&min, &max, 12),
+                _ => Ok(()),
+            }?;
+
+            // Values are encoded using PLAIN encoding definition, except that
+            // variable-length byte arrays do not include a length prefix.
+            //
+            // Instead of using actual decoder, we manually convert values.
+            let res = match physical_type {
+                Type::BOOLEAN => Statistics::boolean(
+                    min.map(|data| data[0] != 0),
+                    max.map(|data| data[0] != 0),
+                    distinct_count,
+                    null_count,
+                    old_format,
+                ),
+                Type::INT32 => Statistics::int32(
+                    min.map(|data| 
i32::from_le_bytes(data[..4].try_into().unwrap())),
+                    max.map(|data| 
i32::from_le_bytes(data[..4].try_into().unwrap())),
+                    distinct_count,
+                    null_count,
+                    old_format,
+                ),
+                Type::INT64 => Statistics::int64(
+                    min.map(|data| 
i64::from_le_bytes(data[..8].try_into().unwrap())),
+                    max.map(|data| 
i64::from_le_bytes(data[..8].try_into().unwrap())),
+                    distinct_count,
+                    null_count,
+                    old_format,
+                ),
+                Type::INT96 => {
+                    // INT96 statistics may not be correct, because comparison 
is signed
+                    let min = if let Some(data) = min {
+                        assert_eq!(data.len(), 12);
+                        Some(Int96::try_from_le_slice(&data)?)
+                    } else {
+                        None
+                    };
+                    let max = if let Some(data) = max {
+                        assert_eq!(data.len(), 12);
+                        Some(Int96::try_from_le_slice(&data)?)
+                    } else {
+                        None
+                    };
+                    Statistics::int96(min, max, distinct_count, null_count, 
old_format)
+                }
+                Type::FLOAT => Statistics::float(
+                    min.map(|data| 
f32::from_le_bytes(data[..4].try_into().unwrap())),
+                    max.map(|data| 
f32::from_le_bytes(data[..4].try_into().unwrap())),
+                    distinct_count,
+                    null_count,
+                    old_format,
+                ),
+                Type::DOUBLE => Statistics::double(
+                    min.map(|data| 
f64::from_le_bytes(data[..8].try_into().unwrap())),
+                    max.map(|data| 
f64::from_le_bytes(data[..8].try_into().unwrap())),
+                    distinct_count,
+                    null_count,
+                    old_format,
+                ),
+                Type::BYTE_ARRAY => Statistics::ByteArray(
+                    ValueStatistics::new(
+                        min.map(ByteArray::from),
+                        max.map(ByteArray::from),
+                        distinct_count,
+                        null_count,
+                        old_format,
+                    )
+                    
.with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
+                    
.with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
+                ),
+                Type::FIXED_LEN_BYTE_ARRAY => Statistics::FixedLenByteArray(
+                    ValueStatistics::new(
+                        min.map(ByteArray::from).map(FixedLenByteArray::from),
+                        max.map(ByteArray::from).map(FixedLenByteArray::from),
+                        distinct_count,
+                        null_count,
+                        old_format,
+                    )
+                    
.with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
+                    
.with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
+                ),
+            };
+
+            Some(res)
+        }
+        None => None,
+    })
+}
+
+// FIXME(ets): remove when done with format changes
 /// Convert Statistics into Thrift definition.
 pub fn to_thrift(stats: Option<&Statistics>) -> 
Option<crate::format::Statistics> {
     let stats = stats?;
@@ -315,6 +467,55 @@ pub fn to_thrift(stats: Option<&Statistics>) -> 
Option<crate::format::Statistics
     Some(thrift_stats)
 }
 
+/// Converts `Statistics` into Thrift `PageStatistics`.
+pub(crate) fn page_stats_to_thrift(stats: Option<&Statistics>) -> 
Option<PageStatistics> {
+    let stats = stats?;
+
+    // record null count if it can fit in i64
+    let null_count = stats
+        .null_count_opt()
+        .and_then(|value| i64::try_from(value).ok());
+
+    // record distinct count if it can fit in i64
+    let distinct_count = stats
+        .distinct_count_opt()
+        .and_then(|value| i64::try_from(value).ok());
+
+    let mut thrift_stats = PageStatistics {
+        max: None,
+        min: None,
+        null_count,
+        distinct_count,
+        max_value: None,
+        min_value: None,
+        is_max_value_exact: None,
+        is_min_value_exact: None,
+    };
+
+    // Get min/max if set.
+    let (min, max, min_exact, max_exact) = (
+        stats.min_bytes_opt().map(|x| x.to_vec()),
+        stats.max_bytes_opt().map(|x| x.to_vec()),
+        Some(stats.min_is_exact()),
+        Some(stats.max_is_exact()),
+    );
+    if stats.is_min_max_backwards_compatible() {
+        // Copy to deprecated min, max values for compatibility with older 
readers
+        thrift_stats.min.clone_from(&min);
+        thrift_stats.max.clone_from(&max);
+    }
+
+    if !stats.is_min_max_deprecated() {
+        thrift_stats.min_value = min;
+        thrift_stats.max_value = max;
+    }
+
+    thrift_stats.is_min_value_exact = min_exact;
+    thrift_stats.is_max_value_exact = max_exact;
+
+    Some(thrift_stats)
+}
+
 /// Strongly typed statistics for a column chunk within a row group.
 ///
 /// This structure is a natively typed, in memory representation of the
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 158b2a21b7..71881b00ff 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,13 +19,13 @@
 //! using row group writers and column writers respectively.
 
 use crate::bloom_filter::Sbbf;
+use crate::file::metadata::thrift_gen::PageHeader;
 use crate::file::page_index::index::Index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::thrift::TSerializable;
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
 use std::fmt::Debug;
 use std::io::{BufWriter, IoSlice, Read};
 use std::{io::Write, sync::Arc};
-use thrift::protocol::TCompactOutputProtocol;
 
 use crate::column::page_encryption::PageEncryptor;
 use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, 
ColumnWriterImpl};
@@ -939,15 +939,15 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     /// Serializes page header into Thrift.
     /// Returns number of bytes that have been written into the sink.
     #[inline]
-    fn serialize_page_header(&mut self, header: crate::format::PageHeader) -> 
Result<usize> {
+    fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
         let start_pos = self.sink.bytes_written();
         match self.page_encryptor_and_sink_mut() {
             Some((page_encryptor, sink)) => {
                 page_encryptor.encrypt_page_header(&header, sink)?;
             }
             None => {
-                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
-                header.write_to_out_protocol(&mut protocol)?;
+                let mut protocol = ThriftCompactOutputProtocol::new(&mut 
self.sink);
+                header.write_thrift(&mut protocol)?;
             }
         }
         Ok(self.sink.bytes_written() - start_pos)
@@ -1602,6 +1602,7 @@ mod tests {
 
             let props = ReaderProperties::builder()
                 .set_backward_compatible_lz4(false)
+                .set_read_page_statistics(true)
                 .build();
             let mut page_reader = SerializedPageReader::new_with_properties(
                 Arc::new(reader),
diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs
index 889e5fafef..5720fd4ce0 100644
--- a/parquet/src/parquet_macros.rs
+++ b/parquet/src/parquet_macros.rs
@@ -323,6 +323,66 @@ macro_rules! thrift_struct {
     }
 }
 
+/// Only implements ReadThrift for the given IDL struct definition
+#[macro_export]
+macro_rules! thrift_struct_read_impl {
+    ($(#[$($def_attrs:tt)*])* $vis:vis struct $identifier:ident $(< 
$lt:lifetime >)? { $($(#[$($field_attrs:tt)*])* $field_id:literal : 
$required_or_optional:ident $field_type:ident $(< $field_lt:lifetime >)? $(< 
$element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)* 
}) => {
+        $(#[cfg_attr(not(doctest), $($def_attrs)*)])*
+        impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for 
$identifier $(<$lt>)? {
+            fn read_thrift(prot: &mut R) -> Result<Self> {
+                $(let mut $field_name: 
Option<$crate::__thrift_field_type!($field_type $($field_lt)? 
$($element_type)?)> = None;)*
+                let mut last_field_id = 0i16;
+                loop {
+                    let field_ident = prot.read_field_begin(last_field_id)?;
+                    if field_ident.field_type == FieldType::Stop {
+                        break;
+                    }
+                    match field_ident.id {
+                        $($field_id => {
+                            let val = $crate::__thrift_read_field!(prot, 
field_ident, $field_type $($field_lt)? $($element_type)?);
+                            $field_name = Some(val);
+                        })*
+                        _ => {
+                            prot.skip(field_ident.field_type)?;
+                        }
+                    };
+                    last_field_id = field_ident.id;
+                }
+                
$($crate::__thrift_result_required_or_optional!($required_or_optional 
$field_name);)*
+                Ok(Self {
+                    $($field_name),*
+                })
+            }
+        }
+    }
+}
+
+/// Only implements WriteThrift for the given IDL struct definition
+#[macro_export]
+macro_rules! thrift_struct_write_impl {
+    ($(#[$($def_attrs:tt)*])* $vis:vis struct $identifier:ident $(< 
$lt:lifetime >)? { $($(#[$($field_attrs:tt)*])* $field_id:literal : 
$required_or_optional:ident $field_type:ident $(< $field_lt:lifetime >)? $(< 
$element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)* 
}) => {
+        impl $(<$lt>)? WriteThrift for $identifier $(<$lt>)? {
+            const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+            #[allow(unused_assignments)]
+            fn write_thrift<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>) -> Result<()> {
+                #[allow(unused_mut, unused_variables)]
+                let mut last_field_id = 0i16;
+                
$($crate::__thrift_write_required_or_optional_field!($required_or_optional 
$field_name, $field_id, $field_type, self, writer, last_field_id);)*
+                writer.write_struct_end()
+            }
+        }
+
+        impl $(<$lt>)? WriteThriftField for $identifier $(<$lt>)? {
+            fn write_thrift_field<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>, field_id: i16, last_field_id: i16) -> 
Result<i16> {
+                writer.write_field_begin(FieldType::Struct, field_id, 
last_field_id)?;
+                self.write_thrift(writer)?;
+                Ok(field_id)
+            }
+        }
+    }
+}
+
 #[doc(hidden)]
 #[macro_export]
 macro_rules! __thrift_write_required_or_optional_field {
@@ -391,15 +451,19 @@ macro_rules! __thrift_required_or_optional {
     (optional $field_type:ty) => { Option<$field_type> };
 }
 
+// Performance note: using `expect` here is about 4% faster on the page index 
bench,
+// but we want to propagate errors. Using `ok_or` is *much* slower.
 #[doc(hidden)]
 #[macro_export]
 macro_rules! __thrift_result_required_or_optional {
     (required $field_name:ident) => {
-        let $field_name = $field_name.expect(concat!(
-            "Required field ",
-            stringify!($field_name),
-            " is missing",
-        ));
+        let Some($field_name) = $field_name else {
+            return Err(general_err!(concat!(
+                "Required field ",
+                stringify!($field_name),
+                " is missing",
+            )));
+        };
     };
     (optional $field_name:ident) => {};
 }
@@ -433,7 +497,7 @@ macro_rules! __thrift_read_field {
     };
     ($prot:tt, $field_ident:tt, binary) => {
         // this one needs to not conflict with `list<i8>`
-        $prot.read_bytes()?.to_vec()
+        $prot.read_bytes_owned()?
     };
     ($prot:tt, $field_ident:tt, double) => {
         $crate::parquet_thrift::OrderedF64::read_thrift(&mut *$prot)?
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index 17847d0b71..5d549f012c 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -20,7 +20,10 @@
 // to not allocate byte arrays or strings.
 #![allow(dead_code)]
 
-use std::{cmp::Ordering, io::Write};
+use std::{
+    cmp::Ordering,
+    io::{Read, Write},
+};
 
 use crate::errors::{ParquetError, Result};
 
@@ -197,6 +200,8 @@ pub(crate) trait ThriftCompactInputProtocol<'a> {
     /// [binary]: 
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding
     fn read_bytes(&mut self) -> Result<&'a [u8]>;
 
+    fn read_bytes_owned(&mut self) -> Result<Vec<u8>>;
+
     /// Skip the next `n` bytes of input.
     fn skip_bytes(&mut self, n: usize) -> Result<()>;
 
@@ -459,6 +464,10 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'b> for 
ThriftSliceInputProtocol<'a>
         Ok(ret)
     }
 
+    fn read_bytes_owned(&mut self) -> Result<Vec<u8>> {
+        Ok(self.read_bytes()?.to_vec())
+    }
+
     #[inline]
     fn skip_bytes(&mut self, n: usize) -> Result<()> {
         self.buf.get(..n).ok_or_else(eof_error)?;
@@ -480,6 +489,54 @@ fn eof_error() -> ParquetError {
     eof_err!("Unexpected EOF")
 }
 
+/// A Thrift input protocol that wraps a [`Read`] object.
+///
+/// Note that this is only intended for use in reading Parquet page headers. 
`read_bytes` panics
+/// because a borrowed slice cannot be returned; use `read_bytes_owned` for 
Thrift `binary` data.
+pub(crate) struct ThriftReadInputProtocol<R: Read> {
+    reader: R,
+}
+
+impl<R: Read> ThriftReadInputProtocol<R> {
+    pub(crate) fn new(reader: R) -> Self {
+        Self { reader }
+    }
+}
+
+impl<'a, R: Read> ThriftCompactInputProtocol<'a> for 
ThriftReadInputProtocol<R> {
+    #[inline]
+    fn read_byte(&mut self) -> Result<u8> {
+        let mut buf = [0_u8; 1];
+        self.reader.read_exact(&mut buf)?;
+        Ok(buf[0])
+    }
+
+    fn read_bytes(&mut self) -> Result<&'a [u8]> {
+        unimplemented!()
+    }
+
+    fn read_bytes_owned(&mut self) -> Result<Vec<u8>> {
+        let len = self.read_vlq()? as usize;
+        let mut v = Vec::with_capacity(len);
+        std::io::copy(&mut self.reader.by_ref().take(len as u64), &mut v)?;
+        Ok(v)
+    }
+
+    fn skip_bytes(&mut self, n: usize) -> Result<()> {
+        std::io::copy(
+            &mut self.reader.by_ref().take(n as u64),
+            &mut std::io::sink(),
+        )?;
+        Ok(())
+    }
+
+    fn read_double(&mut self) -> Result<f64> {
+        let mut buf = [0_u8; 8];
+        self.reader.read_exact(&mut buf)?;
+        Ok(f64::from_le_bytes(buf))
+    }
+}
+
 /// Trait implemented for objects that can be deserialized from a Thrift input 
stream.
 /// Implementations are provided for Thrift primitive types.
 pub(crate) trait ReadThrift<'a, R: ThriftCompactInputProtocol<'a>> {
@@ -533,7 +590,7 @@ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, 
R> for &'a str {
 
 impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for String {
     fn read_thrift(prot: &mut R) -> Result<Self> {
-        Ok(prot.read_string()?.to_owned())
+        Ok(String::from_utf8(prot.read_bytes_owned()?)?)
     }
 }
 
diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs
index 580ac2d7db..2492910a31 100644
--- a/parquet/src/thrift.rs
+++ b/parquet/src/thrift.rs
@@ -36,15 +36,22 @@ pub trait TSerializable: Sized {
 // Public function to aid benchmarking. Reads Parquet `FileMetaData` encoded 
in `bytes`.
 #[doc(hidden)]
 pub fn bench_file_metadata(bytes: &bytes::Bytes) {
-    let mut input = TCompactSliceInputProtocol::new(bytes);
-    crate::format::FileMetaData::read_from_in_protocol(&mut input).unwrap();
+    crate::file::metadata::thrift_gen::bench_file_metadata(bytes);
 }
 
 // Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in 
`bytes`.
 #[doc(hidden)]
 pub fn bench_page_header(bytes: &bytes::Bytes) {
-    let mut prot = TCompactSliceInputProtocol::new(bytes);
-    crate::format::PageHeader::read_from_in_protocol(&mut prot).unwrap();
+    use crate::parquet_thrift::ReadThrift;
+    let mut prot = 
crate::parquet_thrift::ThriftReadInputProtocol::new(bytes.as_ref());
+    crate::file::metadata::thrift_gen::PageHeader::read_thrift(&mut 
prot).unwrap();
+}
+
+// Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in 
`bytes`, skipping statistics.
+#[doc(hidden)]
+pub fn bench_page_header_no_stats(bytes: &bytes::Bytes) {
+    let mut prot = 
crate::parquet_thrift::ThriftReadInputProtocol::new(bytes.as_ref());
+    
crate::file::metadata::thrift_gen::PageHeader::read_thrift_without_stats(&mut 
prot).unwrap();
 }
 
 /// A more performant implementation of [`TCompactInputProtocol`] that reads a 
slice
diff --git a/parquet/tests/arrow_reader/bad_data.rs 
b/parquet/tests/arrow_reader/bad_data.rs
index ecf449a7ce..be401030e7 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -101,7 +101,7 @@ fn test_arrow_gh_41317() {
     let err = read_file("ARROW-GH-41317.parquet").unwrap_err();
     assert_eq!(
         err.to_string(),
-        "External: Parquet argument error: External: bad data"
+        "External: Parquet argument error: Parquet error: StructArrayReader 
out of sync in read_records, expected 5 read, got 2"
     );
 }
 
diff --git a/parquet/tests/encryption/encryption_agnostic.rs 
b/parquet/tests/encryption/encryption_agnostic.rs
index e071471712..48b5c77d9b 100644
--- a/parquet/tests/encryption/encryption_agnostic.rs
+++ b/parquet/tests/encryption/encryption_agnostic.rs
@@ -72,7 +72,7 @@ pub fn 
read_plaintext_footer_file_without_decryption_properties() {
 
     match record_reader.next() {
         Some(Err(ArrowError::ParquetError(s))) => {
-            assert!(s.contains("protocol error"));
+            assert!(s.contains("Parquet error"));
         }
         _ => {
             panic!("Expected ArrowError::ParquetError");
@@ -137,7 +137,7 @@ pub async fn 
read_plaintext_footer_file_without_decryption_properties_async() {
 
     match record_reader.next().await {
         Some(Err(ParquetError::ArrowError(s))) => {
-            assert!(s.contains("protocol error"));
+            assert!(s.contains("Parquet error"));
         }
         _ => {
             panic!("Expected ArrowError::ParquetError");

Reply via email to