alamb commented on code in PR #9171:
URL: https://github.com/apache/arrow-rs/pull/9171#discussion_r2723382951
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -79,11 +162,194 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
+/// A contiguous set of encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    data: Bytes,
+    offsets: Vec<u64>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Number of rows in this buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if there are no rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the backing buffer.
+    ///
+    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets (length = `len() + 1`).
+    #[inline]
+    pub fn offsets(&self) -> &[u64] {
+        &self.offsets
+    }
+
+    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// // Access the first row (index 0)
+    /// let row0 = rows.row(0)?;
+    /// assert!(!row0.is_empty());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
+        if i >= self.len() {
+            return Err(ArrowError::AvroError(format!(
+                "Row index {i} out of bounds for len {}",
+                self.len()
+            )));
+        }
+        // SAFETY:
+        // self.len() is defined as self.offsets.len().saturating_sub(1).
+        // The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
+        // Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
+        let (start_u64, end_u64) = unsafe {
Review Comment:
Did you see this use of unsafe make a difference in benchmarks?
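For reference, a safe variant to benchmark against could look like this (a sketch; given the `i >= self.len()` check just above, the optimizer can often elide the bounds checks of plain indexing):
```rust
// Plain indexing cannot panic here: the preceding length check
// guarantees `i` and `i + 1` are valid indexes into `self.offsets`.
let (start_u64, end_u64) = (self.offsets[i], self.offsets[i + 1]);
```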
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -172,6 +480,74 @@ impl WriterBuilder {
     }
 }
+/// A row-by-row streaming encoder for Avro **Single Object Encoding** (SOE) streams.
Review Comment:
I wonder why a user couldn't just use `Writer` with a `mut Vec` as the sink - you would get the same effect.
Is the difference that you get the output offsets as well?
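If the offsets are indeed the point, a consumer-side sketch (using only the API in this diff; `send_message` is a hypothetical per-message transport) shows what a flat `Vec<u8>` sink could not recover:
```rust
// A plain Vec<u8> sink would concatenate rows with no way to find the
// boundaries again; EncodedRows keeps the per-row framing explicit.
let rows = encoder.flush();
for row in rows.rows() {
    let payload = row?; // zero-copy Bytes holding one SOE-encoded record
    send_message(payload); // hypothetical: hand each record to a transport
}
```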
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -79,11 +162,194 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
+/// A contiguous set of encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    data: Bytes,
+    offsets: Vec<u64>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Number of rows in this buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if there are no rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the backing buffer.
+    ///
+    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets (length = `len() + 1`).
+    #[inline]
+    pub fn offsets(&self) -> &[u64] {
+        &self.offsets
+    }
+
+    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// // Access the first row (index 0)
+    /// let row0 = rows.row(0)?;
+    /// assert!(!row0.is_empty());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
+        if i >= self.len() {
+            return Err(ArrowError::AvroError(format!(
+                "Row index {i} out of bounds for len {}",
+                self.len()
+            )));
+        }
+        // SAFETY:
+        // self.len() is defined as self.offsets.len().saturating_sub(1).
+        // The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
+        // Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
+        let (start_u64, end_u64) = unsafe {
+            (
+                *self.offsets.get_unchecked(i),
+                *self.offsets.get_unchecked(i + 1),
+            )
+        };
+        let start = usize::try_from(start_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row start offset does not fit in usize: {e}"))
+        })?;
+        let end = usize::try_from(end_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row end offset does not fit in usize: {e}"))
+        })?;
+        Ok(self.data.slice(start..end))
+    }
+
+    /// Iterate over rows as zero-copy `Bytes` slices.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// let mut count = 0;
+    /// for row in rows.rows() {
+    ///     let _bytes = row?;
+    ///     count += 1;
+    /// }
+    /// assert_eq!(count, 2);
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
+        (0..self.len()).map(|i| self.row(i))
+    }
+
+    /// Copy all rows into independent `Vec<u8>` buffers.
+    ///
+    /// This is useful for compatibility with APIs that require owned, mutable byte vectors.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![100])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// let vecs = rows.to_vecs()?;
+    /// assert_eq!(vecs.len(), 1);
+    /// assert!(!vecs[0].is_empty());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn to_vecs(&self) -> Result<Vec<Vec<u8>>, ArrowError> {
+        let mut out = Vec::with_capacity(self.len());
+        for i in 0..self.len() {
+            out.push(self.row(i)?.to_vec());
+        }
+        Ok(out)
+    }
Review Comment:
This seems like an unnecessary API to me -- you could do the same with
```rust
let vecs: Vec<_> = rows.rows().map(|r| r.map(|b| b.to_vec())).collect::<Result<_, _>>()?;
```
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -79,11 +162,194 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
+/// A contiguous set of encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    data: Bytes,
+    offsets: Vec<u64>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Number of rows in this buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if there are no rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the backing buffer.
+    ///
+    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets (length = `len() + 1`).
+    #[inline]
+    pub fn offsets(&self) -> &[u64] {
+        &self.offsets
+    }
+
+    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// // Access the first row (index 0)
+    /// let row0 = rows.row(0)?;
+    /// assert!(!row0.is_empty());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
+        if i >= self.len() {
+            return Err(ArrowError::AvroError(format!(
+                "Row index {i} out of bounds for len {}",
+                self.len()
+            )));
+        }
+        // SAFETY:
+        // self.len() is defined as self.offsets.len().saturating_sub(1).
+        // The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
+        // Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
+        let (start_u64, end_u64) = unsafe {
+            (
+                *self.offsets.get_unchecked(i),
+                *self.offsets.get_unchecked(i + 1),
+            )
+        };
+        let start = usize::try_from(start_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row start offset does not fit in usize: {e}"))
+        })?;
+        let end = usize::try_from(end_u64).map_err(|e| {
+            ArrowError::AvroError(format!("row end offset does not fit in usize: {e}"))
+        })?;
+        Ok(self.data.slice(start..end))
+    }
+
+    /// Iterate over rows as zero-copy `Bytes` slices.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// let mut count = 0;
+    /// for row in rows.rows() {
+    ///     let _bytes = row?;
+    ///     count += 1;
+    /// }
+    /// assert_eq!(count, 2);
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
+        (0..self.len()).map(|i| self.row(i))
Review Comment:
This is likely more efficient if you returned the sliced `Bytes` directly -- calling `row` will continually recheck `len`, for example.
You could do something like this to get known-good offsets:
```rust
self.offsets.windows(2).map(...)
```
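A fleshed-out sketch of that suggestion (assuming the offsets fit in `usize`, which holds on 64-bit targets; otherwise the fallible conversion from `row` should be kept):
```rust
// Offsets are validated once at construction, so every window is a
// known-good (start, end) pair: no per-row length check or Result needed.
pub fn rows(&self) -> impl Iterator<Item = Bytes> + '_ {
    self.offsets
        .windows(2)
        .map(|w| self.data.slice(w[0] as usize..w[1] as usize))
}
```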
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]