alamb commented on code in PR #8340:
URL: https://github.com/apache/arrow-rs/pull/8340#discussion_r2383112611
##########
parquet/src/file/metadata/parser.rs:
##########
@@ -43,6 +43,86 @@ use crate::encryption::{
#[cfg(feature = "encryption")]
use crate::format::EncryptionAlgorithm;
+/// Helper struct for metadata parsing
+///
+/// This structure parses thrift-encoded bytes into the correct Rust structs,
+/// such as [`ParquetMetaData`], handling decryption if necessary.
+//
+// Note: this structure is used to minimize the number of
+// places that need `#[cfg(feature = "encryption")]` checks.
+pub(crate) use inner::MetadataParser;
+
+#[cfg(feature = "encryption")]
+mod inner {
+ use super::*;
+ use crate::encryption::decrypt::FileDecryptionProperties;
+ use crate::errors::Result;
+
+ /// API for decoding metadata that may be encrypted
+ #[derive(Debug, Default)]
+ pub(crate) struct MetadataParser {
+ // the credentials and keys needed to decrypt metadata
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+ }
+
+ impl MetadataParser {
+ pub(crate) fn new() -> Self {
+ MetadataParser::default()
+ }
+
+ pub(crate) fn with_file_decryption_properties(
+ mut self,
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+ ) -> Self {
+ self.file_decryption_properties = file_decryption_properties;
+ self
+ }
+
+ pub(crate) fn decode_metadata(
+ &self,
+ buf: &[u8],
+ encrypted_footer: bool,
+ ) -> Result<ParquetMetaData> {
+ decode_metadata_with_encryption(
+ buf,
+ encrypted_footer,
+ self.file_decryption_properties.as_deref(),
+ )
+ }
+ }
+}
+
+#[cfg(not(feature = "encryption"))]
+mod inner {
+ use super::*;
+ use crate::errors::Result;
+ /// parallel implementation when encryption feature is not enabled
+ ///
+ /// This has the same API as the encryption-enabled version
+ #[derive(Debug, Default)]
+ pub(crate) struct MetadataParser;
+
+ impl MetadataParser {
+ pub(crate) fn new() -> Self {
+ MetadataParser
+ }
+
+ pub(crate) fn decode_metadata(
+ &self,
+ buf: &[u8],
+ encrypted_footer: bool,
+ ) -> Result<ParquetMetaData> {
+ if encrypted_footer {
+ Err(general_err!(
+ "Parquet file has an encrypted footer but the encryption feature is disabled"
+ ))
+ } else {
+ decode_metadata(buf)
Review Comment:
I don't fully understand your description of the problem. Do you mean you
inlined the contents of decode_metadata or something?
Is there anything I can do to make the pattern more amenable to the
thrift-remodel branch?
##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -211,16 +232,38 @@ impl ParquetMetaDataPushDecoder {
)));
};
- let metadata_reader =
- ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
-
Ok(Self {
- done: false,
- metadata_reader,
+ state: DecodeState::ReadingFooter,
+ column_index_policy: PageIndexPolicy::Optional,
+ offset_index_policy: PageIndexPolicy::Optional,
buffers: crate::util::push_buffers::PushBuffers::new(file_len),
+ metadata_parser: MetadataParser::new(),
})
}
+ /// Begin decoding from the given footer tail.
+ pub(crate) fn try_new_with_footer_tail(
+ file_len: u64,
+ footer_tail: FooterTail,
+ ) -> Result<Self, ParquetError> {
Review Comment:
Oh, now I see what you are asking -- I will update this to use
crate::errors::Result (which is what I thought I had used)!
##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -259,70 +325,160 @@ impl ParquetMetaDataPushDecoder {
/// example on [`Self`]
pub fn push_ranges(
&mut self,
- ranges: Vec<std::ops::Range<u64>>,
- buffers: Vec<bytes::Bytes>,
- ) -> std::result::Result<(), String> {
- if self.done {
- return Err(
+ ranges: Vec<Range<u64>>,
+ buffers: Vec<Bytes>,
+ ) -> Result<(), ParquetError> {
+ if matches!(&self.state, DecodeState::Finished) {
+ return Err(general_err!(
"ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
- .to_string(),
- );
+ ));
}
self.buffers.push_ranges(ranges, buffers);
Ok(())
}
- /// Try to decode the metadata from the pushed data, returning the
- /// decoded metadata or an error if not enough data is available.
- pub fn try_decode(
- &mut self,
- ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
- if self.done {
- return Ok(DecodeResult::Finished);
+ /// Pushes a single range of data into the decoder's buffer.
+ pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<(), ParquetError> {
+ if matches!(&self.state, DecodeState::Finished) {
+ return Err(general_err!(
+ "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
+ ));
}
+ self.buffers.push_range(range, buffer);
+ Ok(())
+ }
- // need to have the last 8 bytes of the file to decode the metadata
+ /// Try to decode the metadata from the pushed data, returning the
+ /// decoded metadata or an error if not enough data is available.
+ pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
let file_len = self.buffers.file_len();
- if !self.buffers.has_range(&(file_len - 8..file_len)) {
- #[expect(clippy::single_range_in_vec_init)]
- return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+ let footer_len = FOOTER_SIZE as u64;
+ loop {
+ match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
+ DecodeState::ReadingFooter => {
+ // need to have the last 8 bytes of the file to decode the metadata
+ let footer_start = file_len.saturating_sub(footer_len);
+ let footer_range = footer_start..file_len;
+
+ if !self.buffers.has_range(&footer_range) {
+ self.state = DecodeState::ReadingFooter;
+ return Ok(needs_range(footer_range));
+ }
+ let footer_bytes = self.get_bytes(&footer_range)?;
+ let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
+
+ self.state = DecodeState::ReadingMetadata(footer_tail);
+ continue;
+ }
+
+ DecodeState::ReadingMetadata(footer_tail) => {
+ let metadata_len: u64 = footer_tail.metadata_length() as u64;
+ let metadata_start = file_len - footer_len - metadata_len;
+ let metadata_end = metadata_start + metadata_len;
+ let metadata_range = metadata_start..metadata_end;
+
+ if !self.buffers.has_range(&metadata_range) {
+ self.state = DecodeState::ReadingMetadata(footer_tail);
+ return Ok(needs_range(metadata_range));
+ }
+
+ let metadata = self.metadata_parser.decode_metadata(
+ &self.get_bytes(&metadata_range)?,
+ footer_tail.is_encrypted_footer(),
+ )?;
+ self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
+ continue;
+ }
+
+ DecodeState::ReadingPageIndex(mut metadata) => {
Review Comment:
Good call -- I tried to clarify in 12bca808bf.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]