jecsand838 commented on code in PR #8371:
URL: https://github.com/apache/arrow-rs/pull/8371#discussion_r2365816734
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -48,6 +50,7 @@ pub struct WriterBuilder {
schema: Schema,
codec: Option<CompressionCodec>,
capacity: usize,
+ fingerprint_strategy: FingerprintStrategy,
Review Comment:
Because not every format will require a prefix, it may be better for this to
be optional.
I'm on the fence about this one, though, since it will add some minor complexity
to the default `FingerprintStrategy` logic in the `WriterBuilder::build`
method. That being said, this approach does seem more correct.
```suggestion
fingerprint_strategy: Option<FingerprintStrategy>,
```
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -57,9 +60,17 @@ impl WriterBuilder {
schema,
codec: None,
capacity: 1024,
+ fingerprint_strategy: FingerprintStrategy::default(),
}
}
+ /// Set the fingerprinting strategy for the stream writer.
+ /// This determines the per-record prefix format.
+ pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy)
-> Self {
+ self.fingerprint_strategy = strategy;
Review Comment:
```suggestion
self.fingerprint_strategy = Some(strategy);
```
##########
arrow-avro/src/writer/format.rs:
##########
@@ -84,20 +99,31 @@ impl AvroFormat for AvroOcfFormat {
}
}
-/// Raw Avro binary streaming format (no header or footer).
+/// Raw Avro binary streaming format using **Single-Object Encoding** per
record.
+///
+/// Each record written by the stream writer is framed with a prefix determined
+/// by the schema fingerprinting algorithm.
+///
+/// See:
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+/// See:
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Debug, Default)]
-pub struct AvroBinaryFormat;
+pub struct AvroBinaryFormat {}
impl AvroFormat for AvroBinaryFormat {
+ const NEEDS_PREFIX: bool = true;
fn start_stream<W: Write>(
&mut self,
_writer: &mut W,
- _schema: &Schema,
- _compression: Option<CompressionCodec>,
+ schema: &Schema,
Review Comment:
We can probably change this back since it's still not being used.
```suggestion
_schema: &Schema,
```
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -84,6 +95,17 @@ impl WriterBuilder {
Some(json) => AvroSchema::new(json.clone()),
None => AvroSchema::try_from(&self.schema)?,
};
+
+ let maybe_fingerprint = if F::NEEDS_PREFIX {
+ match self.fingerprint_strategy {
+ FingerprintStrategy::Id(id) => Some(Fingerprint::Id(id)),
+ strategy =>
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?),
+ }
+ } else {
+ None
+ };
+ let maybe_prefix = maybe_fingerprint.as_ref().map(|fp|
fp.make_prefix());
Review Comment:
This isn't being used anywhere.
```suggestion
```
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -84,6 +95,17 @@ impl WriterBuilder {
Some(json) => AvroSchema::new(json.clone()),
None => AvroSchema::try_from(&self.schema)?,
};
+
+ let maybe_fingerprint = if F::NEEDS_PREFIX {
+ match self.fingerprint_strategy {
+ FingerprintStrategy::Id(id) => Some(Fingerprint::Id(id)),
+ strategy =>
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?),
+ }
Review Comment:
```suggestion
match self.fingerprint_strategy {
Some(FingerprintStrategy::Id(id)) =>
Some(Fingerprint::Id(id)),
Some(strategy) =>
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?),
None =>
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?),
}
```
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -57,9 +60,17 @@ impl WriterBuilder {
schema,
codec: None,
capacity: 1024,
+ fingerprint_strategy: FingerprintStrategy::default(),
Review Comment:
```suggestion
fingerprint_strategy: None,
```
##########
arrow-avro/src/schema.rs:
##########
@@ -540,6 +657,65 @@ impl Fingerprint {
pub fn load_fingerprint_id(id: u32) -> Self {
Fingerprint::Id(u32::from_be(id))
}
+
+ /// Constructs a serialized prefix represented as a `Vec<u8>` based on the
variant of the enum.
+ ///
+ /// This method serializes data in different formats depending on the
variant of `self`:
+ /// - **`Id(id)`**: Uses the Confluent wire format, which includes a
predefined magic header (`CONFLUENT_MAGIC`)
+ /// followed by the big-endian byte representation of the `id`.
+ /// - **`Rabin(val)`**: Uses the Avro single-object specification format.
This includes a different magic header
+ /// (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte
representation of the `val`.
+ /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard
extension that adds the
+ /// `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
+ /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to
the `MD5` variant, this is
+ /// a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC`
header followed by the given `bytes`.
+ ///
+ /// # Returns
+ ///
+ /// A `Prefix` containing the serialized prefix data.
+ ///
+ /// # Features
+ ///
+ /// - You can optionally enable the `md5` feature to include the `MD5`
variant.
+ /// - You can optionally enable the `sha256` feature to include the
`SHA256` variant.
+ ///
+ pub fn make_prefix(&self) -> Prefix {
+ let mut buf = [0u8; MAX_PREFIX_LEN];
+ let len = match self {
+ Self::Id(id) => {
+ let prefix_slice = &mut buf[..5];
+ prefix_slice[..1].copy_from_slice(&CONFLUENT_MAGIC);
+ prefix_slice[1..5].copy_from_slice(&id.to_be_bytes());
+ 5
+ }
+ Self::Rabin(val) => {
+ let prefix_slice = &mut buf[..10];
+ prefix_slice[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC);
+ prefix_slice[2..10].copy_from_slice(&val.to_le_bytes());
+ 10
+ }
+ #[cfg(feature = "md5")]
+ Self::MD5(bytes) => {
+ const LEN: usize = 2 + 16;
+ let prefix_slice = &mut buf[..LEN];
+ prefix_slice[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC);
+ prefix_slice[2..LEN].copy_from_slice(bytes);
+ LEN
+ }
+ #[cfg(feature = "sha256")]
+ Self::SHA256(bytes) => {
+ const LEN: usize = 2 + 32;
+ let prefix_slice = &mut buf[..LEN];
+ prefix_slice[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC);
+ prefix_slice[2..LEN].copy_from_slice(bytes);
+ LEN
+ }
+ };
+ Prefix {
+ buf,
+ len: len as u8,
+ }
+ }
}
Review Comment:
Minor nit:
```suggestion
pub fn make_prefix(&self) -> Prefix {
let mut buf = [0u8; MAX_PREFIX_LEN];
let len = match self {
Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC,
&val.to_be_bytes()),
Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC,
&val.to_le_bytes()),
#[cfg(feature = "md5")]
Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC,
&val),
#[cfg(feature = "sha256")]
Self::SHA256(val) => write_prefix(&mut buf,
&SINGLE_OBJECT_MAGIC, &val),
};
Prefix { buf, len }
}
}
#[inline]
fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
buf: &mut [u8; MAX_PREFIX_LEN],
magic: &[u8; MAGIC_LEN],
payload: &[u8; PAYLOAD_LEN],
) -> u8 {
debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
let total = MAGIC_LEN + PAYLOAD_LEN;
let prefix_slice = &mut buf[..total];
prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
total as u8
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]