This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this
push:
new 3dbd42e516 [thrift-remodel] Use new Thrift encoder/decoder for Parquet
page headers (#8376)
3dbd42e516 is described below
commit 3dbd42e516ffea475b16033c05d9492a6d2d849e
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Sep 23 11:22:07 2025 -0700
[thrift-remodel] Use new Thrift encoder/decoder for Parquet page headers
(#8376)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
- Part of #5854.
# Rationale for this change
This continues the remodel, moving on to `PageHeader` support.
# What changes are included in this PR?
Swaps out old `format` page header structs for new ones. This also adds
a `Read`-based implementation of the Thrift compact protocol reader (the
sizes of the Thrift-encoded page headers are not knowable in advance, so
we need a way to read them from the Thrift input stream used by the page
decoder).
This PR also makes decoding of the `Statistics` in the page header
optional (defaults to `false`). We do not use them, and the decode takes
a good chunk of time.
# Are these changes tested?
These changes should be covered by existing tests.
# Are there any user-facing changes?
Yes, page-level statistics are no longer decoded by default.
---
parquet/benches/metadata.rs | 20 ++
parquet/src/arrow/arrow_writer/mod.rs | 27 +-
parquet/src/column/page.rs | 40 ++-
parquet/src/column/page_encryption.rs | 8 +-
parquet/src/column/page_encryption_disabled.rs | 2 +-
parquet/src/column/writer/mod.rs | 1 +
parquet/src/encryption/encrypt.rs | 28 ++
parquet/src/errors.rs | 8 +
parquet/src/file/metadata/thrift_gen.rs | 443 ++++++++++++++++++------
parquet/src/file/page_encoding_stats.rs | 2 +-
parquet/src/file/properties.rs | 24 ++
parquet/src/file/serialized_reader.rs | 67 ++--
parquet/src/file/statistics.rs | 201 +++++++++++
parquet/src/file/writer.rs | 11 +-
parquet/src/parquet_macros.rs | 76 +++-
parquet/src/parquet_thrift.rs | 61 +++-
parquet/src/thrift.rs | 15 +-
parquet/tests/arrow_reader/bad_data.rs | 2 +-
parquet/tests/encryption/encryption_agnostic.rs | 4 +-
19 files changed, 861 insertions(+), 179 deletions(-)
diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index 8c886e4d5e..ced0175da8 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -258,6 +258,26 @@ fn criterion_benchmark(c: &mut Criterion) {
});
})
});
+
+ #[cfg(feature = "arrow")]
+ c.bench_function("page headers (no stats)", |b| {
+ b.iter(|| {
+ metadata.row_groups.iter().for_each(|rg| {
+ rg.columns.iter().for_each(|col| {
+ if let Some(col_meta) = &col.meta_data {
+ if let Some(dict_offset) =
col_meta.dictionary_page_offset {
+ parquet::thrift::bench_page_header_no_stats(
+ &file_bytes.slice(dict_offset as usize..),
+ );
+ }
+ parquet::thrift::bench_page_header_no_stats(
+ &file_bytes.slice(col_meta.data_page_offset as
usize..),
+ );
+ }
+ });
+ });
+ })
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 5ccab5b24b..e863b74a95 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -23,7 +23,6 @@ use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
-use thrift::protocol::TCompactOutputProtocol;
use arrow_array::cast::AsArray;
use arrow_array::types::*;
@@ -48,8 +47,8 @@ use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};
mod byte_array;
@@ -583,8 +582,8 @@ impl PageWriter for ArrowPageWriter {
}
}
None => {
- let mut protocol = TCompactOutputProtocol::new(&mut
header);
- page_header.write_to_out_protocol(&mut protocol)?;
+ let mut protocol = ThriftCompactOutputProtocol::new(&mut
header);
+ page_header.write_thrift(&mut protocol)?;
}
};
@@ -1487,12 +1486,12 @@ mod tests {
use crate::arrow::arrow_reader::{ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
use crate::column::page::{Page, PageReader};
+ use crate::file::metadata::thrift_gen::PageHeader;
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::reader::SerializedPageReader;
- use crate::format::PageHeader;
+ use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
use crate::schema::types::ColumnPath;
- use crate::thrift::TCompactSliceInputProtocol;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
@@ -4191,8 +4190,8 @@ mod tests {
// decode first page header
let first_page = &buf[4..];
- let mut prot = TCompactSliceInputProtocol::new(first_page);
- let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let mut prot = ThriftSliceInputProtocol::new(first_page);
+ let hdr = PageHeader::read_thrift(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
assert!(stats.is_none());
@@ -4225,8 +4224,8 @@ mod tests {
// decode first page header
let first_page = &buf[4..];
- let mut prot = TCompactSliceInputProtocol::new(first_page);
- let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let mut prot = ThriftSliceInputProtocol::new(first_page);
+ let hdr = PageHeader::read_thrift(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
let stats = stats.unwrap();
@@ -4277,8 +4276,8 @@ mod tests {
// decode first page header
let first_page = &buf[4..];
- let mut prot = TCompactSliceInputProtocol::new(first_page);
- let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let mut prot = ThriftSliceInputProtocol::new(first_page);
+ let hdr = PageHeader::read_thrift(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
assert!(stats.is_some());
let stats = stats.unwrap();
@@ -4290,8 +4289,8 @@ mod tests {
// check second page now
let second_page = &prot.as_slice()[hdr.compressed_page_size as
usize..];
- let mut prot = TCompactSliceInputProtocol::new(second_page);
- let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ let mut prot = ThriftSliceInputProtocol::new(second_page);
+ let hdr = PageHeader::read_thrift(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
assert!(stats.is_some());
let stats = stats.unwrap();
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index a2f683d71f..23bf4548fb 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -21,8 +21,10 @@ use bytes::Bytes;
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
-use crate::file::statistics::Statistics;
-use crate::format::PageHeader;
+use crate::file::metadata::thrift_gen::{
+ DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
+};
+use crate::file::statistics::{page_stats_to_thrift, Statistics};
/// Parquet Page definition.
///
@@ -216,7 +218,7 @@ impl CompressedPage {
let page_type = self.page_type();
let mut page_header = PageHeader {
- type_: page_type.into(),
+ type_: page_type,
uncompressed_page_size: uncompressed_size as i32,
compressed_page_size: compressed_size as i32,
// TODO: Add support for crc checksum
@@ -234,12 +236,12 @@ impl CompressedPage {
ref statistics,
..
} => {
- let data_page_header = crate::format::DataPageHeader {
+ let data_page_header = DataPageHeader {
num_values: num_values as i32,
- encoding: encoding.into(),
- definition_level_encoding: def_level_encoding.into(),
- repetition_level_encoding: rep_level_encoding.into(),
- statistics:
crate::file::statistics::to_thrift(statistics.as_ref()),
+ encoding,
+ definition_level_encoding: def_level_encoding,
+ repetition_level_encoding: rep_level_encoding,
+ statistics: page_stats_to_thrift(statistics.as_ref()),
};
page_header.data_page_header = Some(data_page_header);
}
@@ -252,22 +254,22 @@ impl CompressedPage {
ref statistics,
..
} => {
- let data_page_header_v2 = crate::format::DataPageHeaderV2 {
+ let data_page_header_v2 = DataPageHeaderV2 {
num_values: num_values as i32,
num_nulls: num_nulls as i32,
num_rows: num_rows as i32,
- encoding: encoding.into(),
+ encoding,
definition_levels_byte_length: def_levels_byte_len as i32,
repetition_levels_byte_length: rep_levels_byte_len as i32,
is_compressed: Some(is_compressed),
- statistics:
crate::file::statistics::to_thrift(statistics.as_ref()),
+ statistics: page_stats_to_thrift(statistics.as_ref()),
};
page_header.data_page_header_v2 = Some(data_page_header_v2);
}
Page::DictionaryPage { is_sorted, .. } => {
- let dictionary_page_header =
crate::format::DictionaryPageHeader {
+ let dictionary_page_header = DictionaryPageHeader {
num_values: num_values as i32,
- encoding: encoding.into(),
+ encoding,
is_sorted: Some(is_sorted),
};
page_header.dictionary_page_header =
Some(dictionary_page_header);
@@ -343,12 +345,14 @@ pub struct PageMetadata {
pub is_dict: bool,
}
-impl TryFrom<&PageHeader> for PageMetadata {
+impl TryFrom<&crate::file::metadata::thrift_gen::PageHeader> for PageMetadata {
type Error = ParquetError;
- fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
+ fn try_from(
+ value: &crate::file::metadata::thrift_gen::PageHeader,
+ ) -> std::result::Result<Self, Self::Error> {
match value.type_ {
- crate::format::PageType::DATA_PAGE => {
+ PageType::DATA_PAGE => {
let header = value.data_page_header.as_ref().unwrap();
Ok(PageMetadata {
num_rows: None,
@@ -356,12 +360,12 @@ impl TryFrom<&PageHeader> for PageMetadata {
is_dict: false,
})
}
- crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
+ PageType::DICTIONARY_PAGE => Ok(PageMetadata {
num_rows: None,
num_levels: None,
is_dict: true,
}),
- crate::format::PageType::DATA_PAGE_V2 => {
+ PageType::DATA_PAGE_V2 => {
let header = value.data_page_header_v2.as_ref().unwrap();
Ok(PageMetadata {
num_rows: Some(header.num_rows as _),
diff --git a/parquet/src/column/page_encryption.rs
b/parquet/src/column/page_encryption.rs
index 0fb7c89426..7ee367a289 100644
--- a/parquet/src/column/page_encryption.rs
+++ b/parquet/src/column/page_encryption.rs
@@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use crate::basic::PageType;
use crate::column::page::CompressedPage;
use crate::encryption::ciphers::BlockEncryptor;
-use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
+use crate::encryption::encrypt::{encrypt_thrift_object, FileEncryptor};
use crate::encryption::modules::{create_module_aad, ModuleType};
use crate::errors::ParquetError;
use crate::errors::Result;
-use crate::format::PageHeader;
-use crate::format::PageType;
+use crate::file::metadata::thrift_gen::PageHeader;
use bytes::Bytes;
use std::io::Write;
use std::sync::Arc;
@@ -114,6 +114,6 @@ impl PageEncryptor {
Some(self.page_index),
)?;
- encrypt_object(page_header, &mut self.block_encryptor, sink, &aad)
+ encrypt_thrift_object(page_header, &mut self.block_encryptor, sink,
&aad)
}
}
diff --git a/parquet/src/column/page_encryption_disabled.rs
b/parquet/src/column/page_encryption_disabled.rs
index e85b028116..347024f7f2 100644
--- a/parquet/src/column/page_encryption_disabled.rs
+++ b/parquet/src/column/page_encryption_disabled.rs
@@ -17,7 +17,7 @@
use crate::column::page::CompressedPage;
use crate::errors::Result;
-use crate::format::PageHeader;
+use crate::file::metadata::thrift_gen::PageHeader;
use std::io::Write;
#[derive(Debug)]
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index e5a9139fb7..059287011b 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2264,6 +2264,7 @@ mod tests {
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
+ .set_read_page_statistics(true)
.build();
let reader = SerializedPageReader::new_with_properties(
Arc::new(Bytes::from(buf)),
diff --git a/parquet/src/encryption/encrypt.rs
b/parquet/src/encryption/encrypt.rs
index 1a241bf7b1..9789302169 100644
--- a/parquet/src/encryption/encrypt.rs
+++ b/parquet/src/encryption/encrypt.rs
@@ -22,6 +22,7 @@ use crate::encryption::ciphers::{
};
use crate::errors::{ParquetError, Result};
use crate::file::column_crypto_metadata::{ColumnCryptoMetaData,
EncryptionWithColumnKey};
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use ring::rand::{SecureRandom, SystemRandom};
@@ -376,6 +377,18 @@ pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
Ok(())
}
+/// Write an encrypted Thrift serializable object
+pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
+ object: &T,
+ encryptor: &mut Box<dyn BlockEncryptor>,
+ sink: &mut W,
+ module_aad: &[u8],
+) -> Result<()> {
+ let encrypted_buffer = encrypt_thrift_object_to_vec(object, encryptor,
module_aad)?;
+ sink.write_all(&encrypted_buffer)?;
+ Ok(())
+}
+
pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
object: &T,
encryptor: &mut Box<dyn BlockEncryptor>,
@@ -414,6 +427,21 @@ pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
encryptor.encrypt(buffer.as_ref(), module_aad)
}
+/// Encrypt a Thrift serializable object to a byte vector
+pub(crate) fn encrypt_thrift_object_to_vec<T: WriteThrift>(
+ object: &T,
+ encryptor: &mut Box<dyn BlockEncryptor>,
+ module_aad: &[u8],
+) -> Result<Vec<u8>> {
+ let mut buffer: Vec<u8> = vec![];
+ {
+ let mut unencrypted_protocol = ThriftCompactOutputProtocol::new(&mut
buffer);
+ object.write_thrift(&mut unencrypted_protocol)?;
+ }
+
+ encryptor.encrypt(buffer.as_ref(), module_aad)
+}
+
/// Get the crypto metadata for a column from the file encryption properties
pub(crate) fn get_column_crypto_metadata(
properties: &FileEncryptionProperties,
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index be08245e95..dab444a28f 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -19,6 +19,7 @@
use core::num::TryFromIntError;
use std::error::Error;
+use std::string::FromUtf8Error;
use std::{cell, io, result, str};
#[cfg(feature = "arrow")]
@@ -124,6 +125,13 @@ impl From<str::Utf8Error> for ParquetError {
ParquetError::External(Box::new(e))
}
}
+
+impl From<FromUtf8Error> for ParquetError {
+ fn from(e: FromUtf8Error) -> ParquetError {
+ ParquetError::External(Box::new(e))
+ }
+}
+
#[cfg(feature = "arrow")]
impl From<ArrowError> for ParquetError {
fn from(e: ArrowError) -> ParquetError {
diff --git a/parquet/src/file/metadata/thrift_gen.rs
b/parquet/src/file/metadata/thrift_gen.rs
index b656bacc8c..7515a70a63 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -67,103 +67,6 @@ pub(crate) struct SchemaElement<'a> {
}
);
-thrift_struct!(
-pub(crate) struct DataPageHeader {
- /// Number of values, including NULLs, in this data page.
- ///
- /// If a OffsetIndex is present, a page must begin at a row
- /// boundary (repetition_level = 0). Otherwise, pages may begin
- /// within a row (repetition_level > 0).
- 1: required i32 num_values
-
- /// Encoding used for this data page
- 2: required Encoding encoding
-
- /// Encoding used for definition levels
- 3: required Encoding definition_level_encoding;
-
- /// Encoding used for repetition levels
- 4: required Encoding repetition_level_encoding;
-
- // Optional statistics for the data in this page
- // page stats are pretty useless...lets ignore them
- //5: optional Statistics statistics;
-}
-);
-
-thrift_struct!(
- pub(crate) struct IndexPageHeader {}
-);
-
-thrift_struct!(
-pub(crate) struct DictionaryPageHeader {
- /// Number of values in the dictionary
- 1: required i32 num_values;
-
- /// Encoding using this dictionary page
- 2: required Encoding encoding
-
- /// If true, the entries in the dictionary are sorted in ascending order
- 3: optional bool is_sorted;
-}
-);
-
-thrift_struct!(
-pub(crate) struct DataPageHeaderV2 {
- /// Number of values, including NULLs, in this data page.
- 1: required i32 num_values
- /// Number of NULL values, in this data page.
- /// Number of non-null = num_values - num_nulls which is also the number of
values in the data section
- 2: required i32 num_nulls
- /// Number of rows in this data page. Every page must begin at a
- /// row boundary (repetition_level = 0): rows must **not** be
- /// split across page boundaries when using V2 data pages.
- 3: required i32 num_rows
- /// Encoding used for data in this page
- 4: required Encoding encoding
-
- // repetition levels and definition levels are always using RLE (without
size in it)
-
- /// Length of the definition levels
- 5: required i32 definition_levels_byte_length;
- /// Length of the repetition levels
- 6: required i32 repetition_levels_byte_length;
-
- /// Whether the values are compressed.
- /// Which means the section of the page between
- /// definition_levels_byte_length + repetition_levels_byte_length + 1 and
compressed_page_size (included)
- /// is compressed with the compression_codec.
- /// If missing it is considered compressed
- 7: optional bool is_compressed = true;
-
- // Optional statistics for the data in this page
- //8: optional Statistics statistics;
-}
-);
-
-thrift_struct!(
-#[allow(dead_code)]
-pub(crate) struct PageHeader {
- /// the type of the page: indicates which of the *_header fields is set
- 1: required PageType type_
-
- /// Uncompressed page size in bytes (not including this header)
- 2: required i32 uncompressed_page_size
-
- /// Compressed (and potentially encrypted) page size in bytes, not including
this header
- 3: required i32 compressed_page_size
-
- /// The 32-bit CRC checksum for the page, to be be calculated as follows:
- 4: optional i32 crc
-
- // Headers for page specific data. One only will be set.
- 5: optional DataPageHeader data_page_header;
- 6: optional IndexPageHeader index_page_header;
- 7: optional DictionaryPageHeader dictionary_page_header;
- 8: optional DataPageHeaderV2 data_page_header_v2;
-}
-);
-
thrift_struct!(
pub(crate) struct AesGcmV1<'a> {
/// AAD prefix
@@ -214,6 +117,13 @@ pub(crate) struct FileCryptoMetaData<'a> {
}
);
+// expose for benchmarking
+pub(crate) fn bench_file_metadata(bytes: &bytes::Bytes) {
+ use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
+ let mut prot = ThriftSliceInputProtocol::new(bytes);
+ crate::file::metadata::thrift_gen::FileMetaData::read_thrift(&mut
prot).unwrap();
+}
+
// the following are only used internally so are private
thrift_struct!(
struct FileMetaData<'a> {
@@ -909,6 +819,345 @@ impl<'a, R: ThriftCompactInputProtocol<'a>>
ReadThrift<'a, R> for ParquetMetaDat
}
}
+thrift_struct!(
+ pub(crate) struct IndexPageHeader {}
+);
+
+thrift_struct!(
+pub(crate) struct DictionaryPageHeader {
+ /// Number of values in the dictionary
+ 1: required i32 num_values;
+
+ /// Encoding using this dictionary page
+ 2: required Encoding encoding
+
+ /// If true, the entries in the dictionary are sorted in ascending order
+ 3: optional bool is_sorted;
+}
+);
+
+thrift_struct!(
+/// Statistics for the page header.
+///
+/// This is a duplicate of the [`Statistics`] struct above. Because the page
reader uses
+/// the [`Read`] API, we cannot read the min/max values as slices. This should
not be
+/// a huge problem since this crate no longer reads the page header statistics
by default.
+///
+/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
+pub(crate) struct PageStatistics {
+ 1: optional binary max;
+ 2: optional binary min;
+ 3: optional i64 null_count;
+ 4: optional i64 distinct_count;
+ 5: optional binary max_value;
+ 6: optional binary min_value;
+ 7: optional bool is_max_value_exact;
+ 8: optional bool is_min_value_exact;
+}
+);
+
+thrift_struct!(
+pub(crate) struct DataPageHeader {
+ 1: required i32 num_values
+ 2: required Encoding encoding
+ 3: required Encoding definition_level_encoding;
+ 4: required Encoding repetition_level_encoding;
+ 5: optional PageStatistics statistics;
+}
+);
+
+impl DataPageHeader {
+ // reader that skips decoding page statistics
+ fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
+ where
+ R: ThriftCompactInputProtocol<'a>,
+ {
+ let mut num_values: Option<i32> = None;
+ let mut encoding: Option<Encoding> = None;
+ let mut definition_level_encoding: Option<Encoding> = None;
+ let mut repetition_level_encoding: Option<Encoding> = None;
+ let statistics: Option<PageStatistics> = None;
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ 1 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ num_values = Some(val);
+ }
+ 2 => {
+ let val = Encoding::read_thrift(&mut *prot)?;
+ encoding = Some(val);
+ }
+ 3 => {
+ let val = Encoding::read_thrift(&mut *prot)?;
+ definition_level_encoding = Some(val);
+ }
+ 4 => {
+ let val = Encoding::read_thrift(&mut *prot)?;
+ repetition_level_encoding = Some(val);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+ let Some(num_values) = num_values else {
+ return Err(ParquetError::General(
+ "Required field num_values is missing".to_owned(),
+ ));
+ };
+ let Some(encoding) = encoding else {
+ return Err(ParquetError::General(
+ "Required field encoding is missing".to_owned(),
+ ));
+ };
+ let Some(definition_level_encoding) = definition_level_encoding else {
+ return Err(ParquetError::General(
+ "Required field definition_level_encoding is
missing".to_owned(),
+ ));
+ };
+ let Some(repetition_level_encoding) = repetition_level_encoding else {
+ return Err(ParquetError::General(
+ "Required field repetition_level_encoding is
missing".to_owned(),
+ ));
+ };
+ Ok(Self {
+ num_values,
+ encoding,
+ definition_level_encoding,
+ repetition_level_encoding,
+ statistics,
+ })
+ }
+}
+
+thrift_struct!(
+pub(crate) struct DataPageHeaderV2 {
+ 1: required i32 num_values
+ 2: required i32 num_nulls
+ 3: required i32 num_rows
+ 4: required Encoding encoding
+ 5: required i32 definition_levels_byte_length;
+ 6: required i32 repetition_levels_byte_length;
+ 7: optional bool is_compressed = true;
+ 8: optional PageStatistics statistics;
+}
+);
+
+impl DataPageHeaderV2 {
+ // reader that skips decoding page statistics
+ fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
+ where
+ R: ThriftCompactInputProtocol<'a>,
+ {
+ let mut num_values: Option<i32> = None;
+ let mut num_nulls: Option<i32> = None;
+ let mut num_rows: Option<i32> = None;
+ let mut encoding: Option<Encoding> = None;
+ let mut definition_levels_byte_length: Option<i32> = None;
+ let mut repetition_levels_byte_length: Option<i32> = None;
+ let mut is_compressed: Option<bool> = None;
+ let statistics: Option<PageStatistics> = None;
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ 1 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ num_values = Some(val);
+ }
+ 2 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ num_nulls = Some(val);
+ }
+ 3 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ num_rows = Some(val);
+ }
+ 4 => {
+ let val = Encoding::read_thrift(&mut *prot)?;
+ encoding = Some(val);
+ }
+ 5 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ definition_levels_byte_length = Some(val);
+ }
+ 6 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ repetition_levels_byte_length = Some(val);
+ }
+ 7 => {
+ let val = field_ident.bool_val.unwrap();
+ is_compressed = Some(val);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+ let Some(num_values) = num_values else {
+ return Err(ParquetError::General(
+ "Required field num_values is missing".to_owned(),
+ ));
+ };
+ let Some(num_nulls) = num_nulls else {
+ return Err(ParquetError::General(
+ "Required field num_nulls is missing".to_owned(),
+ ));
+ };
+ let Some(num_rows) = num_rows else {
+ return Err(ParquetError::General(
+ "Required field num_rows is missing".to_owned(),
+ ));
+ };
+ let Some(encoding) = encoding else {
+ return Err(ParquetError::General(
+ "Required field encoding is missing".to_owned(),
+ ));
+ };
+ let Some(definition_levels_byte_length) =
definition_levels_byte_length else {
+ return Err(ParquetError::General(
+ "Required field definition_levels_byte_length is
missing".to_owned(),
+ ));
+ };
+ let Some(repetition_levels_byte_length) =
repetition_levels_byte_length else {
+ return Err(ParquetError::General(
+ "Required field repetition_levels_byte_length is
missing".to_owned(),
+ ));
+ };
+ Ok(Self {
+ num_values,
+ num_nulls,
+ num_rows,
+ encoding,
+ definition_levels_byte_length,
+ repetition_levels_byte_length,
+ is_compressed,
+ statistics,
+ })
+ }
+}
+
+thrift_struct!(
+pub(crate) struct PageHeader {
+ /// the type of the page: indicates which of the *_header fields is set
+ 1: required PageType type_
+
+ /// Uncompressed page size in bytes (not including this header)
+ 2: required i32 uncompressed_page_size
+
+ /// Compressed (and potentially encrypted) page size in bytes, not including
this header
+ 3: required i32 compressed_page_size
+
+ /// The 32-bit CRC checksum for the page, to be be calculated as follows:
+ 4: optional i32 crc
+
+ // Headers for page specific data. One only will be set.
+ 5: optional DataPageHeader data_page_header;
+ 6: optional IndexPageHeader index_page_header;
+ 7: optional DictionaryPageHeader dictionary_page_header;
+ 8: optional DataPageHeaderV2 data_page_header_v2;
+}
+);
+
+impl PageHeader {
+ // reader that skips reading page statistics. obtained by running
+ // `cargo expand -p parquet --all-features --lib
file::metadata::thrift_gen`
+ // and modifying the impl of `read_thrift`
+ pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) ->
Result<Self>
+ where
+ R: ThriftCompactInputProtocol<'a>,
+ {
+ let mut type_: Option<PageType> = None;
+ let mut uncompressed_page_size: Option<i32> = None;
+ let mut compressed_page_size: Option<i32> = None;
+ let mut crc: Option<i32> = None;
+ let mut data_page_header: Option<DataPageHeader> = None;
+ let mut index_page_header: Option<IndexPageHeader> = None;
+ let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
+ let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ 1 => {
+ let val = PageType::read_thrift(&mut *prot)?;
+ type_ = Some(val);
+ }
+ 2 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ uncompressed_page_size = Some(val);
+ }
+ 3 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ compressed_page_size = Some(val);
+ }
+ 4 => {
+ let val = i32::read_thrift(&mut *prot)?;
+ crc = Some(val);
+ }
+ 5 => {
+ let val = DataPageHeader::read_thrift_without_stats(&mut
*prot)?;
+ data_page_header = Some(val);
+ }
+ 6 => {
+ let val = IndexPageHeader::read_thrift(&mut *prot)?;
+ index_page_header = Some(val);
+ }
+ 7 => {
+ let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
+ dictionary_page_header = Some(val);
+ }
+ 8 => {
+ let val = DataPageHeaderV2::read_thrift_without_stats(&mut
*prot)?;
+ data_page_header_v2 = Some(val);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+ let Some(type_) = type_ else {
+ return Err(ParquetError::General(
+ "Required field type_ is missing".to_owned(),
+ ));
+ };
+ let Some(uncompressed_page_size) = uncompressed_page_size else {
+ return Err(ParquetError::General(
+ "Required field uncompressed_page_size is missing".to_owned(),
+ ));
+ };
+ let Some(compressed_page_size) = compressed_page_size else {
+ return Err(ParquetError::General(
+ "Required field compressed_page_size is missing".to_owned(),
+ ));
+ };
+ Ok(Self {
+ type_,
+ uncompressed_page_size,
+ compressed_page_size,
+ crc,
+ data_page_header,
+ index_page_header,
+ dictionary_page_header,
+ data_page_header_v2,
+ })
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::file::metadata::thrift_gen::BoundingBox;
diff --git a/parquet/src/file/page_encoding_stats.rs
b/parquet/src/file/page_encoding_stats.rs
index 934e177de0..3f81353e28 100644
--- a/parquet/src/file/page_encoding_stats.rs
+++ b/parquet/src/file/page_encoding_stats.rs
@@ -20,7 +20,7 @@
use std::io::Write;
use crate::basic::{Encoding, PageType};
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
use crate::parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index cb6b5167c8..b6003dc4d9 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -1191,6 +1191,7 @@ impl ColumnProperties {
pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
const DEFAULT_READ_BLOOM_FILTER: bool = false;
+const DEFAULT_READ_PAGE_STATS: bool = false;
/// Configuration settings for reading parquet files.
///
@@ -1213,6 +1214,7 @@ const DEFAULT_READ_BLOOM_FILTER: bool = false;
pub struct ReaderProperties {
codec_options: CodecOptions,
read_bloom_filter: bool,
+ read_page_stats: bool,
}
impl ReaderProperties {
@@ -1230,6 +1232,11 @@ impl ReaderProperties {
pub(crate) fn read_bloom_filter(&self) -> bool {
self.read_bloom_filter
}
+
+ /// Returns whether to read page level statistics
+ pub(crate) fn read_page_stats(&self) -> bool {
+ self.read_page_stats
+ }
}
/// Builder for parquet file reader configuration. See example on
@@ -1237,6 +1244,7 @@ impl ReaderProperties {
pub struct ReaderPropertiesBuilder {
codec_options_builder: CodecOptionsBuilder,
read_bloom_filter: Option<bool>,
+ read_page_stats: Option<bool>,
}
/// Reader properties builder.
@@ -1246,6 +1254,7 @@ impl ReaderPropertiesBuilder {
Self {
codec_options_builder: CodecOptionsBuilder::default(),
read_bloom_filter: None,
+ read_page_stats: None,
}
}
@@ -1254,6 +1263,7 @@ impl ReaderPropertiesBuilder {
ReaderProperties {
codec_options: self.codec_options_builder.build(),
read_bloom_filter:
self.read_bloom_filter.unwrap_or(DEFAULT_READ_BLOOM_FILTER),
+ read_page_stats:
self.read_page_stats.unwrap_or(DEFAULT_READ_PAGE_STATS),
}
}
@@ -1282,6 +1292,20 @@ impl ReaderPropertiesBuilder {
self.read_bloom_filter = Some(value);
self
}
+
+ /// Enable/disable reading page-level statistics
+ ///
+ /// If set to `true`, then the reader will decode and populate the
[`Statistics`] for
+ /// each page, if present.
+ /// If set to `false`, then the reader will skip decoding the statistics.
+ ///
+ /// By default statistics will not be decoded.
+ ///
+ /// [`Statistics`]: crate::file::statistics::Statistics
+ pub fn set_read_page_statistics(mut self, value: bool) -> Self {
+ self.read_page_stats = Some(value);
+ self
+ }
}
#[cfg(test)]
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index 7285980453..1442f0f67c 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,31 +18,30 @@
//! Contains implementations of the reader traits FileReader, RowGroupReader
and PageReader
//! Also contains implementations of the ChunkReader for files (with
buffering) and byte arrays (RAM)
-use crate::basic::{Encoding, Type};
+use crate::basic::{PageType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
+use crate::file::metadata::thrift_gen::PageHeader;
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
+use crate::file::statistics;
use crate::file::{
metadata::*,
properties::{ReaderProperties, ReaderPropertiesPtr},
reader::*,
- statistics,
};
-use crate::format::{PageHeader, PageType};
+#[cfg(feature = "encryption")]
+use crate::parquet_thrift::ThriftSliceInputProtocol;
+use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
-#[cfg(feature = "encryption")]
-use crate::thrift::TCompactSliceInputProtocol;
-use crate::thrift::TSerializable;
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};
-use thrift::protocol::TCompactInputProtocol;
impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
@@ -423,7 +422,7 @@ pub(crate) fn decode_page(
Page::DictionaryPage {
buf: buffer,
num_values: dict_header.num_values.try_into()?,
- encoding: Encoding::try_from(dict_header.encoding)?,
+ encoding: dict_header.encoding,
is_sorted,
}
}
@@ -434,10 +433,10 @@ pub(crate) fn decode_page(
Page::DataPage {
buf: buffer,
num_values: header.num_values.try_into()?,
- encoding: Encoding::try_from(header.encoding)?,
- def_level_encoding:
Encoding::try_from(header.definition_level_encoding)?,
- rep_level_encoding:
Encoding::try_from(header.repetition_level_encoding)?,
- statistics: statistics::from_thrift(physical_type,
header.statistics)?,
+ encoding: header.encoding,
+ def_level_encoding: header.definition_level_encoding,
+ rep_level_encoding: header.repetition_level_encoding,
+ statistics: statistics::from_thrift_page_stats(physical_type,
header.statistics)?,
}
}
PageType::DATA_PAGE_V2 => {
@@ -448,13 +447,13 @@ pub(crate) fn decode_page(
Page::DataPageV2 {
buf: buffer,
num_values: header.num_values.try_into()?,
- encoding: Encoding::try_from(header.encoding)?,
+ encoding: header.encoding,
num_nulls: header.num_nulls.try_into()?,
num_rows: header.num_rows.try_into()?,
def_levels_byte_len:
header.definition_levels_byte_length.try_into()?,
rep_levels_byte_len:
header.repetition_levels_byte_length.try_into()?,
is_compressed,
- statistics: statistics::from_thrift(physical_type,
header.statistics)?,
+ statistics: statistics::from_thrift_page_stats(physical_type,
header.statistics)?,
}
}
_ => {
@@ -499,6 +498,8 @@ enum SerializedPageReaderState {
#[derive(Default)]
struct SerializedPageReaderContext {
+ /// Controls decoding of page-level statistics
+ read_stats: bool,
/// Crypto context carrying objects required for decryption
#[cfg(feature = "encryption")]
crypto_context: Option<Arc<CryptoContext>>,
@@ -610,12 +611,16 @@ impl<R: ChunkReader> SerializedPageReader<R> {
require_dictionary: meta.dictionary_page_offset().is_some(),
},
};
+ let mut context = SerializedPageReaderContext::default();
+ if props.read_page_stats() {
+ context.read_stats = true;
+ }
Ok(Self {
reader,
decompressor,
state,
physical_type: meta.column_type(),
- context: Default::default(),
+ context,
})
}
@@ -732,8 +737,12 @@ impl SerializedPageReaderContext {
_page_index: usize,
_dictionary_page: bool,
) -> Result<PageHeader> {
- let mut prot = TCompactInputProtocol::new(input);
- Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+ let mut prot = ThriftReadInputProtocol::new(input);
+ if self.read_stats {
+ Ok(PageHeader::read_thrift(&mut prot)?)
+ } else {
+ Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+ }
}
fn decrypt_page_data<T>(
@@ -756,8 +765,14 @@ impl SerializedPageReaderContext {
) -> Result<PageHeader> {
match self.page_crypto_context(page_index, dictionary_page) {
None => {
- let mut prot = TCompactInputProtocol::new(input);
- Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+ let mut prot = ThriftReadInputProtocol::new(input);
+ if self.read_stats {
+ Ok(PageHeader::read_thrift(&mut prot)?)
+ } else {
+ use crate::file::metadata::thrift_gen::PageHeader;
+
+ Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+ }
}
Some(page_crypto_context) => {
let data_decryptor = page_crypto_context.data_decryptor();
@@ -770,8 +785,12 @@ impl SerializedPageReaderContext {
))
})?;
- let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
- Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+ let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
+ if self.read_stats {
+ Ok(PageHeader::read_thrift(&mut prot)?)
+ } else {
+ Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
+ }
}
}
}
@@ -1107,7 +1126,7 @@ mod tests {
};
use crate::file::properties::{EnabledStatistics, WriterProperties};
- use crate::basic::{self, BoundaryOrder, ColumnOrder, SortOrder};
+ use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
use crate::column::reader::ColumnReader;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
@@ -1396,7 +1415,7 @@ mod tests {
assert_eq!(def_levels_byte_len, 2);
assert_eq!(rep_levels_byte_len, 0);
assert!(is_compressed);
- assert!(statistics.is_some());
+ assert!(statistics.is_none()); // page stats are no longer
read
true
}
_ => false,
@@ -1498,7 +1517,7 @@ mod tests {
assert_eq!(def_levels_byte_len, 2);
assert_eq!(rep_levels_byte_len, 0);
assert!(is_compressed);
- assert!(statistics.is_some());
+ assert!(statistics.is_none()); // page stats are no longer
read
true
}
_ => false,
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index d4501830ac..e51f445b7e 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -45,6 +45,7 @@ use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
+use crate::file::metadata::thrift_gen::PageStatistics;
use crate::util::bit_util::FromBytes;
pub(crate) mod private {
@@ -117,6 +118,7 @@ macro_rules! statistics_enum_func {
}};
}
+// FIXME(ets): remove this when done with format changes
/// Converts Thrift definition into `Statistics`.
pub fn from_thrift(
physical_type: Type,
@@ -266,6 +268,156 @@ pub fn from_thrift(
})
}
+/// Converts Thrift definition into `Statistics`.
+pub(crate) fn from_thrift_page_stats(
+ physical_type: Type,
+ thrift_stats: Option<PageStatistics>,
+) -> Result<Option<Statistics>> {
+ Ok(match thrift_stats {
+ Some(stats) => {
+ // Number of nulls recorded, when it is not available, we just
mark it as 0.
+ // TODO this should be `None` if there is no information about
NULLS.
+ // see https://github.com/apache/arrow-rs/pull/6216/files
+ let null_count = stats.null_count.unwrap_or(0);
+
+ if null_count < 0 {
+ return Err(ParquetError::General(format!(
+ "Statistics null count is negative {null_count}",
+ )));
+ }
+
+ // Generic null count.
+ let null_count = Some(null_count as u64);
+ // Generic distinct count (count of distinct values occurring)
+ let distinct_count = stats.distinct_count.map(|value| value as
u64);
+ // Whether or not statistics use deprecated min/max fields.
+ let old_format = stats.min_value.is_none() &&
stats.max_value.is_none();
+ // Generic min value as bytes.
+ let min = if old_format {
+ stats.min
+ } else {
+ stats.min_value
+ };
+ // Generic max value as bytes.
+ let max = if old_format {
+ stats.max
+ } else {
+ stats.max_value
+ };
+
+ fn check_len(min: &Option<Vec<u8>>, max: &Option<Vec<u8>>, len:
usize) -> Result<()> {
+ if let Some(min) = min {
+ if min.len() < len {
+ return Err(ParquetError::General(
+ "Insufficient bytes to parse min
statistic".to_string(),
+ ));
+ }
+ }
+ if let Some(max) = max {
+ if max.len() < len {
+ return Err(ParquetError::General(
+ "Insufficient bytes to parse max
statistic".to_string(),
+ ));
+ }
+ }
+ Ok(())
+ }
+
+ match physical_type {
+ Type::BOOLEAN => check_len(&min, &max, 1),
+ Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
+ Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
+ Type::INT96 => check_len(&min, &max, 12),
+ _ => Ok(()),
+ }?;
+
+ // Values are encoded using PLAIN encoding definition, except that
+ // variable-length byte arrays do not include a length prefix.
+ //
+ // Instead of using actual decoder, we manually convert values.
+ let res = match physical_type {
+ Type::BOOLEAN => Statistics::boolean(
+ min.map(|data| data[0] != 0),
+ max.map(|data| data[0] != 0),
+ distinct_count,
+ null_count,
+ old_format,
+ ),
+ Type::INT32 => Statistics::int32(
+ min.map(|data|
i32::from_le_bytes(data[..4].try_into().unwrap())),
+ max.map(|data|
i32::from_le_bytes(data[..4].try_into().unwrap())),
+ distinct_count,
+ null_count,
+ old_format,
+ ),
+ Type::INT64 => Statistics::int64(
+ min.map(|data|
i64::from_le_bytes(data[..8].try_into().unwrap())),
+ max.map(|data|
i64::from_le_bytes(data[..8].try_into().unwrap())),
+ distinct_count,
+ null_count,
+ old_format,
+ ),
+ Type::INT96 => {
+ // INT96 statistics may not be correct, because comparison
is signed
+ let min = if let Some(data) = min {
+ assert_eq!(data.len(), 12);
+ Some(Int96::try_from_le_slice(&data)?)
+ } else {
+ None
+ };
+ let max = if let Some(data) = max {
+ assert_eq!(data.len(), 12);
+ Some(Int96::try_from_le_slice(&data)?)
+ } else {
+ None
+ };
+ Statistics::int96(min, max, distinct_count, null_count,
old_format)
+ }
+ Type::FLOAT => Statistics::float(
+ min.map(|data|
f32::from_le_bytes(data[..4].try_into().unwrap())),
+ max.map(|data|
f32::from_le_bytes(data[..4].try_into().unwrap())),
+ distinct_count,
+ null_count,
+ old_format,
+ ),
+ Type::DOUBLE => Statistics::double(
+ min.map(|data|
f64::from_le_bytes(data[..8].try_into().unwrap())),
+ max.map(|data|
f64::from_le_bytes(data[..8].try_into().unwrap())),
+ distinct_count,
+ null_count,
+ old_format,
+ ),
+ Type::BYTE_ARRAY => Statistics::ByteArray(
+ ValueStatistics::new(
+ min.map(ByteArray::from),
+ max.map(ByteArray::from),
+ distinct_count,
+ null_count,
+ old_format,
+ )
+
.with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
+
.with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
+ ),
+ Type::FIXED_LEN_BYTE_ARRAY => Statistics::FixedLenByteArray(
+ ValueStatistics::new(
+ min.map(ByteArray::from).map(FixedLenByteArray::from),
+ max.map(ByteArray::from).map(FixedLenByteArray::from),
+ distinct_count,
+ null_count,
+ old_format,
+ )
+
.with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
+
.with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
+ ),
+ };
+
+ Some(res)
+ }
+ None => None,
+ })
+}
+
+// FIXME(ets): remove when done with format changes
/// Convert Statistics into Thrift definition.
pub fn to_thrift(stats: Option<&Statistics>) ->
Option<crate::format::Statistics> {
let stats = stats?;
@@ -315,6 +467,55 @@ pub fn to_thrift(stats: Option<&Statistics>) ->
Option<crate::format::Statistics
Some(thrift_stats)
}
+/// Convert Statistics into Thrift definition.
+pub(crate) fn page_stats_to_thrift(stats: Option<&Statistics>) ->
Option<PageStatistics> {
+ let stats = stats?;
+
+ // record null count if it can fit in i64
+ let null_count = stats
+ .null_count_opt()
+ .and_then(|value| i64::try_from(value).ok());
+
+ // record distinct count if it can fit in i64
+ let distinct_count = stats
+ .distinct_count_opt()
+ .and_then(|value| i64::try_from(value).ok());
+
+ let mut thrift_stats = PageStatistics {
+ max: None,
+ min: None,
+ null_count,
+ distinct_count,
+ max_value: None,
+ min_value: None,
+ is_max_value_exact: None,
+ is_min_value_exact: None,
+ };
+
+ // Get min/max if set.
+ let (min, max, min_exact, max_exact) = (
+ stats.min_bytes_opt().map(|x| x.to_vec()),
+ stats.max_bytes_opt().map(|x| x.to_vec()),
+ Some(stats.min_is_exact()),
+ Some(stats.max_is_exact()),
+ );
+ if stats.is_min_max_backwards_compatible() {
+ // Copy to deprecated min, max values for compatibility with older
readers
+ thrift_stats.min.clone_from(&min);
+ thrift_stats.max.clone_from(&max);
+ }
+
+ if !stats.is_min_max_deprecated() {
+ thrift_stats.min_value = min;
+ thrift_stats.max_value = max;
+ }
+
+ thrift_stats.is_min_value_exact = min_exact;
+ thrift_stats.is_max_value_exact = max_exact;
+
+ Some(thrift_stats)
+}
+
/// Strongly typed statistics for a column chunk within a row group.
///
/// This structure is a natively typed, in memory representation of the
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 158b2a21b7..71881b00ff 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,13 +19,13 @@
//! using row group writers and column writers respectively.
use crate::bloom_filter::Sbbf;
+use crate::file::metadata::thrift_gen::PageHeader;
use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::thrift::TSerializable;
+use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
-use thrift::protocol::TCompactOutputProtocol;
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult,
ColumnWriterImpl};
@@ -939,15 +939,15 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
/// Serializes page header into Thrift.
/// Returns number of bytes that have been written into the sink.
#[inline]
- fn serialize_page_header(&mut self, header: crate::format::PageHeader) ->
Result<usize> {
+ fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
let start_pos = self.sink.bytes_written();
match self.page_encryptor_and_sink_mut() {
Some((page_encryptor, sink)) => {
page_encryptor.encrypt_page_header(&header, sink)?;
}
None => {
- let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
- header.write_to_out_protocol(&mut protocol)?;
+ let mut protocol = ThriftCompactOutputProtocol::new(&mut
self.sink);
+ header.write_thrift(&mut protocol)?;
}
}
Ok(self.sink.bytes_written() - start_pos)
@@ -1602,6 +1602,7 @@ mod tests {
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
+ .set_read_page_statistics(true)
.build();
let mut page_reader = SerializedPageReader::new_with_properties(
Arc::new(reader),
diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs
index 889e5fafef..5720fd4ce0 100644
--- a/parquet/src/parquet_macros.rs
+++ b/parquet/src/parquet_macros.rs
@@ -323,6 +323,66 @@ macro_rules! thrift_struct {
}
}
+/// Only implements `ReadThrift` for the given IDL struct definition
+#[macro_export]
+macro_rules! thrift_struct_read_impl {
+ ($(#[$($def_attrs:tt)*])* $vis:vis struct $identifier:ident $(<
$lt:lifetime >)? { $($(#[$($field_attrs:tt)*])* $field_id:literal :
$required_or_optional:ident $field_type:ident $(< $field_lt:lifetime >)? $(<
$element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)*
}) => {
+ $(#[cfg_attr(not(doctest), $($def_attrs)*)])*
+ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
$identifier $(<$lt>)? {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ $(let mut $field_name:
Option<$crate::__thrift_field_type!($field_type $($field_lt)?
$($element_type)?)> = None;)*
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ $($field_id => {
+ let val = $crate::__thrift_read_field!(prot,
field_ident, $field_type $($field_lt)? $($element_type)?);
+ $field_name = Some(val);
+ })*
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+
$($crate::__thrift_result_required_or_optional!($required_or_optional
$field_name);)*
+ Ok(Self {
+ $($field_name),*
+ })
+ }
+ }
+ }
+}
+
+/// Only implements `WriteThrift` for the given IDL struct definition
+#[macro_export]
+macro_rules! thrift_struct_write_impl {
+ ($(#[$($def_attrs:tt)*])* $vis:vis struct $identifier:ident $(<
$lt:lifetime >)? { $($(#[$($field_attrs:tt)*])* $field_id:literal :
$required_or_optional:ident $field_type:ident $(< $field_lt:lifetime >)? $(<
$element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)*
}) => {
+ impl $(<$lt>)? WriteThrift for $identifier $(<$lt>)? {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+ #[allow(unused_assignments)]
+ fn write_thrift<W: Write>(&self, writer: &mut
ThriftCompactOutputProtocol<W>) -> Result<()> {
+ #[allow(unused_mut, unused_variables)]
+ let mut last_field_id = 0i16;
+
$($crate::__thrift_write_required_or_optional_field!($required_or_optional
$field_name, $field_id, $field_type, self, writer, last_field_id);)*
+ writer.write_struct_end()
+ }
+ }
+
+ impl $(<$lt>)? WriteThriftField for $identifier $(<$lt>)? {
+ fn write_thrift_field<W: Write>(&self, writer: &mut
ThriftCompactOutputProtocol<W>, field_id: i16, last_field_id: i16) ->
Result<i16> {
+ writer.write_field_begin(FieldType::Struct, field_id,
last_field_id)?;
+ self.write_thrift(writer)?;
+ Ok(field_id)
+ }
+ }
+ }
+}
+
#[doc(hidden)]
#[macro_export]
macro_rules! __thrift_write_required_or_optional_field {
@@ -391,15 +451,19 @@ macro_rules! __thrift_required_or_optional {
(optional $field_type:ty) => { Option<$field_type> };
}
+// Performance note: using `expect` here is about 4% faster on the page index
bench,
+// but we want to propagate errors. Using `ok_or` is *much* slower.
#[doc(hidden)]
#[macro_export]
macro_rules! __thrift_result_required_or_optional {
(required $field_name:ident) => {
- let $field_name = $field_name.expect(concat!(
- "Required field ",
- stringify!($field_name),
- " is missing",
- ));
+ let Some($field_name) = $field_name else {
+ return Err(general_err!(concat!(
+ "Required field ",
+ stringify!($field_name),
+ " is missing",
+ )));
+ };
};
(optional $field_name:ident) => {};
}
@@ -433,7 +497,7 @@ macro_rules! __thrift_read_field {
};
($prot:tt, $field_ident:tt, binary) => {
// this one needs to not conflict with `list<i8>`
- $prot.read_bytes()?.to_vec()
+ $prot.read_bytes_owned()?
};
($prot:tt, $field_ident:tt, double) => {
$crate::parquet_thrift::OrderedF64::read_thrift(&mut *$prot)?
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index 17847d0b71..5d549f012c 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -20,7 +20,10 @@
// to not allocate byte arrays or strings.
#![allow(dead_code)]
-use std::{cmp::Ordering, io::Write};
+use std::{
+ cmp::Ordering,
+ io::{Read, Write},
+};
use crate::errors::{ParquetError, Result};
@@ -197,6 +200,8 @@ pub(crate) trait ThriftCompactInputProtocol<'a> {
/// [binary]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding
fn read_bytes(&mut self) -> Result<&'a [u8]>;
+ fn read_bytes_owned(&mut self) -> Result<Vec<u8>>;
+
/// Skip the next `n` bytes of input.
fn skip_bytes(&mut self, n: usize) -> Result<()>;
@@ -459,6 +464,10 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'b> for
ThriftSliceInputProtocol<'a>
Ok(ret)
}
+ fn read_bytes_owned(&mut self) -> Result<Vec<u8>> {
+ Ok(self.read_bytes()?.to_vec())
+ }
+
#[inline]
fn skip_bytes(&mut self, n: usize) -> Result<()> {
self.buf.get(..n).ok_or_else(eof_error)?;
@@ -480,6 +489,54 @@ fn eof_error() -> ParquetError {
eof_err!("Unexpected EOF")
}
+/// A Thrift input protocol that wraps a [`Read`] object.
+///
+/// Note that this is only intended for use in reading Parquet page headers.
This will panic
+/// if Thrift `binary` data is encountered because a slice of that data cannot
be returned.
+pub(crate) struct ThriftReadInputProtocol<R: Read> {
+ reader: R,
+}
+
+impl<R: Read> ThriftReadInputProtocol<R> {
+ pub(crate) fn new(reader: R) -> Self {
+ Self { reader }
+ }
+}
+
+impl<'a, R: Read> ThriftCompactInputProtocol<'a> for
ThriftReadInputProtocol<R> {
+ #[inline]
+ fn read_byte(&mut self) -> Result<u8> {
+ let mut buf = [0_u8; 1];
+ self.reader.read_exact(&mut buf)?;
+ Ok(buf[0])
+ }
+
+ fn read_bytes(&mut self) -> Result<&'a [u8]> {
+ unimplemented!()
+ }
+
+ fn read_bytes_owned(&mut self) -> Result<Vec<u8>> {
+ let len = self.read_vlq()? as usize;
+ let mut v = Vec::with_capacity(len);
+ std::io::copy(&mut self.reader.by_ref().take(len as u64), &mut v)?;
+ Ok(v)
+ }
+
+ fn skip_bytes(&mut self, n: usize) -> Result<()> {
+ std::io::copy(
+ &mut self.reader.by_ref().take(n as u64),
+ &mut std::io::sink(),
+ )?;
+ Ok(())
+ }
+
+ fn read_double(&mut self) -> Result<f64> {
+ let mut buf = [0_u8; 8];
+ self.reader.read_exact(&mut buf)?;
+ Ok(f64::from_le_bytes(buf))
+ }
+}
+
/// Trait implemented for objects that can be deserialized from a Thrift input
stream.
/// Implementations are provided for Thrift primitive types.
pub(crate) trait ReadThrift<'a, R: ThriftCompactInputProtocol<'a>> {
@@ -533,7 +590,7 @@ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a,
R> for &'a str {
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for String {
fn read_thrift(prot: &mut R) -> Result<Self> {
- Ok(prot.read_string()?.to_owned())
+ Ok(String::from_utf8(prot.read_bytes_owned()?)?)
}
}
diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs
index 580ac2d7db..2492910a31 100644
--- a/parquet/src/thrift.rs
+++ b/parquet/src/thrift.rs
@@ -36,15 +36,22 @@ pub trait TSerializable: Sized {
// Public function to aid benchmarking. Reads Parquet `FileMetaData` encoded
in `bytes`.
#[doc(hidden)]
pub fn bench_file_metadata(bytes: &bytes::Bytes) {
- let mut input = TCompactSliceInputProtocol::new(bytes);
- crate::format::FileMetaData::read_from_in_protocol(&mut input).unwrap();
+ crate::file::metadata::thrift_gen::bench_file_metadata(bytes);
}
// Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in
`bytes`.
#[doc(hidden)]
pub fn bench_page_header(bytes: &bytes::Bytes) {
- let mut prot = TCompactSliceInputProtocol::new(bytes);
- crate::format::PageHeader::read_from_in_protocol(&mut prot).unwrap();
+ use crate::parquet_thrift::ReadThrift;
+ let mut prot =
crate::parquet_thrift::ThriftReadInputProtocol::new(bytes.as_ref());
+ crate::file::metadata::thrift_gen::PageHeader::read_thrift(&mut
prot).unwrap();
+}
+
+// Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in `bytes`, skipping decoding of the page-level statistics.
+#[doc(hidden)]
+pub fn bench_page_header_no_stats(bytes: &bytes::Bytes) {
+ let mut prot =
crate::parquet_thrift::ThriftReadInputProtocol::new(bytes.as_ref());
+
crate::file::metadata::thrift_gen::PageHeader::read_thrift_without_stats(&mut
prot).unwrap();
}
/// A more performant implementation of [`TCompactInputProtocol`] that reads a
slice
diff --git a/parquet/tests/arrow_reader/bad_data.rs
b/parquet/tests/arrow_reader/bad_data.rs
index ecf449a7ce..be401030e7 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -101,7 +101,7 @@ fn test_arrow_gh_41317() {
let err = read_file("ARROW-GH-41317.parquet").unwrap_err();
assert_eq!(
err.to_string(),
- "External: Parquet argument error: External: bad data"
+ "External: Parquet argument error: Parquet error: StructArrayReader
out of sync in read_records, expected 5 read, got 2"
);
}
diff --git a/parquet/tests/encryption/encryption_agnostic.rs
b/parquet/tests/encryption/encryption_agnostic.rs
index e071471712..48b5c77d9b 100644
--- a/parquet/tests/encryption/encryption_agnostic.rs
+++ b/parquet/tests/encryption/encryption_agnostic.rs
@@ -72,7 +72,7 @@ pub fn
read_plaintext_footer_file_without_decryption_properties() {
match record_reader.next() {
Some(Err(ArrowError::ParquetError(s))) => {
- assert!(s.contains("protocol error"));
+ assert!(s.contains("Parquet error"));
}
_ => {
panic!("Expected ArrowError::ParquetError");
@@ -137,7 +137,7 @@ pub async fn
read_plaintext_footer_file_without_decryption_properties_async() {
match record_reader.next().await {
Some(Err(ParquetError::ArrowError(s))) => {
- assert!(s.contains("protocol error"));
+ assert!(s.contains("Parquet error"));
}
_ => {
panic!("Expected ArrowError::ParquetError");