This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b540248270 Refactor: Move parquet metadata parsing code into its own
module (#8436)
b540248270 is described below
commit b5402482705e6b092556c13e744ebe45555a2b90
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Sep 25 13:58:10 2025 -0700
Refactor: Move parquet metadata parsing code into its own module (#8436)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/8000
- Prep PR for https://github.com/apache/arrow-rs/pull/8340, to make it
easier to review
Note: while this is a large (in line count) code change, it should be
relatively easy to review, as it only moves code around.
# Rationale for this change
In https://github.com/apache/arrow-rs/pull/8340 I am trying to separate
three concerns: the "IO", the "where is the metadata in the file" logic, and
the "decode thrift into Rust structures" logic. The first step is simply to
move the code that handles "decode thrift into Rust structures" into
its own module.
# What changes are included in this PR?
1. Move most of the "parse thrift bytes into rust structure" code from
`parquet/src/file/metadata/reader.rs` to
`parquet/src/file/metadata/parser.rs`
# Are these changes tested?
Yes, by CI
# Are there any user-facing changes?
No, this is entirely internal reorganization
---------
Co-authored-by: Matthijs Brobbel <[email protected]>
---
parquet/src/file/metadata/mod.rs | 7 +-
parquet/src/file/metadata/parser.rs | 475 +++++++++++++++++++++++++++++++++
parquet/src/file/metadata/reader.rs | 508 ++++--------------------------------
3 files changed, 535 insertions(+), 455 deletions(-)
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index f90143104c..a6f740f0f2 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -40,7 +40,7 @@
//! metadata into parquet files. To work with metadata directly,
//! the following APIs are available:
//!
-//! * [`ParquetMetaDataReader`] for reading from a reader for I/O
+//! * [`ParquetMetaDataReader`] for reading metadata from an I/O source (sync
and async)
//! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
//! * [`ParquetMetaDataWriter`] for writing.
//!
@@ -91,6 +91,7 @@
//! * Same name, different struct
//! ```
mod memory;
+mod parser;
mod push_decoder;
pub(crate) mod reader;
mod writer;
@@ -195,10 +196,10 @@ impl ParquetMetaData {
ParquetMetaData {
file_metadata,
row_groups,
- #[cfg(feature = "encryption")]
- file_decryptor: None,
column_index: None,
offset_index: None,
+ #[cfg(feature = "encryption")]
+ file_decryptor: None,
}
}
diff --git a/parquet/src/file/metadata/parser.rs
b/parquet/src/file/metadata/parser.rs
new file mode 100644
index 0000000000..a68f14d4d7
--- /dev/null
+++ b/parquet/src/file/metadata/parser.rs
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Internal metadata parsing routines
+//!
+//! These functions parse thrift-encoded metadata from a byte slice
+//! into the corresponding Rust structures.
+
+use crate::basic::ColumnOrder;
+use crate::errors::ParquetError;
+use crate::file::metadata::{
+ ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData,
RowGroupMetaData,
+};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{decode_column_index,
decode_offset_index};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::schema::types;
+use crate::schema::types::SchemaDescriptor;
+use crate::thrift::TCompactSliceInputProtocol;
+use crate::thrift::TSerializable;
+use bytes::Bytes;
+use std::sync::Arc;
+
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+ decrypt::{FileDecryptionProperties, FileDecryptor},
+ modules::create_footer_aad,
+};
+#[cfg(feature = "encryption")]
+use crate::format::EncryptionAlgorithm;
+
+/// Decodes [`ParquetMetaData`] from the provided bytes.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as
specified
+/// by the [Parquet Spec].
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+pub(crate) fn decode_metadata(buf: &[u8]) ->
crate::errors::Result<ParquetMetaData> {
+ let mut prot = TCompactSliceInputProtocol::new(buf);
+
+ let t_file_metadata: crate::format::FileMetaData =
+ crate::format::FileMetaData::read_from_in_protocol(&mut prot)
+ .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+ let schema = types::from_thrift(&t_file_metadata.schema)?;
+ let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+ let mut row_groups = Vec::new();
+ for rg in t_file_metadata.row_groups {
+ row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(),
rg)?);
+ }
+ let column_orders = parse_column_orders(t_file_metadata.column_orders,
&schema_descr)?;
+
+ let file_metadata = FileMetaData::new(
+ t_file_metadata.version,
+ t_file_metadata.num_rows,
+ t_file_metadata.created_by,
+ t_file_metadata.key_value_metadata,
+ schema_descr,
+ column_orders,
+ );
+
+ Ok(ParquetMetaData::new(file_metadata, row_groups))
+}
+
+/// Parses column orders from Thrift definition.
+/// If no column orders are defined, returns `None`.
+pub(crate) fn parse_column_orders(
+ t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
+ schema_descr: &SchemaDescriptor,
+) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
+ match t_column_orders {
+ Some(orders) => {
+ // Should always be the case
+ if orders.len() != schema_descr.num_columns() {
+ return Err(general_err!("Column order length mismatch"));
+ };
+ let mut res = Vec::new();
+ for (i, column) in schema_descr.columns().iter().enumerate() {
+ match orders[i] {
+ crate::format::ColumnOrder::TYPEORDER(_) => {
+ let sort_order = ColumnOrder::get_sort_order(
+ column.logical_type(),
+ column.converted_type(),
+ column.physical_type(),
+ );
+ res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+ }
+ }
+ }
+ Ok(Some(res))
+ }
+ None => Ok(None),
+ }
+}
+
+/// Parses column index from the provided bytes and adds it to the metadata.
+///
+/// Arguments
+/// * `metadata` - The ParquetMetaData to which the parsed column index will
be added.
+/// * `column_index_policy` - The policy for handling column index parsing
(e.g.,
+/// Required, Optional, Skip).
+/// * `bytes` - The byte slice containing the column index data.
+/// * `start_offset` - The offset where `bytes` begin in the file.
+pub(crate) fn parse_column_index(
+ metadata: &mut ParquetMetaData,
+ column_index_policy: PageIndexPolicy,
+ bytes: &Bytes,
+ start_offset: u64,
+) -> crate::errors::Result<()> {
+ if column_index_policy == PageIndexPolicy::Skip {
+ return Ok(());
+ }
+ let index = metadata
+ .row_groups()
+ .iter()
+ .enumerate()
+ .map(|(rg_idx, x)| {
+ x.columns()
+ .iter()
+ .enumerate()
+ .map(|(col_idx, c)| match c.column_index_range() {
+ Some(r) => {
+ let r_start = usize::try_from(r.start - start_offset)?;
+ let r_end = usize::try_from(r.end - start_offset)?;
+ parse_single_column_index(
+ &bytes[r_start..r_end],
+ metadata,
+ c,
+ rg_idx,
+ col_idx,
+ )
+ }
+ None => Ok(Index::NONE),
+ })
+ .collect::<crate::errors::Result<Vec<_>>>()
+ })
+ .collect::<crate::errors::Result<Vec<_>>>()?;
+
+ metadata.set_column_index(Some(index));
+ Ok(())
+}
+
+#[cfg(feature = "encryption")]
+fn parse_single_column_index(
+ bytes: &[u8],
+ metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ row_group_index: usize,
+ col_index: usize,
+) -> crate::errors::Result<Index> {
+ use crate::encryption::decrypt::CryptoContext;
+ match &column.column_crypto_metadata {
+ Some(crypto_metadata) => {
+ let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
+ general_err!("Cannot decrypt column index, no file decryptor
set")
+ })?;
+ let crypto_context = CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ row_group_index,
+ col_index,
+ )?;
+ let column_decryptor = crypto_context.metadata_decryptor();
+ let aad = crypto_context.create_column_index_aad()?;
+ let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+ decode_column_index(&plaintext, column.column_type())
+ }
+ None => decode_column_index(bytes, column.column_type()),
+ }
+}
+
+#[cfg(not(feature = "encryption"))]
+fn parse_single_column_index(
+ bytes: &[u8],
+ _metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ _row_group_index: usize,
+ _col_index: usize,
+) -> crate::errors::Result<Index> {
+ decode_column_index(bytes, column.column_type())
+}
+
+pub(crate) fn parse_offset_index(
+ metadata: &mut ParquetMetaData,
+ offset_index_policy: PageIndexPolicy,
+ bytes: &Bytes,
+ start_offset: u64,
+) -> crate::errors::Result<()> {
+ if offset_index_policy == PageIndexPolicy::Skip {
+ return Ok(());
+ }
+ let row_groups = metadata.row_groups();
+ let mut all_indexes = Vec::with_capacity(row_groups.len());
+ for (rg_idx, x) in row_groups.iter().enumerate() {
+ let mut row_group_indexes = Vec::with_capacity(x.columns().len());
+ for (col_idx, c) in x.columns().iter().enumerate() {
+ let result = match c.offset_index_range() {
+ Some(r) => {
+ let r_start = usize::try_from(r.start - start_offset)?;
+ let r_end = usize::try_from(r.end - start_offset)?;
+ parse_single_offset_index(&bytes[r_start..r_end],
metadata, c, rg_idx, col_idx)
+ }
+ None => Err(general_err!("missing offset index")),
+ };
+
+ match result {
+ Ok(index) => row_group_indexes.push(index),
+ Err(e) => {
+ if offset_index_policy == PageIndexPolicy::Required {
+ return Err(e);
+ } else {
+ // Invalidate and return
+ metadata.set_column_index(None);
+ metadata.set_offset_index(None);
+ return Ok(());
+ }
+ }
+ }
+ }
+ all_indexes.push(row_group_indexes);
+ }
+ metadata.set_offset_index(Some(all_indexes));
+ Ok(())
+}
+
+#[cfg(feature = "encryption")]
+fn parse_single_offset_index(
+ bytes: &[u8],
+ metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ row_group_index: usize,
+ col_index: usize,
+) -> crate::errors::Result<OffsetIndexMetaData> {
+ use crate::encryption::decrypt::CryptoContext;
+ match &column.column_crypto_metadata {
+ Some(crypto_metadata) => {
+ let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
+ general_err!("Cannot decrypt offset index, no file decryptor
set")
+ })?;
+ let crypto_context = CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ row_group_index,
+ col_index,
+ )?;
+ let column_decryptor = crypto_context.metadata_decryptor();
+ let aad = crypto_context.create_offset_index_aad()?;
+ let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+ decode_offset_index(&plaintext)
+ }
+ None => decode_offset_index(bytes),
+ }
+}
+
+#[cfg(not(feature = "encryption"))]
+fn parse_single_offset_index(
+ bytes: &[u8],
+ _metadata: &ParquetMetaData,
+ _column: &ColumnChunkMetaData,
+ _row_group_index: usize,
+ _col_index: usize,
+) -> crate::errors::Result<OffsetIndexMetaData> {
+ decode_offset_index(bytes)
+}
+
+/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata
that may be encrypted.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as
specified
+/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
+/// ciphers as specified in the [Parquet Encryption Spec].
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+/// [Parquet Encryption Spec]:
https://parquet.apache.org/docs/file-format/data-pages/encryption/
+#[cfg(feature = "encryption")]
+pub(crate) fn decode_metadata_with_encryption(
+ buf: &[u8],
+ encrypted_footer: bool,
+ file_decryption_properties: Option<&FileDecryptionProperties>,
+) -> crate::errors::Result<ParquetMetaData> {
+ let mut prot = TCompactSliceInputProtocol::new(buf);
+ let mut file_decryptor = None;
+ let decrypted_fmd_buf;
+
+ if encrypted_footer {
+ if let Some(file_decryption_properties) = file_decryption_properties {
+ let t_file_crypto_metadata: crate::format::FileCryptoMetaData =
+ crate::format::FileCryptoMetaData::read_from_in_protocol(&mut
prot)
+ .map_err(|e| general_err!("Could not parse crypto
metadata: {}", e))?;
+ let supply_aad_prefix = match
&t_file_crypto_metadata.encryption_algorithm {
+ EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
+ _ => Some(false),
+ }
+ .unwrap_or(false);
+ if supply_aad_prefix &&
file_decryption_properties.aad_prefix().is_none() {
+ return Err(general_err!(
+ "Parquet file was encrypted with an AAD prefix that is
not stored in the file, \
+ but no AAD prefix was provided in the file decryption
properties"
+ ));
+ }
+ let decryptor = get_file_decryptor(
+ t_file_crypto_metadata.encryption_algorithm,
+ t_file_crypto_metadata.key_metadata.as_deref(),
+ file_decryption_properties,
+ )?;
+ let footer_decryptor = decryptor.get_footer_decryptor();
+ let aad_footer = create_footer_aad(decryptor.file_aad())?;
+
+ decrypted_fmd_buf = footer_decryptor?
+ .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+ .map_err(|_| {
+ general_err!(
+ "Provided footer key and AAD were unable to decrypt
parquet footer"
+ )
+ })?;
+ prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
+
+ file_decryptor = Some(decryptor);
+ } else {
+ return Err(general_err!(
+ "Parquet file has an encrypted footer but decryption
properties were not provided"
+ ));
+ }
+ }
+
+ use crate::format::FileMetaData as TFileMetaData;
+ let t_file_metadata: TFileMetaData =
TFileMetaData::read_from_in_protocol(&mut prot)
+ .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+ let schema = types::from_thrift(&t_file_metadata.schema)?;
+ let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+ if let (Some(algo), Some(file_decryption_properties)) = (
+ t_file_metadata.encryption_algorithm,
+ file_decryption_properties,
+ ) {
+ // File has a plaintext footer but encryption algorithm is set
+ let file_decryptor_value = get_file_decryptor(
+ algo,
+ t_file_metadata.footer_signing_key_metadata.as_deref(),
+ file_decryption_properties,
+ )?;
+ if file_decryption_properties.check_plaintext_footer_integrity() &&
!encrypted_footer {
+ file_decryptor_value.verify_plaintext_footer_signature(buf)?;
+ }
+ file_decryptor = Some(file_decryptor_value);
+ }
+
+ let mut row_groups = Vec::new();
+ for rg in t_file_metadata.row_groups {
+ let r = RowGroupMetaData::from_encrypted_thrift(
+ schema_descr.clone(),
+ rg,
+ file_decryptor.as_ref(),
+ )?;
+ row_groups.push(r);
+ }
+ let column_orders = parse_column_orders(t_file_metadata.column_orders,
&schema_descr)?;
+
+ let file_metadata = FileMetaData::new(
+ t_file_metadata.version,
+ t_file_metadata.num_rows,
+ t_file_metadata.created_by,
+ t_file_metadata.key_value_metadata,
+ schema_descr,
+ column_orders,
+ );
+ let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
+
+ metadata.with_file_decryptor(file_decryptor);
+
+ Ok(metadata)
+}
+
+#[cfg(feature = "encryption")]
+fn get_file_decryptor(
+ encryption_algorithm: EncryptionAlgorithm,
+ footer_key_metadata: Option<&[u8]>,
+ file_decryption_properties: &FileDecryptionProperties,
+) -> crate::errors::Result<FileDecryptor> {
+ match encryption_algorithm {
+ EncryptionAlgorithm::AESGCMV1(algo) => {
+ let aad_file_unique = algo
+ .aad_file_unique
+ .ok_or_else(|| general_err!("AAD unique file identifier is not
set"))?;
+ let aad_prefix = if let Some(aad_prefix) =
file_decryption_properties.aad_prefix() {
+ aad_prefix.clone()
+ } else {
+ algo.aad_prefix.unwrap_or_default()
+ };
+
+ FileDecryptor::new(
+ file_decryption_properties,
+ footer_key_metadata,
+ aad_file_unique,
+ aad_prefix,
+ )
+ }
+ EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
+ "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+ )),
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::basic::{SortOrder, Type};
+ use crate::file::metadata::SchemaType;
+ use crate::format::ColumnOrder as TColumnOrder;
+ use crate::format::TypeDefinedOrder;
+ #[test]
+ fn test_metadata_column_orders_parse() {
+ // Define simple schema, we do not need to provide logical types.
+ let fields = vec![
+ Arc::new(
+ SchemaType::primitive_type_builder("col1", Type::INT32)
+ .build()
+ .unwrap(),
+ ),
+ Arc::new(
+ SchemaType::primitive_type_builder("col2", Type::FLOAT)
+ .build()
+ .unwrap(),
+ ),
+ ];
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(fields)
+ .build()
+ .unwrap();
+ let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+ let t_column_orders = Some(vec![
+ TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+ TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+ ]);
+
+ assert_eq!(
+ parse_column_orders(t_column_orders, &schema_descr).unwrap(),
+ Some(vec![
+ ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
+ ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
+ ])
+ );
+
+ // Test when no column orders are defined.
+ assert_eq!(parse_column_orders(None, &schema_descr).unwrap(), None);
+ }
+
+ #[test]
+ fn test_metadata_column_orders_len_mismatch() {
+ let schema = SchemaType::group_type_builder("schema").build().unwrap();
+ let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+ let t_column_orders =
Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
+
+ let res = parse_column_orders(t_column_orders, &schema_descr);
+ assert!(res.is_err());
+ assert!(format!("{:?}", res.unwrap_err()).contains("Column order
length mismatch"));
+ }
+}
diff --git a/parquet/src/file/metadata/reader.rs
b/parquet/src/file/metadata/reader.rs
index 8d92d1e0aa..92113f336e 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -15,46 +15,43 @@
// specific language governing permissions and limitations
// under the License.
-use std::{io::Read, ops::Range, sync::Arc};
+use std::{io::Read, ops::Range};
-use crate::basic::ColumnOrder;
#[cfg(feature = "encryption")]
-use crate::encryption::{
- decrypt::{FileDecryptionProperties, FileDecryptor},
- modules::create_footer_aad,
-};
-use bytes::Bytes;
-
+use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, FileMetaData,
ParquetMetaData, RowGroupMetaData};
-use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index,
decode_offset_index};
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
-use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as
TFileMetaData};
-#[cfg(feature = "encryption")]
-use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as
TFileCryptoMetaData};
-use crate::schema::types;
-use crate::schema::types::SchemaDescriptor;
-use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
#[cfg(all(feature = "async", feature = "arrow"))]
use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
#[cfg(feature = "encryption")]
-use crate::encryption::decrypt::CryptoContext;
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::metadata::parser::decode_metadata_with_encryption;
+use crate::file::metadata::parser::{decode_metadata, parse_column_index,
parse_offset_index};
-/// Reads the [`ParquetMetaData`] from a byte stream.
+/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
+/// asynchronous I/O.
+///
+/// There are two flavors of APIs:
+/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`],
[`Self::parse_and_finish()`], etc.
+/// * Asynchronous (requires `async` and `arrow` features):
[`Self::try_load()`], etc
+///
+/// See the [`ParquetMetaDataPushDecoder`] for an API that does not require
I/O.
///
-/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a
description of
-/// the Parquet metadata.
+/// [`ParquetMetaDataPushDecoder`]:
crate::file::metadata::push_decoder::ParquetMetaDataPushDecoder
///
-/// Parquet metadata is not necessarily contiguous in the files: part is stored
+/// # Format Notes
+///
+/// Parquet metadata is not necessarily contiguous in a Parquet file: a
portion is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
+/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for
more details of
+/// Parquet metadata.
///
/// This reader handles reading the footer as well as the non contiguous parts
-/// of the metadata such as the page indexes; excluding Bloom Filters.
+/// of the metadata (the `PageIndex`: `ColumnIndex` and `OffsetIndex`). It does not handle
reading Bloom Filters.
///
/// # Example
/// ```no_run
@@ -243,6 +240,8 @@ impl ParquetMetaDataReader {
/// .with_page_indexes(true)
/// .parse_and_finish(&file).unwrap();
/// ```
+ ///
+ /// [`Bytes`]: bytes::Bytes
pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) ->
Result<ParquetMetaData> {
self.try_parse(reader)?;
self.finish()
@@ -253,6 +252,8 @@ impl ParquetMetaDataReader {
/// If `reader` is [`Bytes`] based, then the buffer must contain
sufficient bytes to complete
/// the request, and must include the Parquet footer. If page indexes are
desired, the buffer
/// must contain the entire file, or [`Self::try_parse_sized()`] should be
used.
+ ///
+ /// [`Bytes`]: bytes::Bytes
pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
self.try_parse_sized(reader, reader.len())
}
@@ -329,6 +330,8 @@ impl ParquetMetaDataReader {
/// }
/// let metadata = reader.finish().unwrap();
/// ```
+ ///
+ /// [`Bytes`]: bytes::Bytes
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size:
u64) -> Result<()> {
self.metadata = match self.parse_metadata(reader) {
Ok(metadata) => Some(metadata),
@@ -369,22 +372,24 @@ impl ParquetMetaDataReader {
/// a [`Bytes`] struct containing the tail of the file).
/// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
/// [`Self::try_parse_sized()`] this function may return
[`ParquetError::NeedMoreData`].
+ ///
+ /// [`Bytes`]: bytes::Bytes
pub fn read_page_indexes_sized<R: ChunkReader>(
&mut self,
reader: &R,
file_size: u64,
) -> Result<()> {
- if self.metadata.is_none() {
- return Err(general_err!(
- "Tried to read page indexes without ParquetMetaData metadata"
- ));
- }
-
// Get bounds needed for page indexes (if any are present in the file).
let Some(range) = self.range_for_page_index() else {
return Ok(());
};
+ let Some(metadata) = self.metadata.as_mut() else {
+ return Err(general_err!(
+ "Tried to read page indexes without ParquetMetaData metadata"
+ ));
+ };
+
// Check to see if needed range is within `file_range`. Checking
`range.end` seems
// redundant, but it guards against `range_for_page_index()` returning
garbage.
let file_range = file_size.saturating_sub(reader.len())..file_size;
@@ -417,8 +422,8 @@ impl ParquetMetaDataReader {
let bytes = reader.get_bytes(range.start - file_range.start,
bytes_needed)?;
let offset = range.start;
- self.parse_column_index(&bytes, offset)?;
- self.parse_offset_index(&bytes, offset)?;
+ parse_column_index(metadata, self.column_index, &bytes, offset)?;
+ parse_offset_index(metadata, self.offset_index, &bytes, offset)?;
Ok(())
}
@@ -507,17 +512,15 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
- remainder: Option<(usize, Bytes)>,
+ remainder: Option<(usize, bytes::Bytes)>,
) -> Result<()> {
- if self.metadata.is_none() {
- return Err(general_err!("Footer metadata is not present"));
- }
-
// Get bounds needed for page indexes (if any are present in the file).
- let range = self.range_for_page_index();
- let range = match range {
- Some(range) => range,
- None => return Ok(()),
+ let Some(range) = self.range_for_page_index() else {
+ return Ok(());
+ };
+
+ let Some(metadata) = self.metadata.as_mut() else {
+ return Err(general_err!("Footer metadata is not present"));
};
let bytes = match &remainder {
@@ -535,168 +538,12 @@ impl ParquetMetaDataReader {
// Sanity check
assert_eq!(bytes.len() as u64, range.end - range.start);
- self.parse_column_index(&bytes, range.start)?;
- self.parse_offset_index(&bytes, range.start)?;
-
- Ok(())
- }
-
- fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) ->
Result<()> {
- let metadata = self.metadata.as_mut().unwrap();
- if self.column_index != PageIndexPolicy::Skip {
- let index = metadata
- .row_groups()
- .iter()
- .enumerate()
- .map(|(rg_idx, x)| {
- x.columns()
- .iter()
- .enumerate()
- .map(|(col_idx, c)| match c.column_index_range() {
- Some(r) => {
- let r_start = usize::try_from(r.start -
start_offset)?;
- let r_end = usize::try_from(r.end -
start_offset)?;
- Self::parse_single_column_index(
- &bytes[r_start..r_end],
- metadata,
- c,
- rg_idx,
- col_idx,
- )
- }
- None => Ok(Index::NONE),
- })
- .collect::<Result<Vec<_>>>()
- })
- .collect::<Result<Vec<_>>>()?;
-
- metadata.set_column_index(Some(index));
- }
- Ok(())
- }
+ parse_column_index(metadata, self.column_index, &bytes, range.start)?;
+ parse_offset_index(metadata, self.offset_index, &bytes, range.start)?;
- #[cfg(feature = "encryption")]
- fn parse_single_column_index(
- bytes: &[u8],
- metadata: &ParquetMetaData,
- column: &ColumnChunkMetaData,
- row_group_index: usize,
- col_index: usize,
- ) -> Result<Index> {
- match &column.column_crypto_metadata {
- Some(crypto_metadata) => {
- let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
- general_err!("Cannot decrypt column index, no file
decryptor set")
- })?;
- let crypto_context = CryptoContext::for_column(
- file_decryptor,
- crypto_metadata,
- row_group_index,
- col_index,
- )?;
- let column_decryptor = crypto_context.metadata_decryptor();
- let aad = crypto_context.create_column_index_aad()?;
- let plaintext = column_decryptor.decrypt(bytes, &aad)?;
- decode_column_index(&plaintext, column.column_type())
- }
- None => decode_column_index(bytes, column.column_type()),
- }
- }
-
- #[cfg(not(feature = "encryption"))]
- fn parse_single_column_index(
- bytes: &[u8],
- _metadata: &ParquetMetaData,
- column: &ColumnChunkMetaData,
- _row_group_index: usize,
- _col_index: usize,
- ) -> Result<Index> {
- decode_column_index(bytes, column.column_type())
- }
-
- fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) ->
Result<()> {
- let metadata = self.metadata.as_mut().unwrap();
- if self.offset_index != PageIndexPolicy::Skip {
- let row_groups = metadata.row_groups();
- let mut all_indexes = Vec::with_capacity(row_groups.len());
- for (rg_idx, x) in row_groups.iter().enumerate() {
- let mut row_group_indexes =
Vec::with_capacity(x.columns().len());
- for (col_idx, c) in x.columns().iter().enumerate() {
- let result = match c.offset_index_range() {
- Some(r) => {
- let r_start = usize::try_from(r.start -
start_offset)?;
- let r_end = usize::try_from(r.end - start_offset)?;
- Self::parse_single_offset_index(
- &bytes[r_start..r_end],
- metadata,
- c,
- rg_idx,
- col_idx,
- )
- }
- None => Err(general_err!("missing offset index")),
- };
-
- match result {
- Ok(index) => row_group_indexes.push(index),
- Err(e) => {
- if self.offset_index == PageIndexPolicy::Required {
- return Err(e);
- } else {
- // Invalidate and return
- metadata.set_column_index(None);
- metadata.set_offset_index(None);
- return Ok(());
- }
- }
- }
- }
- all_indexes.push(row_group_indexes);
- }
- metadata.set_offset_index(Some(all_indexes));
- }
Ok(())
}
- #[cfg(feature = "encryption")]
- fn parse_single_offset_index(
- bytes: &[u8],
- metadata: &ParquetMetaData,
- column: &ColumnChunkMetaData,
- row_group_index: usize,
- col_index: usize,
- ) -> Result<OffsetIndexMetaData> {
- match &column.column_crypto_metadata {
- Some(crypto_metadata) => {
- let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
- general_err!("Cannot decrypt offset index, no file
decryptor set")
- })?;
- let crypto_context = CryptoContext::for_column(
- file_decryptor,
- crypto_metadata,
- row_group_index,
- col_index,
- )?;
- let column_decryptor = crypto_context.metadata_decryptor();
- let aad = crypto_context.create_offset_index_aad()?;
- let plaintext = column_decryptor.decrypt(bytes, &aad)?;
- decode_offset_index(&plaintext)
- }
- None => decode_offset_index(bytes),
- }
- }
-
- #[cfg(not(feature = "encryption"))]
- fn parse_single_offset_index(
- bytes: &[u8],
- _metadata: &ParquetMetaData,
- _column: &ColumnChunkMetaData,
- _row_group_index: usize,
- _col_index: usize,
- ) -> Result<OffsetIndexMetaData> {
- decode_offset_index(bytes)
- }
-
fn range_for_page_index(&self) -> Option<Range<u64>> {
// sanity check
self.metadata.as_ref()?;
@@ -763,7 +610,7 @@ impl ParquetMetaDataReader {
&self,
fetch: &mut F,
file_size: u64,
- ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+ ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
let prefetch = self.get_prefetch_size() as u64;
if file_size < FOOTER_SIZE as u64 {
@@ -825,7 +672,7 @@ impl ParquetMetaDataReader {
async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
&self,
fetch: &mut F,
- ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+ ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
let prefetch = self.get_prefetch_size();
let suffix = fetch.fetch_suffix(prefetch as _).await?;
@@ -914,6 +761,8 @@ impl ParquetMetaDataReader {
/// file. The format of `buf` is the Thrift compact binary protocol, as
specified
/// by the [Parquet Spec].
///
+ /// It does **NOT** include the 8-byte footer.
+ ///
/// This method handles using either `decode_metadata` or
/// `decode_metadata_with_encryption` depending on whether the encryption
/// feature is enabled.
@@ -925,7 +774,7 @@ impl ParquetMetaDataReader {
footer_tail: &FooterTail,
) -> Result<ParquetMetaData> {
#[cfg(feature = "encryption")]
- let result = Self::decode_metadata_with_encryption(
+ let result = decode_metadata_with_encryption(
buf,
footer_tail.is_encrypted_footer(),
self.file_decryption_properties.as_ref(),
@@ -943,112 +792,6 @@ impl ParquetMetaDataReader {
result
}
- /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata
that may be encrypted.
- ///
- /// Typically this is used to decode the metadata from the end of a parquet
- /// file. The format of `buf` is the Thrift compact binary protocol, as
specified
- /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
- /// ciphers as specfied in the [Parquet Encryption Spec].
- ///
- /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
- /// [Parquet Encryption Spec]:
https://parquet.apache.org/docs/file-format/data-pages/encryption/
- #[cfg(feature = "encryption")]
- fn decode_metadata_with_encryption(
- buf: &[u8],
- encrypted_footer: bool,
- file_decryption_properties: Option<&FileDecryptionProperties>,
- ) -> Result<ParquetMetaData> {
- let mut prot = TCompactSliceInputProtocol::new(buf);
- let mut file_decryptor = None;
- let decrypted_fmd_buf;
-
- if encrypted_footer {
- if let Some(file_decryption_properties) =
file_decryption_properties {
- let t_file_crypto_metadata: TFileCryptoMetaData =
- TFileCryptoMetaData::read_from_in_protocol(&mut prot)
- .map_err(|e| general_err!("Could not parse crypto
metadata: {}", e))?;
- let supply_aad_prefix = match
&t_file_crypto_metadata.encryption_algorithm {
- EncryptionAlgorithm::AESGCMV1(algo) =>
algo.supply_aad_prefix,
- _ => Some(false),
- }
- .unwrap_or(false);
- if supply_aad_prefix &&
file_decryption_properties.aad_prefix().is_none() {
- return Err(general_err!(
- "Parquet file was encrypted with an AAD prefix that is
not stored in the file, \
- but no AAD prefix was provided in the file decryption
properties"
- ));
- }
- let decryptor = get_file_decryptor(
- t_file_crypto_metadata.encryption_algorithm,
- t_file_crypto_metadata.key_metadata.as_deref(),
- file_decryption_properties,
- )?;
- let footer_decryptor = decryptor.get_footer_decryptor();
- let aad_footer = create_footer_aad(decryptor.file_aad())?;
-
- decrypted_fmd_buf = footer_decryptor?
- .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
- .map_err(|_| {
- general_err!(
- "Provided footer key and AAD were unable to
decrypt parquet footer"
- )
- })?;
- prot =
TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
-
- file_decryptor = Some(decryptor);
- } else {
- return Err(general_err!("Parquet file has an encrypted footer
but decryption properties were not provided"));
- }
- }
-
- let t_file_metadata: TFileMetaData =
TFileMetaData::read_from_in_protocol(&mut prot)
- .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
- let schema = types::from_thrift(&t_file_metadata.schema)?;
- let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
- if let (Some(algo), Some(file_decryption_properties)) = (
- t_file_metadata.encryption_algorithm,
- file_decryption_properties,
- ) {
- // File has a plaintext footer but encryption algorithm is set
- let file_decryptor_value = get_file_decryptor(
- algo,
- t_file_metadata.footer_signing_key_metadata.as_deref(),
- file_decryption_properties,
- )?;
- if file_decryption_properties.check_plaintext_footer_integrity()
&& !encrypted_footer {
- file_decryptor_value.verify_plaintext_footer_signature(buf)?;
- }
- file_decryptor = Some(file_decryptor_value);
- }
-
- let mut row_groups = Vec::new();
- for rg in t_file_metadata.row_groups {
- let r = RowGroupMetaData::from_encrypted_thrift(
- schema_descr.clone(),
- rg,
- file_decryptor.as_ref(),
- )?;
- row_groups.push(r);
- }
- let column_orders =
- Self::parse_column_orders(t_file_metadata.column_orders,
&schema_descr)?;
-
- let file_metadata = FileMetaData::new(
- t_file_metadata.version,
- t_file_metadata.num_rows,
- t_file_metadata.created_by,
- t_file_metadata.key_value_metadata,
- schema_descr,
- column_orders,
- );
- let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
-
- metadata.with_file_decryptor(file_decryptor);
-
- Ok(metadata)
- }
-
/// Decodes [`ParquetMetaData`] from the provided bytes.
///
/// Typically this is used to decode the metadata from the end of a parquet
@@ -1057,105 +800,18 @@ impl ParquetMetaDataReader {
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
- let mut prot = TCompactSliceInputProtocol::new(buf);
-
- let t_file_metadata: TFileMetaData =
TFileMetaData::read_from_in_protocol(&mut prot)
- .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
- let schema = types::from_thrift(&t_file_metadata.schema)?;
- let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
- let mut row_groups = Vec::new();
- for rg in t_file_metadata.row_groups {
-
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
- }
- let column_orders =
- Self::parse_column_orders(t_file_metadata.column_orders,
&schema_descr)?;
-
- let file_metadata = FileMetaData::new(
- t_file_metadata.version,
- t_file_metadata.num_rows,
- t_file_metadata.created_by,
- t_file_metadata.key_value_metadata,
- schema_descr,
- column_orders,
- );
-
- Ok(ParquetMetaData::new(file_metadata, row_groups))
- }
-
- /// Parses column orders from Thrift definition.
- /// If no column orders are defined, returns `None`.
- fn parse_column_orders(
- t_column_orders: Option<Vec<TColumnOrder>>,
- schema_descr: &SchemaDescriptor,
- ) -> Result<Option<Vec<ColumnOrder>>> {
- match t_column_orders {
- Some(orders) => {
- // Should always be the case
- if orders.len() != schema_descr.num_columns() {
- return Err(general_err!("Column order length mismatch"));
- };
- let mut res = Vec::new();
- for (i, column) in schema_descr.columns().iter().enumerate() {
- match orders[i] {
- TColumnOrder::TYPEORDER(_) => {
- let sort_order = ColumnOrder::get_sort_order(
- column.logical_type(),
- column.converted_type(),
- column.physical_type(),
- );
-
res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
- }
- }
- }
- Ok(Some(res))
- }
- None => Ok(None),
- }
- }
-}
-
-#[cfg(feature = "encryption")]
-fn get_file_decryptor(
- encryption_algorithm: EncryptionAlgorithm,
- footer_key_metadata: Option<&[u8]>,
- file_decryption_properties: &FileDecryptionProperties,
-) -> Result<FileDecryptor> {
- match encryption_algorithm {
- EncryptionAlgorithm::AESGCMV1(algo) => {
- let aad_file_unique = algo
- .aad_file_unique
- .ok_or_else(|| general_err!("AAD unique file identifier is not
set"))?;
- let aad_prefix = if let Some(aad_prefix) =
file_decryption_properties.aad_prefix() {
- aad_prefix.clone()
- } else {
- algo.aad_prefix.unwrap_or_default()
- };
-
- FileDecryptor::new(
- file_decryption_properties,
- footer_key_metadata,
- aad_file_unique,
- aad_prefix,
- )
- }
- EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
- "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
- )),
+ // Note this API does not support encryption.
+ decode_metadata(buf)
}
}
#[cfg(test)]
mod tests {
use super::*;
- use bytes::Bytes;
-
- use crate::basic::SortOrder;
- use crate::basic::Type;
use crate::file::reader::Length;
- use crate::format::TypeDefinedOrder;
- use crate::schema::types::Type as SchemaType;
use crate::util::test_common::file_util::get_test_file;
+ use bytes::Bytes;
+ use std::ops::Range;
#[test]
fn test_parse_metadata_size_smaller_than_footer() {
@@ -1185,59 +841,6 @@ mod tests {
assert!(matches!(err, ParquetError::NeedMoreData(263)));
}
- #[test]
- fn test_metadata_column_orders_parse() {
- // Define simple schema, we do not need to provide logical types.
- let fields = vec![
- Arc::new(
- SchemaType::primitive_type_builder("col1", Type::INT32)
- .build()
- .unwrap(),
- ),
- Arc::new(
- SchemaType::primitive_type_builder("col2", Type::FLOAT)
- .build()
- .unwrap(),
- ),
- ];
- let schema = SchemaType::group_type_builder("schema")
- .with_fields(fields)
- .build()
- .unwrap();
- let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
- let t_column_orders = Some(vec![
- TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
- TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
- ]);
-
- assert_eq!(
- ParquetMetaDataReader::parse_column_orders(t_column_orders,
&schema_descr).unwrap(),
- Some(vec![
- ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
- ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
- ])
- );
-
- // Test when no column orders are defined.
- assert_eq!(
- ParquetMetaDataReader::parse_column_orders(None,
&schema_descr).unwrap(),
- None
- );
- }
-
- #[test]
- fn test_metadata_column_orders_len_mismatch() {
- let schema = SchemaType::group_type_builder("schema").build().unwrap();
- let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
- let t_column_orders =
Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
-
- let res = ParquetMetaDataReader::parse_column_orders(t_column_orders,
&schema_descr);
- assert!(res.is_err());
- assert!(format!("{:?}", res.unwrap_err()).contains("Column order
length mismatch"));
- }
-
#[test]
#[allow(deprecated)]
fn test_try_parse() {
@@ -1369,6 +972,7 @@ mod async_tests {
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
use tempfile::NamedTempFile;
use crate::arrow::ArrowWriter;