This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0975ca6  perf(core): skip decoding out-of-range log blocks and add unit test (#420)
0975ca6 is described below

commit 0975ca64b27634b96ce35f0863e1b8d57a79d8af
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Aug 18 20:53:59 2025 +0530

    perf(core): skip decoding out-of-range log blocks and add unit test (#420)
    
    Signed-off-by: Sagar Sumit <[email protected]>
---
 crates/core/src/file_group/log_file/reader.rs | 54 +++++++++++++++++++++++++--
 1 file changed, 50 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/file_group/log_file/reader.rs b/crates/core/src/file_group/log_file/reader.rs
index c8e9657..e0ad3ca 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -25,12 +25,14 @@ use crate::file_group::log_file::log_block::{
     BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
 };
 use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
+use crate::file_group::record_batches::RecordBatches;
 use crate::storage::reader::StorageReader;
 use crate::storage::Storage;
 use crate::timeline::selector::InstantRange;
 use crate::Result;
 use bytes::BytesMut;
 use std::collections::HashMap;
+use std::io::SeekFrom;
 use std::io::{self, Read, Seek};
 use std::sync::Arc;
 
@@ -235,10 +237,27 @@ impl<R: Read + Seek> LogFileReader<R> {
         let format_version = self.read_log_format_version()?;
         let block_type = self.read_block_type(&format_version)?;
         let header = self.read_block_metadata(BlockMetadataType::Header, &format_version)?;
-        let mut skipped = false;
+        // If block is out of the requested range, fast skip its payload without decoding
         if self.should_skip_block(&header, instant_range)? {
-            skipped = true;
-            // TODO skip reading block
+            // block_length excludes the magic; we consumed 8 bytes of length already.
+            // Jump to the end of this block (absolute seek from start of file):
+            // end_pos = curr_pos (right after magic) + 8 (length field) + block_length
+            let target = curr_pos
+                .checked_add(8)
+                .and_then(|v| v.checked_add(block_length))
+                .ok_or_else(|| CoreError::LogFormatError("Block length overflow".to_string()))?;
+            self.reader
+                .seek(SeekFrom::Start(target))
+                .map_err(CoreError::ReadLogFileError)?;
+
+            return Ok(Some(LogBlock {
+                format_version,
+                block_type,
+                header,
+                record_batches: RecordBatches::new(),
+                footer: HashMap::new(),
+                skipped: true,
+            }));
         }
 
         let decoder = Decoder::new(self.hudi_configs.clone());
@@ -258,7 +277,7 @@ impl<R: Read + Seek> LogFileReader<R> {
             header,
             record_batches,
             footer,
-            skipped,
+            skipped: false,
         }))
     }
 }
@@ -468,4 +487,31 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_skip_out_of_range_block_fast_path() -> Result<()> {
+        // use a file with a single data block
+        let (dir, file_name) = get_valid_log_parquet_data();
+        let mut reader = create_log_file_reader(&dir, &file_name).await?;
+
+        // choose an end timestamp earlier than the block's instant time so it should be skipped
+        let instant_range = InstantRange::up_to("20200101000000000", "utc");
+
+        // call the internal reader to inspect the skipped block
+        let maybe_block = reader.read_next_block(&instant_range)?;
+        assert!(maybe_block.is_some(), "Expected a block to be read");
+        let block = maybe_block.unwrap();
+        assert!(block.skipped, "Block should be marked as skipped");
+        assert_eq!(block.record_batches.num_data_batches(), 0);
+        assert_eq!(block.record_batches.num_delete_batches(), 0);
+
+        // next call should hit EOF
+        let next = reader.read_next_block(&instant_range)?;
+        assert!(
+            next.is_none(),
+            "Should reach EOF after skipping the only block"
+        );
+
+        Ok(())
+    }
 }

Reply via email to