jecsand838 commented on code in PR #9171:
URL: https://github.com/apache/arrow-rs/pull/9171#discussion_r2723837724


##########
arrow-avro/src/writer/mod.rs:
##########
@@ -79,11 +162,194 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
 
+/// A contiguous set of encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    data: Bytes,
+    offsets: Vec<u64>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Number of rows in this buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if there are no rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the backing buffer.
+    ///
+    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets (length = `len() + 1`).
+    #[inline]
+    pub fn offsets(&self) -> &[u64] {
+        &self.offsets
+    }
+
+    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// // Access the first row (index 0)
+    /// let row0 = rows.row(0)?;
+    /// assert!(!row0.is_empty());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
+        if i >= self.len() {
+            return Err(ArrowError::AvroError(format!(
+                "Row index {i} out of bounds for len {}",
+                self.len()
+            )));
+        }
+        // SAFETY:
+        // self.len() is defined as self.offsets.len().saturating_sub(1).
+        // The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
+        // Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
+        let (start_u64, end_u64) = unsafe {
+            (
+                *self.offsets.get_unchecked(i),
+                *self.offsets.get_unchecked(i + 1),
+            )
+        };
+        let start = usize::try_from(start_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row start offset does not fit in usize: {e}"))
+        })?;
+        let end = usize::try_from(end_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row end offset does not fit in usize: {e}"))
+        })?;
+        Ok(self.data.slice(start..end))
+    }
+
+    /// Iterate over rows as zero-copy `Bytes` slices.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// let mut count = 0;
+    /// for row in rows.rows() {
+    ///     let _bytes = row?;
+    ///     count += 1;
+    /// }
+    /// assert_eq!(count, 2);
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
+        (0..self.len()).map(|i| self.row(i))

Review Comment:
   This was a great callout. I went ahead and implemented those changes and
renamed the method from `rows` to `iter`, which seemed more idiomatic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to