martin-g commented on code in PR #530:
URL: https://github.com/apache/avro-rs/pull/530#discussion_r3158977744
##########
avro/src/reader/block.rs:
##########
@@ -295,6 +355,26 @@ impl<'r, R: Read> Block<'r, R> {
}
}
+impl<R: Read + Seek> Block<'_, R> {
+ /// Seek the underlying stream to `offset` and read the block there.
+ /// Validates the sync marker to confirm it's a real block boundary.
+ /// Returns an error if no valid block can be read at the offset
+ /// (e.g., the offset is at or past EOF).
+ pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+ self.reader
+ .seek(SeekFrom::Start(offset))
+ .map_err(Details::SeekToBlock)?;
Review Comment:
It would be safer to reset the state first (lines 368-371).
Currently, if `SeekToBlock` is returned, the state is not reset, and a subsequent
call to `Block::read_block_next()` will fail at
https://github.com/apache/avro-rs/blob/a6c7563272484dcd71fc1584b7b574e6d401ccea/avro/src/reader/block.rs#L198
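A minimal sketch of the ordering I mean. It uses a hypothetical `BlockState` with made-up fields (`buf`, `buf_idx`, `message_count`) rather than the real `Block` internals: clear the per-block state first, then do the fallible seek and read.

```rust
use std::io::{Read, Seek, SeekFrom};

/// Hypothetical stand-in for the block reader's per-block state; the real
/// `Block` fields in avro-rs differ.
struct BlockState<R> {
    reader: R,
    buf: Vec<u8>,
    buf_idx: usize,
    message_count: usize,
}

impl<R: Read + Seek> BlockState<R> {
    fn seek_to_block(&mut self, offset: u64) -> std::io::Result<()> {
        // Reset the per-block state *before* any fallible call, so an early
        // `?` return cannot leave stale data from the previous block behind.
        self.buf.clear();
        self.buf_idx = 0;
        self.message_count = 0;

        self.reader.seek(SeekFrom::Start(offset))?;
        // ... read the block header, data and sync marker here ...
        Ok(())
    }
}
```

With this ordering, a failed seek leaves an empty block instead of a half-updated one.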
##########
avro/src/reader/block.rs:
##########
Review Comment:
`self.current_block_info` is set even if decompression fails, and
`Reader::current_block()` will then report that block.
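One way to avoid this, sketched with hypothetical stand-ins (`Block`, `decompress` and `load_block` here are not the real avro-rs items): decompress into a local first and assign `current_block_info` only after that has succeeded.

```rust
/// Hypothetical, trimmed-down types; the real avro-rs definitions differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BlockPosition {
    offset: u64,
    message_count: usize,
}

struct Block {
    buf: Vec<u8>,
    current_block_info: Option<BlockPosition>,
}

/// Stand-in for the codec call: fails on an empty payload.
fn decompress(raw: &[u8]) -> Result<Vec<u8>, String> {
    if raw.is_empty() {
        Err("empty payload".to_string())
    } else {
        Ok(raw.to_vec())
    }
}

impl Block {
    fn load_block(&mut self, offset: u64, message_count: usize, raw: &[u8]) -> Result<(), String> {
        // Decompress into a local first; on error `?` returns before any
        // field is touched, so `current_block_info` is not updated.
        let decompressed = decompress(raw)?;
        self.buf = decompressed;
        // Publish the new position only once the block is fully usable.
        self.current_block_info = Some(BlockPosition { offset, message_count });
        Ok(())
    }
}
```

Whether a failed load should instead clear `current_block_info` to `None` is a separate design choice; the point is that it should never describe a block that was not loaded.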
##########
avro/src/reader/mod.rs:
##########
@@ -366,4 +406,111 @@ mod tests {
panic!("Expected an error in the reading of the codec!");
}
}
+
+ /// Write an Avro file with multiple blocks and verify we can seek between them.
+ #[test]
+ fn test_seek_to_block() -> TestResult {
+ use crate::writer::Writer;
+
+ let schema = Schema::parse_str(SCHEMA)?;
+ let mut writer = Writer::new(&schema, Vec::new())?;
+
+ // Block 0: records with a=10, a=20
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 10i64);
+ r.put("b", "b0_r0");
+ writer.append_value(r)?;
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 20i64);
+ r.put("b", "b0_r1");
+ writer.append_value(r)?;
+ writer.flush()?;
+
+ // Block 1: records with a=30, a=40
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 30i64);
+ r.put("b", "b1_r0");
+ writer.append_value(r)?;
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 40i64);
+ r.put("b", "b1_r1");
+ writer.append_value(r)?;
+ writer.flush()?;
+
+ // Block 2: records with a=50
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 50i64);
+ r.put("b", "b2_r0");
+ writer.append_value(r)?;
+ writer.flush()?;
+
+ let data = writer.into_inner()?;
+
+ // Read forward and collect block positions
+ let mut reader = Reader::new(Cursor::new(&data))?;
+ let mut block_offsets: Vec<BlockPosition> = Vec::new();
+ let mut all_values: Vec<Value> = Vec::new();
+
+ assert!(reader.current_block().is_none());
+
+ while let Some(value) = reader.next() {
+ all_values.push(value?);
+ let pos = reader.current_block().expect("block info after read");
+ if block_offsets
+ .last()
+ .is_none_or(|last| last.offset != pos.offset)
+ {
+ block_offsets.push(pos);
+ }
+ }
+
+ assert_eq!(all_values.len(), 5);
+ assert_eq!(block_offsets.len(), 3);
+ assert_eq!(block_offsets[0].message_count, 2);
+ assert_eq!(block_offsets[1].message_count, 2);
+ assert_eq!(block_offsets[2].message_count, 1);
+ assert_eq!(reader.data_start(), block_offsets[0].offset);
+
+ // Seek back to block 1 and read its records
+ reader.seek_to_block(block_offsets[1].offset)?;
+ let v1 = reader.next().unwrap()?;
+ assert_eq!(v1, all_values[2]);
+ let v2 = reader.next().unwrap()?;
+ assert_eq!(v2, all_values[3]);
+
+ // Seek back to block 0
+ reader.seek_to_block(block_offsets[0].offset)?;
+ let v0 = reader.next().unwrap()?;
+ assert_eq!(v0, all_values[0]);
+
+ // Seek to block 2
+ reader.seek_to_block(block_offsets[2].offset)?;
+ let v4 = reader.next().unwrap()?;
+ assert_eq!(v4, all_values[4]);
+
+ assert!(reader.next().is_none());
+
+ Ok(())
+ }
+
+ /// Seeking to an invalid offset should fail with a sync marker error.
+ #[test]
+ fn test_seek_to_invalid_offset() -> TestResult {
+ use crate::writer::Writer;
+
+ let schema = Schema::parse_str(SCHEMA)?;
+ let mut writer = Writer::new(&schema, Vec::new())?;
+ let mut r = Record::new(&schema).unwrap();
+ r.put("a", 1i64);
+ r.put("b", "x");
+ writer.append_value(r)?;
+ writer.flush()?;
+ let data = writer.into_inner()?;
+
+ let mut reader = Reader::new(Cursor::new(&data))?;
+ let result = reader.seek_to_block(7);
+ assert!(result.is_err());
+
Review Comment:
Please extend this test to cover seeking to EOF and beyond:
```suggestion
let eof = data.len() as u64;
assert!(reader.seek_to_block(eof).is_err());
assert!(reader.seek_to_block(eof + 1).is_err());
```
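These cases matter because, for `std::io::Cursor` (which the test uses), seeking past the end is not an error: `seek` just moves the position, and later reads return `Ok(0)`. So for `eof` and `eof + 1` the error has to come from reading the block, not from `SeekFrom::Start` itself. A small standalone check of that `Cursor` behavior (`seek_then_read` is only an illustrative helper):

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

/// Seeks a Cursor over `data` to `offset`, then attempts one read.
/// Returns (position after the seek, number of bytes read).
fn seek_then_read(data: &[u8], offset: u64) -> std::io::Result<(u64, usize)> {
    let mut cursor = Cursor::new(data);
    // Seeking beyond the end of a Cursor succeeds; it only moves the position.
    let pos = cursor.seek(SeekFrom::Start(offset))?;
    let mut buf = [0u8; 4];
    // Reading at or past the end yields Ok(0) rather than an error.
    let n = cursor.read(&mut buf)?;
    Ok((pos, n))
}
```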
##########
avro/src/reader/mod.rs:
##########
@@ -159,6 +164,41 @@ impl<R: Read> Iterator for Reader<'_, R> {
}
}
+impl<R: Read> Reader<'_, R> {
+ /// The currently loaded block's position and record count.
+ ///
+ /// Returns `None` only before the first block is loaded (via iteration or
+ /// [`seek_to_block`](Self::seek_to_block)). Always `Some` afterward.
+ pub fn current_block(&self) -> Option<BlockPosition> {
+ self.block.current_block_info
+ }
+
+ /// Byte offset where data blocks begin (right after the file header).
+ ///
+ /// This is the offset of the first data block — equivalent to the position
+ /// that would be returned by `current_block().offset` for block 0.
+ pub fn data_start(&self) -> u64 {
+ self.block.data_start
+ }
+}
+
+impl<R: Read + Seek> Reader<'_, R> {
+ /// Seek to the data block at the given byte offset and load it.
+ ///
+ /// The offset must point to the start of a valid data block (before its
+ /// object-count varint). The block is read, decompressed, and its sync
+ /// marker is validated against the file header. After this call, [`Iterator::next`]
+ /// yields the first record in that block.
+ ///
+ /// Typically the caller saves offsets from [`current_block`](Self::current_block)
+ /// during forward iteration and later passes them here to jump back.
+ pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+ let seek_status = self.block.seek_to_block(offset);
Review Comment:
This logic has the side effect of resetting the `errored` flag.
If `errored` was set to `true` earlier (e.g. by a failed call to `Reader::next()`),
a subsequent call to `Reader::seek_to_block(offset)` that successfully reads the
block at that offset will reset `errored` to `false`. The user application can
then call `Reader::next()` again (or any other method that checks `errored`
before doing any work).
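Assuming the code after this line assigns `errored` from `seek_status` (the rest of the hunk is not shown here), one option, if the flag is meant to stay sticky, is to only ever set it. Sketch with a hypothetical, trimmed-down `Reader` and a stand-in `inner_seek` in place of `self.block.seek_to_block`:

```rust
/// Hypothetical stand-in; the real `Reader`, its inner `Block` and
/// `AvroResult` in avro-rs are different.
struct Reader {
    errored: bool,
}

impl Reader {
    /// Stand-in for `self.block.seek_to_block(offset)`: only even offsets succeed.
    fn inner_seek(&self, offset: u64) -> Result<(), String> {
        if offset % 2 == 0 {
            Ok(())
        } else {
            Err(format!("no block at {offset}"))
        }
    }

    fn seek_to_block(&mut self, offset: u64) -> Result<(), String> {
        let seek_status = self.inner_seek(offset);
        // Only ever *set* the flag here. Writing
        // `self.errored = seek_status.is_err()` would silently clear an
        // error recorded earlier by `next()`.
        if seek_status.is_err() {
            self.errored = true;
        }
        seek_status
    }
}
```

Alternatively, `seek_to_block` could return an error up front when `errored` is already set. If clearing the flag on a successful seek is meant as a recovery path, it should at least be documented.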
##########
avro/src/reader/block.rs:
##########
@@ -35,10 +35,57 @@ use crate::{
util,
};
+/// Byte offset and record count of a single Avro data block.
+///
+/// Captured automatically as blocks are read during forward iteration.
+/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BlockPosition {
+ /// Byte offset in the stream where this block starts (before the object-count varint).
+ pub offset: u64,
Review Comment:
Since `BlockPosition` is public, I'd recommend making the fields private and
adding a constructor and getters. That way, more fields can be added later if
needed without breaking the API.
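Roughly like this (the constructor and getter names are only a suggestion):

```rust
/// Byte offset and record count of a single Avro data block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockPosition {
    offset: u64,
    message_count: usize,
}

impl BlockPosition {
    /// Creates a position for a block that starts at `offset` (before its
    /// object-count varint) and holds `message_count` records.
    pub fn new(offset: u64, message_count: usize) -> Self {
        Self { offset, message_count }
    }

    /// Byte offset in the stream where this block starts.
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Total number of records in this block.
    pub fn message_count(&self) -> usize {
        self.message_count
    }
}
```

`#[non_exhaustive]` on the struct would also allow adding fields later without breaking downstream code, but private fields plus getters additionally leave the internal representation free to change.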
##########
avro/src/reader/block.rs:
##########
@@ -35,10 +35,57 @@ use crate::{
util,
};
+/// Byte offset and record count of a single Avro data block.
+///
+/// Captured automatically as blocks are read during forward iteration.
+/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BlockPosition {
+ /// Byte offset in the stream where this block starts (before the object-count varint).
+ pub offset: u64,
+ /// Total number of records in this block.
+ pub message_count: usize,
+}
+
+/// Wraps an inner reader and tracks the current byte position.
+///
+/// Avoids requiring `Seek` just to know how many bytes have been consumed.
+/// When the inner reader also implements `Seek`, seeking updates the tracked position.
+#[derive(Debug, Clone)]
+struct PositionTracker<R> {
+ inner: R,
+ pos: u64,
+}
+
+impl<R> PositionTracker<R> {
+ fn new(inner: R) -> Self {
+ Self { inner, pos: 0 }
+ }
+
+ fn position(&self) -> u64 {
+ self.pos
+ }
+}
+
+impl<R: Read> Read for PositionTracker<R> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let n = self.inner.read(buf)?;
+ self.pos += n as u64;
+ Ok(n)
+ }
Review Comment:
The problem I see is:
- Imagine a custom `Read` implementation that overrides one of the provided
methods (every `Read` method except `read()`) for some reason, for example a
custom version of `read_exact()`.
- `PositionTracker` is not opt-in, so every call to `Read::read_exact()` in the
Avro codebase will go through `PositionTracker::read_exact()`. That is the
default impl (https://doc.rust-lang.org/stable/src/std/io/mod.rs.html#1044-1046),
which delegates to `PositionTracker::read()`, which in turn delegates to
`inner.read()` and increments the byte count (`pos`).
- So the custom `read_exact()` impl is never used, and the user application has
no way to change that.

`PositionTracker` either needs to be smarter or opt-in.
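For the "smarter" option, a sketch: forward the provided methods to the inner reader so its overrides still run, and update `pos` from the result. This is a simplified copy of `PositionTracker` for illustration, not a drop-in patch; `read_to_string` and `read_vectored` would need the same treatment.

```rust
use std::io::{self, Read};

/// Simplified copy of the PR's tracker, forwarding provided methods.
struct PositionTracker<R> {
    inner: R,
    pos: u64,
}

impl<R: Read> Read for PositionTracker<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.pos += n as u64;
        Ok(n)
    }

    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        // Delegate to the inner impl (which may be overridden) instead of
        // the default loop over `read`.
        self.inner.read_exact(buf)?;
        // On success exactly `buf.len()` bytes were consumed.
        self.pos += buf.len() as u64;
        Ok(())
    }

    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        let n = self.inner.read_to_end(buf)?;
        self.pos += n as u64;
        Ok(n)
    }
}
```

One caveat: when a forwarded `read_exact` fails, the number of bytes it consumed is unspecified, so `pos` can no longer be trusted after an error (the current default-loop version stays accurate in that case). For the opt-in direction, readers that implement `Seek` could use `Seek::stream_position()` instead of a wrapper.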
--
This is an automated message from the Apache Git Service.