This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0975ca6 perf(core): skip decoding out-of-range log blocks and add unit test (#420)
0975ca6 is described below
commit 0975ca64b27634b96ce35f0863e1b8d57a79d8af
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Aug 18 20:53:59 2025 +0530
perf(core): skip decoding out-of-range log blocks and add unit test (#420)
Signed-off-by: Sagar Sumit <[email protected]>
---
crates/core/src/file_group/log_file/reader.rs | 54 +++++++++++++++++++++++++--
1 file changed, 50 insertions(+), 4 deletions(-)
diff --git a/crates/core/src/file_group/log_file/reader.rs b/crates/core/src/file_group/log_file/reader.rs
index c8e9657..e0ad3ca 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -25,12 +25,14 @@ use crate::file_group::log_file::log_block::{
BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
};
use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
+use crate::file_group::record_batches::RecordBatches;
use crate::storage::reader::StorageReader;
use crate::storage::Storage;
use crate::timeline::selector::InstantRange;
use crate::Result;
use bytes::BytesMut;
use std::collections::HashMap;
+use std::io::SeekFrom;
use std::io::{self, Read, Seek};
use std::sync::Arc;
@@ -235,10 +237,27 @@ impl<R: Read + Seek> LogFileReader<R> {
let format_version = self.read_log_format_version()?;
let block_type = self.read_block_type(&format_version)?;
let header = self.read_block_metadata(BlockMetadataType::Header,
&format_version)?;
- let mut skipped = false;
+ // If block is out of the requested range, fast skip its payload
without decoding
if self.should_skip_block(&header, instant_range)? {
- skipped = true;
- // TODO skip reading block
+ // block_length excludes the magic; we consumed 8 bytes of length
already.
+ // Jump to the end of this block (absolute seek from start of
file):
+ // end_pos = curr_pos (right after magic) + 8 (length field) +
block_length
+ let target = curr_pos
+ .checked_add(8)
+ .and_then(|v| v.checked_add(block_length))
+ .ok_or_else(|| CoreError::LogFormatError("Block length
overflow".to_string()))?;
+ self.reader
+ .seek(SeekFrom::Start(target))
+ .map_err(CoreError::ReadLogFileError)?;
+
+ return Ok(Some(LogBlock {
+ format_version,
+ block_type,
+ header,
+ record_batches: RecordBatches::new(),
+ footer: HashMap::new(),
+ skipped: true,
+ }));
}
let decoder = Decoder::new(self.hudi_configs.clone());
@@ -258,7 +277,7 @@ impl<R: Read + Seek> LogFileReader<R> {
header,
record_batches,
footer,
- skipped,
+ skipped: false,
}))
}
}
@@ -468,4 +487,31 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_skip_out_of_range_block_fast_path() -> Result<()> {
+ // Exercises the skip fast path: the fixture is a log file with a single data block.
+ let (dir, file_name) = get_valid_log_parquet_data();
+ let mut reader = create_log_file_reader(&dir, &file_name).await?;
+
+ // choose an end timestamp earlier than the block's instant time so it should be skipped
+ let instant_range = InstantRange::up_to("20200101000000000", "utc");
+
+ // call the internal reader directly: a skipped block is still returned, flagged, with no decoded batches
+ let maybe_block = reader.read_next_block(&instant_range)?;
+ assert!(maybe_block.is_some(), "Expected a block to be read");
+ let block = maybe_block.unwrap();
+ assert!(block.skipped, "Block should be marked as skipped");
+ assert_eq!(block.record_batches.num_data_batches(), 0);
+ assert_eq!(block.record_batches.num_delete_batches(), 0);
+
+ // next call should hit EOF, since the reader seeked past the only (skipped) block
+ let next = reader.read_next_block(&instant_range)?;
+ assert!(
+ next.is_none(),
+ "Should reach EOF after skipping the only block"
+ );
+
+ Ok(())
+ }
}