jecsand838 commented on code in PR #8371:
URL: https://github.com/apache/arrow-rs/pull/8371#discussion_r2357365074


##########
arrow-avro/src/schema.rs:
##########
@@ -316,6 +316,22 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Defines the strategy for generating the per-record prefix for an Avro 
binary stream.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum FingerprintStrategy {
+    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
+    #[default]
+    Rabin,
+    /// Use a Confluent Schema Registry 32-bit ID.
+    ConfluentSchemaId(u32),

Review Comment:
   Also let's stick to the same convention we are using in `Fingerprint` imo.
   
   ```suggestion
       Id(u32),
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -26,16 +29,29 @@ use std::io::Write;
 /// Format abstraction implemented by each container‐level writer.
 pub trait AvroFormat: Debug + Default {
     /// Write any bytes required at the very beginning of the output stream
+    /// (file header, etc.).
     /// Implementations **must not** write any record data.
     fn start_stream<W: Write>(
         &mut self,
         writer: &mut W,
         schema: &Schema,
         compression: Option<CompressionCodec>,
+        fingerprint_strategy: FingerprintStrategy,
     ) -> Result<(), ArrowError>;
 
     /// Return the 16‑byte sync marker (OCF) or `None` (binary stream).
     fn sync_marker(&self) -> Option<&[u8; 16]>;
+
+    /// Return the 10‑byte **Avro single‑object** prefix (`C3 01` magic +
+    /// little‑endian schema fingerprint) to be written **before each record**,
+    /// or `None` if the format does not use single‑object encoding.
+    ///
+    /// The default implementation returns `None`. `AvroBinaryFormat` overrides
+    /// this to return the appropriate single-object encoding prefix.
+    #[inline]
+    fn single_object_prefix(&self) -> Option<&[u8]> {
+        None
+    }

Review Comment:
   Then we could get rid of this method as well.
   
   ```suggestion
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {
+    /// Pre-built, variable-length prefix written before each record.
+    prefix: Vec<u8>,
+}
 
 impl AvroFormat for AvroBinaryFormat {
     fn start_stream<W: Write>(
         &mut self,
         _writer: &mut W,
-        _schema: &Schema,
-        _compression: Option<CompressionCodec>,
+        schema: &Schema,
+        compression: Option<CompressionCodec>,
+        fingerprint_strategy: FingerprintStrategy,
     ) -> Result<(), ArrowError> {
-        Err(ArrowError::NotYetImplemented(
-            "avro binary format not yet implemented".to_string(),
-        ))
+        if compression.is_some() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Compression not supported for Avro binary 
streaming".to_string(),
+            ));
+        }
+
+        self.prefix.clear();
+
+        match fingerprint_strategy {
+            FingerprintStrategy::ConfluentSchemaId(id) => {
+                self.prefix.push(CONFLUENT_MAGIC[0]);
+                self.prefix.extend_from_slice(&id.to_be_bytes());
+            }
+            strategy => {
+                // All other strategies use the single-object encoding format
+                self.prefix.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+
+                let avro_schema = AvroSchema::try_from(schema)?;
+                let fp = match strategy {
+                    FingerprintStrategy::Rabin => avro_schema.fingerprint()?,
+                    #[cfg(feature = "md5")]
+                    FingerprintStrategy::MD5 => 
AvroSchema::generate_fingerprint(
+                        &avro_schema.schema()?,
+                        crate::schema::FingerprintAlgorithm::MD5,
+                    )?,
+                    #[cfg(feature = "sha256")]
+                    FingerprintStrategy::SHA256 => 
AvroSchema::generate_fingerprint(
+                        &avro_schema.schema()?,
+                        crate::schema::FingerprintAlgorithm::SHA256,
+                    )?,
+                    FingerprintStrategy::ConfluentSchemaId(_) => 
unreachable!(),
+                };
+
+                match fp {
+                    Fingerprint::Rabin(val) => 
self.prefix.extend_from_slice(&val.to_le_bytes()),
+                    #[cfg(feature = "md5")]
+                    Fingerprint::MD5(val) => 
self.prefix.extend_from_slice(val.as_ref()),
+                    #[cfg(feature = "sha256")]
+                    Fingerprint::SHA256(val) => 
self.prefix.extend_from_slice(val.as_ref()),
+                    Fingerprint::Id(_) => return 
Err(ArrowError::InvalidArgumentError(
+                        "ConfluentSchemaId strategy cannot be used with a 
hash-based fingerprint."
+                            .to_string(),
+                    )),
+                }
+            }
+        }

Review Comment:
   I'd move this logic to the `WriterBuilder::build` method and simplify.
   
   ```suggestion
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {
+    /// Pre-built, variable-length prefix written before each record.
+    prefix: Vec<u8>,
+}
 
 impl AvroFormat for AvroBinaryFormat {
     fn start_stream<W: Write>(
         &mut self,
         _writer: &mut W,
-        _schema: &Schema,
-        _compression: Option<CompressionCodec>,
+        schema: &Schema,
+        compression: Option<CompressionCodec>,
+        fingerprint_strategy: FingerprintStrategy,

Review Comment:
   ```suggestion
   ```



##########
arrow-avro/src/writer/mod.rs:
##########
@@ -90,7 +99,7 @@ impl WriterBuilder {
             avro_schema.clone().json_string,
         );
         let schema = 
Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
-        format.start_stream(&mut writer, &schema, self.codec)?;
+        format.start_stream(&mut writer, &schema, self.codec, 
self.fingerprint_strategy)?;

Review Comment:
   Then we could do something like this to generate the `Fingerprint`:
   
   ```suggestion
           let maybe_fingerprint = if F::NEEDS_PREFIX {
               match self.fingerprint_strategy {
                   Id(id) => Some(Fingerprint::load_fingerprint_id(id)),
                   Other => 
Some(&AvroSchema::try_from(&self.schema)?.fingerprint(FingerprintAlgorithm::from(Other)?)),
               }
           } else {
               None
           };
           format.start_stream(&mut writer, &schema, self.codec)?;
   ```
   
   This will require:
   1. Updating `AvroSchema::fingerprint()` to this:
       ```rust
       /// Returns the fingerprint of the schema.
        pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> 
Result<Fingerprint, ArrowError> {
            Self::generate_fingerprint(&self.schema()?, hash_type)
        }
       ```
   2. Adding those convenience methods I mentioned in my other comment. 
   
   So much cleaner and simpler though imo. 



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -600,9 +601,22 @@ impl RecordEncoder {
     /// Encode a `RecordBatch` using this encoder plan.
     ///
     /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of 
many small writes.
-    pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> 
Result<(), ArrowError> {
+    pub fn encode<W: Write>(
+        &self,
+        out: &mut W,
+        batch: &RecordBatch,
+        prefix: Option<&[u8]>,
+    ) -> Result<(), ArrowError> {
         let mut column_encoders = self.prepare_for_batch(batch)?;
         for row in 0..batch.num_rows() {
+            if let Some(prefix) = prefix {
+                if !prefix.is_empty() {
+                    out.write_all(prefix).map_err(|e| {
+                        ArrowError::IoError(format!("write single-object 
prefix: {e}"), e)
+                    })?;
+                }
+            }
+
             for encoder in column_encoders.iter_mut() {
                 encoder.encode(out, row)?;
             }

Review Comment:
   Also another way to do this would be to use an inline buffer to avoid heap 
allocations, maybe something like this?
   
   ```rust
   const MAX_PREFIX: usize = 34;
   
   #[derive(Debug, Copy, Clone)]
   struct Prefix { buf: [u8; MAX_PREFIX], len: u8 }
   impl Prefix {
       #[inline] fn as_slice(&self) -> &[u8] { &self.buf[..self.len as usize] }
   }
   
   #[derive(Debug, Clone)]
   pub struct RecordEncoder {
       columns: Vec<FieldBinding>,
       prefix: Option<Prefix>,
   }
   
   impl RecordEncoder {
       pub fn encode<W: Write>(
           &self,
           out: &mut W,
           batch: &RecordBatch,
       ) -> Result<(), ArrowError> {
           let mut column_encoders = self.prepare_for_batch(batch)?;
           let n = batch.num_rows();
           match self.prefix {
               Some(prefix) => {
                   for row in 0..n {
                       out.write_all(prefix.as_slice())
                           .map_err(|e| ArrowError::IoError(format!("write 
prefix: {e}"), e))?;
                       for enc in column_encoders.iter_mut() {
                           enc.encode(out, row)?;
                       }
                   }
               }
               None => {
                   ...
               }
           }
           Ok(())
       }
   }
   ```
   
   I'm not sure if there's a massive upside to an approach like this and it's 
very likely an over-optimization. However figured it was worth calling out.



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {
+    /// Pre-built, variable-length prefix written before each record.
+    prefix: Vec<u8>,

Review Comment:
   Instead of putting this here, we could move it up into the `RecordEncoder`.
   
   ```suggestion
   ```



##########
arrow-avro/src/schema.rs:
##########
@@ -316,6 +316,22 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Defines the strategy for generating the per-record prefix for an Avro 
binary stream.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum FingerprintStrategy {
+    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
+    #[default]
+    Rabin,
+    /// Use a Confluent Schema Registry 32-bit ID.
+    ConfluentSchemaId(u32),
+    #[cfg(feature = "md5")]
+    /// Use the 128-bit MD5 fingerprint.
+    MD5,
+    #[cfg(feature = "sha256")]
+    /// Use the 256-bit SHA-256 fingerprint.
+    SHA256,
+}

Review Comment:
   I like this approach. This saves the user from having to deal with multiple 
knobs on the `WriterBuilder`.
   
   Would you mind adding some convenience methods to convert between 
`Fingerprint`, `FingerprintAlgorithm`, and `FingerprintStrategy`?



##########
arrow-avro/src/writer/format.rs:
##########
@@ -26,16 +29,29 @@ use std::io::Write;
 /// Format abstraction implemented by each container‐level writer.
 pub trait AvroFormat: Debug + Default {
     /// Write any bytes required at the very beginning of the output stream
+    /// (file header, etc.).
     /// Implementations **must not** write any record data.
     fn start_stream<W: Write>(
         &mut self,
         writer: &mut W,
         schema: &Schema,
         compression: Option<CompressionCodec>,
+        fingerprint_strategy: FingerprintStrategy,

Review Comment:
   Actually, I think there's a cleaner approach where we don't pass the 
`FingerprintStrategy` into the `AvroFormat`.
   
   Instead we could build the `Fingerprint` in `WriterBuilder::build` and store 
it in the `RecordEncoder` as an optional field.
   
   ```suggestion
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {

Review Comment:
   @nathaniel-d-ef I think we'll want three formats in total here, OCF, Binary, 
and Single Object. I know the reader side doesn't 100% align, however I suspect 
it maybe more important on the writer side.
   
   This also follows the specs:
   * https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
   * https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding



##########
arrow-avro/src/writer/format.rs:
##########
@@ -50,13 +66,19 @@ impl AvroFormat for AvroOcfFormat {
         writer: &mut W,
         schema: &Schema,
         compression: Option<CompressionCodec>,
+        _fingerprint_strategy: FingerprintStrategy,

Review Comment:
   ```suggestion
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {
+    /// Pre-built, variable-length prefix written before each record.
+    prefix: Vec<u8>,
+}
 
 impl AvroFormat for AvroBinaryFormat {
     fn start_stream<W: Write>(

Review Comment:
   What you could do to enable zero cost control flow logic in 
`WriterBuilder::build` is this:
   
   ```suggestion
   impl AvroFormat for AvroBinaryFormat {
   
       const NEEDS_PREFIX: bool = true;
   
       fn start_stream<W: Write>(
   ```
   
   Then in `AvroOcfFormat`:
   
   ```rust
   impl AvroFormat for AvroOcfFormat {
   
       const NEEDS_PREFIX: bool = false;
   
       fn start_stream<W: Write>(
   ```
   
   And in the `AvroFormat` trait
   
   ```rust
   /// Format abstraction implemented by each container‐level writer.
   pub trait AvroFormat: Debug + Default {
   
       /// If `true`, the writer for this format will query 
`single_object_prefix()`
       /// and write the prefix before each record. If `false`, the writer can
       /// skip this step. This is a performance hint for the writer.
       const NEEDS_PREFIX: bool;
       
   ```
   
   That would enable you to do this in `WriterBuilder::build`
   
   ```rust
           let maybe_fp = if F::NEEDS_PREFIX {
               match self.fingerprint_strategy {
               
               }           
           }
   ```



##########
arrow-avro/src/writer/mod.rs:
##########
@@ -194,7 +204,9 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
     }
 
     fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
-        self.encoder.encode(&mut self.writer, batch)
+        self.encoder
+            .encode(&mut self.writer, batch, 
self.format.single_object_prefix())?;

Review Comment:
   ```suggestion
               .encode(&mut self.writer, batch)?;
   ```



##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,25 +106,87 @@ impl AvroFormat for AvroOcfFormat {
     }
 }
 
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per 
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See: 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 #[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {
+    /// Pre-built, variable-length prefix written before each record.
+    prefix: Vec<u8>,
+}
 
 impl AvroFormat for AvroBinaryFormat {
     fn start_stream<W: Write>(
         &mut self,
         _writer: &mut W,
-        _schema: &Schema,
-        _compression: Option<CompressionCodec>,
+        schema: &Schema,
+        compression: Option<CompressionCodec>,
+        fingerprint_strategy: FingerprintStrategy,
     ) -> Result<(), ArrowError> {
-        Err(ArrowError::NotYetImplemented(
-            "avro binary format not yet implemented".to_string(),
-        ))
+        if compression.is_some() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Compression not supported for Avro binary 
streaming".to_string(),
+            ));
+        }
+
+        self.prefix.clear();
+
+        match fingerprint_strategy {
+            FingerprintStrategy::ConfluentSchemaId(id) => {
+                self.prefix.push(CONFLUENT_MAGIC[0]);
+                self.prefix.extend_from_slice(&id.to_be_bytes());
+            }
+            strategy => {
+                // All other strategies use the single-object encoding format
+                self.prefix.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+
+                let avro_schema = AvroSchema::try_from(schema)?;
+                let fp = match strategy {
+                    FingerprintStrategy::Rabin => avro_schema.fingerprint()?,
+                    #[cfg(feature = "md5")]
+                    FingerprintStrategy::MD5 => 
AvroSchema::generate_fingerprint(
+                        &avro_schema.schema()?,
+                        crate::schema::FingerprintAlgorithm::MD5,
+                    )?,
+                    #[cfg(feature = "sha256")]
+                    FingerprintStrategy::SHA256 => 
AvroSchema::generate_fingerprint(
+                        &avro_schema.schema()?,
+                        crate::schema::FingerprintAlgorithm::SHA256,
+                    )?,
+                    FingerprintStrategy::ConfluentSchemaId(_) => 
unreachable!(),
+                };
+
+                match fp {
+                    Fingerprint::Rabin(val) => 
self.prefix.extend_from_slice(&val.to_le_bytes()),
+                    #[cfg(feature = "md5")]
+                    Fingerprint::MD5(val) => 
self.prefix.extend_from_slice(val.as_ref()),
+                    #[cfg(feature = "sha256")]
+                    Fingerprint::SHA256(val) => 
self.prefix.extend_from_slice(val.as_ref()),
+                    Fingerprint::Id(_) => return 
Err(ArrowError::InvalidArgumentError(
+                        "ConfluentSchemaId strategy cannot be used with a 
hash-based fingerprint."
+                            .to_string(),
+                    )),
+                }
+            }
+        }
+        Ok(())
     }
 
     fn sync_marker(&self) -> Option<&[u8; 16]> {
         None
     }
+
+    fn single_object_prefix(&self) -> Option<&[u8]> {
+        if self.prefix.is_empty() {
+            None
+        } else {
+            Some(&self.prefix)
+        }
+    }

Review Comment:
   ```suggestion
   ```



##########
arrow-avro/src/writer/mod.rs:
##########
@@ -90,7 +99,7 @@ impl WriterBuilder {
             avro_schema.clone().json_string,
         );
         let schema = 
Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
-        format.start_stream(&mut writer, &schema, self.codec)?;
+        format.start_stream(&mut writer, &schema, self.codec, 
self.fingerprint_strategy)?;
         let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
         let encoder = RecordEncoderBuilder::new(&avro_root, 
schema.as_ref()).build()?;

Review Comment:
   I'd add a new `RecordEncoderBuilder::with_fingerprint(fingerprint: 
Fingerprint)` method to use here.
   
    If we also add a new `Fingerprint::make_prefix()` method like this:
   
   ```rust
   impl Fingerprint {
       pub fn make_prefix(&self) -> Vec<u8> {
           match self {
               Self::Id(id) => {
                   // Confluent wire format
                   let mut out = Vec::with_capacity(CONFLUENT_MAGIC.len() + 4);
                   out.extend_from_slice(&CONFLUENT_MAGIC);
                   out.extend_from_slice(&id.to_be_bytes());
                   out
               }
               Self::Rabin(val) => {
                   // Avro single-object (spec)
                   let mut out = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + 
8);
                   out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
                   out.extend_from_slice(&val.to_le_bytes());
                   out
               }
               #[cfg(feature = "md5")]
               Self::MD5(bytes) => {
                   // Non-standard extension
                   let mut out = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + 
bytes.len());
                   out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
                   out.extend_from_slice(&bytes);
                   out
               }
               #[cfg(feature = "sha256")]
               Self::SHA256(bytes) => {
                   // Non-standard extension
                   let mut out = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + 
bytes.len());
                   out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
                   out.extend_from_slice(&bytes);
                   out
               }
           }
       }
   ```
   
    We could then easily construct the prefix bytes in 
`RecordEncoderBuilder::build()` and store it on the new `RecordEncoder` as an 
optional field.



##########
arrow-avro/src/writer/mod.rs:
##########
@@ -177,7 +186,8 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
 
     fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> 
Result<(), ArrowError> {
         let mut buf = Vec::<u8>::with_capacity(1024);
-        self.encoder.encode(&mut buf, batch)?;
+        self.encoder
+            .encode(&mut buf, batch, self.format.single_object_prefix())?;

Review Comment:
   ```suggestion
               .encode(&mut buf, batch)?;
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -600,9 +601,22 @@ impl RecordEncoder {
     /// Encode a `RecordBatch` using this encoder plan.
     ///
     /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of 
many small writes.
-    pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> 
Result<(), ArrowError> {
+    pub fn encode<W: Write>(
+        &self,
+        out: &mut W,
+        batch: &RecordBatch,
+        prefix: Option<&[u8]>,
+    ) -> Result<(), ArrowError> {
         let mut column_encoders = self.prepare_for_batch(batch)?;
         for row in 0..batch.num_rows() {
+            if let Some(prefix) = prefix {
+                if !prefix.is_empty() {
+                    out.write_all(prefix).map_err(|e| {
+                        ArrowError::IoError(format!("write single-object 
prefix: {e}"), e)
+                    })?;
+                }
+            }
+
             for encoder in column_encoders.iter_mut() {
                 encoder.encode(out, row)?;
             }

Review Comment:
   I'd remove the redundant conditional check and hoist the branch out of the 
loop. I'd also store the prefix on the `RecordEncoder` and get rid of the 
`prefix: Option<&[u8]>,` parameter. 
   
   ```rust
   #[derive(Debug, Clone)]
   pub struct RecordEncoder {
       columns: Vec<FieldBinding>,
       prefix: Option<Arc<[u8]>>,
   }
   ```
   
   Then you could do:
   
   ```suggestion
       pub fn encode<W: Write>(
           &self,
           out: &mut W,
           batch: &RecordBatch,
       ) -> Result<(), ArrowError> {
           let mut column_encoders = self.prepare_for_batch(batch)?;
           let n = batch.num_rows();
           match self.prefix.as_deref() {
               Some(prefix) => {
                   for row in 0..n {
                       out.write_all(prefix)
                           .map_err(|e| ArrowError::IoError(format!("write 
prefix: {e}"), e))?;
                       for enc in column_encoders.iter_mut() {
                           enc.encode(out, row)?;
                       }
                   }
               }
               None => {
                   for row in 0..n {
                       for enc in column_encoders.iter_mut() {
                           enc.encode(out, row)?;
                       }
                   }
               }
           }
           Ok(())
       }
   ```
   
   If you wanted to then abstract:
   
   ```rust
                      for enc in column_encoders.iter_mut() {
                           enc.encode(out, row)?;
                       }
   ```
   
   Into another method that may make sense. However might also be overkill for 
now since I don't see that logic being applied elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to