scovich commented on code in PR #7935:
URL: https://github.com/apache/arrow-rs/pull/7935#discussion_r2216465127


##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
             }
         }
     }
+
+    // Returns the starting offset of the parent's buffer if the parent is an object builder, else 0.
+    // An object builder reuses its parent's buffer, so `finish` needs the offset
+    // relative to the start of the current variant.
+    fn object_start_offset(&self) -> usize {
+        match self {
+            ParentState::Object {
+                object_start_offset,
+                ..
+            } => *object_start_offset,
+            _ => 0,
+        }
+    }
+
+    /// Return mutable references to the buffer and metadata builder that this
+    /// parent state is using.
+    fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) {
+        match self {
+            ParentState::Variant {
+                buffer,
+                metadata_builder,
+            } => (buffer, metadata_builder),
+            ParentState::List {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),
+            ParentState::Object {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),

Review Comment:
   I _think_ rust allows this:
   ```suggestion
               ParentState::Variant {
                   buffer,
                   metadata_builder,
               } |
               ParentState::List {
                   buffer,
                   metadata_builder,
                   ..
               } |
               ParentState::Object {
                   buffer,
                   metadata_builder,
                   ..
               } => (buffer, metadata_builder),
   ```
   Not clear whether that's better or worse than the current code. 
   
   I'm also not sure how it would `fmt`.
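   For reference, a self-contained sketch (toy types, not this crate's) confirming that or-patterns with identical bindings in every alternative do compile; I haven't checked what rustfmt does with them:
   ```rust
   #[allow(dead_code)]
   enum State<'a> {
       Variant { buf: &'a mut Vec<u8> },
       List { buf: &'a mut Vec<u8>, start: usize },
       Object { buf: &'a mut Vec<u8>, start: usize },
   }
   
   impl State<'_> {
       fn buf(&mut self) -> &mut Vec<u8> {
           // Every alternative binds `buf`, so the shared arm body can use it.
           match self {
               State::Variant { buf }
               | State::List { buf, .. }
               | State::Object { buf, .. } => buf,
           }
       }
   }
   
   fn main() {
       let mut v = vec![1u8];
       let mut s = State::Object { buf: &mut v, start: 0 };
       s.buf().push(2);
       drop(s);
       assert_eq!(v, [1, 2]);
   }
   ```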



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);
+
+        // Write header at the original start position
+        let mut header_pos = starting_offset;
+
+        // Write header byte
         let header = object_header(is_large, id_size, offset_size);
-        parent_buffer.append_header(header, is_large, num_fields);
+        buffer[header_pos] = header;
+        header_pos += 1;
+
+        // Write number of fields
+        if is_large {
+            buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
+            header_pos += 4;
+        } else {
+            buffer[header_pos] = num_fields as u8;
+            header_pos += 1;
+        }
 
-        // Write field IDs (sorted order)
-        let ids = self.fields.keys().map(|id| *id as usize);
-        parent_buffer.append_offset_array(ids, None, id_size);
+        // Write field IDs
+        for (&field_id, _) in &self.fields {
+            let id_bytes = (field_id as usize).to_le_bytes();
+            buffer[header_pos..header_pos + id_size as usize]
+                .copy_from_slice(&id_bytes[..id_size as usize]);
+            header_pos += id_size as usize;
+        }
 
-        // Write the field offset array, followed by the value bytes
-        let offsets = std::mem::take(&mut self.fields).into_values();
-        parent_buffer.append_offset_array(offsets, Some(data_size), offset_size);
-        parent_buffer.append_slice(self.buffer.inner());
-        self.parent_state.finish(starting_offset);
+        // Write field offsets (adjusted for header)
+        for (_, &relative_offset) in &self.fields {

Review Comment:
   ```suggestion
           for relative_offset in self.fields.values() {
   ```



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);
+
+        // Write header at the original start position
+        let mut header_pos = starting_offset;
+
+        // Write header byte
         let header = object_header(is_large, id_size, offset_size);
-        parent_buffer.append_header(header, is_large, num_fields);
+        buffer[header_pos] = header;
+        header_pos += 1;
+
+        // Write number of fields
+        if is_large {
+            buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
+            header_pos += 4;
+        } else {
+            buffer[header_pos] = num_fields as u8;
+            header_pos += 1;
+        }
 
-        // Write field IDs (sorted order)
-        let ids = self.fields.keys().map(|id| *id as usize);
-        parent_buffer.append_offset_array(ids, None, id_size);
+        // Write field IDs
+        for (&field_id, _) in &self.fields {
+            let id_bytes = (field_id as usize).to_le_bytes();
+            buffer[header_pos..header_pos + id_size as usize]
+                .copy_from_slice(&id_bytes[..id_size as usize]);
+            header_pos += id_size as usize;
+        }
 
-        // Write the field offset array, followed by the value bytes
-        let offsets = std::mem::take(&mut self.fields).into_values();
-        parent_buffer.append_offset_array(offsets, Some(data_size), offset_size);
-        parent_buffer.append_slice(self.buffer.inner());
-        self.parent_state.finish(starting_offset);
+        // Write field offsets (adjusted for header)
+        for (_, &relative_offset) in &self.fields {
+            let offset_bytes = relative_offset.to_le_bytes();
+            buffer[header_pos..header_pos + offset_size as usize]
+                .copy_from_slice(&offset_bytes[..offset_size as usize]);
+            header_pos += offset_size as usize;
+        }
+
+        // Write data_size
+        let data_size_bytes = data_size.to_le_bytes();
+        buffer[header_pos..header_pos + offset_size as usize]
+            .copy_from_slice(&data_size_bytes[..offset_size as usize]);
+
+        let start_offset_shift = self.parent_state.object_start_offset();
+        self.parent_state
+            .finish(starting_offset - start_offset_shift);

Review Comment:
   Why make the caller of `ParentState::finish` extract the start offset? Seems 
like parent state can do that more easily and reliably on its own?
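   i.e. something along these lines inside `finish` itself (sketch only -- I'm guessing at what `finish` records today, and `record_child_offset` is a made-up name; the point is just that the subtraction lives in one place):
   ```rust
   impl ParentState<'_> {
       /// `starting_offset` is the absolute offset into the shared buffer where the
       /// child's value begins; translate it to a parent-relative offset here so
       /// every caller doesn't have to remember to do it.
       fn finish(&mut self, starting_offset: usize) {
           let relative_offset = starting_offset - self.object_start_offset();
           self.record_child_offset(relative_offset); // hypothetical helper for whatever bookkeeping `finish` does now
       }
   }
   ```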



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);

Review Comment:
   aside: I'd be very impressed if the compiler is smart enough to optimize 
away the allocation in `vec![0u8; header_size].into_iter()`, which would 
potentially cause the above to run slower than the current code.



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);

Review Comment:
   Alternatively -- that small allocation apparently isn't hurting performance. What if we created the temp vec with an initial capacity of `header_size` and then populated it with the actual header info, field ids, and offsets before splicing it in? That way, an incorrect `header_size` calculation would not impact correctness:
   ```rust
   let mut bytes_to_splice = Vec::with_capacity(header_size);
   bytes_to_splice.push(header_byte);
   if is_large {
       bytes_to_splice.extend((num_fields as u32).to_le_bytes());
   } else {
       bytes_to_splice.push(num_fields as u8);
   }
   for &field_id in self.fields.keys() {
       bytes_to_splice.extend(field_id.to_le_bytes().into_iter().take(id_size as usize));
   }
   for &offset in self.fields.values() {
       bytes_to_splice.extend((offset as u32).to_le_bytes().into_iter().take(offset_size as usize));
   }
   bytes_to_splice.extend((data_size as u32).to_le_bytes().into_iter().take(offset_size as usize));
   buffer.splice(starting_offset..starting_offset, bytes_to_splice);
   ```



##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
             }
         }
     }
+
+    // Returns the starting offset of the parent's buffer if the parent is an object builder, else 0.
+    // An object builder reuses its parent's buffer, so `finish` needs the offset
+    // relative to the start of the current variant.
+    fn object_start_offset(&self) -> usize {

Review Comment:
   Doesn't variant array also have a starting offset?



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size

Review Comment:
   One thing I don't love about this approach is that the splice size is calculated independently of the bytes that actually get inserted. If they ever disagreed... badness, as the insertions underflow or overflow the splice.
   
   Ideally, we could produce an iterator that emits the desired bytes and let the splice itself guarantee correct behavior. But for that to work the iterator would need to provide an accurate lower bound, which rules out `std::iter::from_fn`. Even if we did craft a custom iterator, computing its lower bound would basically be this same calculation all over again. We could also chain together a bunch of iterators, which preserves the lower bound, but somehow I doubt that would be efficient.
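   For concreteness, the size hints involved -- `from_fn` promises nothing, while chaining exact-size iterators keeps an exact hint:
   ```rust
   fn main() {
       // `from_fn` has no idea how many items it will yield.
       let anonymous = std::iter::from_fn(|| Some(0u8));
       assert_eq!(anonymous.size_hint(), (0, None));
   
       // Chaining exact-size iterators preserves an exact hint.
       let header = std::iter::once(0u8).chain(1u32.to_le_bytes());
       assert_eq!(header.size_hint(), (5, Some(5)));
   }
   ```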



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);
+
+        // Write header at the original start position
+        let mut header_pos = starting_offset;
+
+        // Write header byte
         let header = object_header(is_large, id_size, offset_size);
-        parent_buffer.append_header(header, is_large, num_fields);
+        buffer[header_pos] = header;
+        header_pos += 1;
+
+        // Write number of fields
+        if is_large {
+            buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
+            header_pos += 4;
+        } else {
+            buffer[header_pos] = num_fields as u8;
+            header_pos += 1;
+        }
 
-        // Write field IDs (sorted order)
-        let ids = self.fields.keys().map(|id| *id as usize);
-        parent_buffer.append_offset_array(ids, None, id_size);
+        // Write field IDs
+        for (&field_id, _) in &self.fields {

Review Comment:
   ```suggestion
           for field_id in self.fields.keys() {
   ```



##########
parquet-variant/src/builder.rs:
##########
@@ -1317,7 +1414,15 @@ impl<'a> ObjectBuilder<'a> {
 /// This is to ensure that the object is always finalized before its parent builder
 /// is finalized.
 impl Drop for ObjectBuilder<'_> {
-    fn drop(&mut self) {}
+    fn drop(&mut self) {
+        // truncate the buffer if the `finish` method has not been called.
+        if !self.has_been_finished {
+            self.parent_state
+                .buffer()
+                .inner_mut()
+                .truncate(self.object_start_offset);
+        }

Review Comment:
   While we're at it, we should truncate the `MetadataBuilder` back to the size 
it had when we started, to clean up any new field ids the failed builder might 
have created.
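   Roughly (untested; `metadata_start_len` and `MetadataBuilder::truncate_field_names` are made-up names -- the object builder would need to record the dictionary size when it's created, and the metadata builder would need a way to roll back to it):
   ```rust
   impl Drop for ObjectBuilder<'_> {
       fn drop(&mut self) {
           // Roll back both the value bytes and any field names this builder
           // interned, if `finish` was never called.
           if !self.has_been_finished {
               self.parent_state
                   .buffer()
                   .inner_mut()
                   .truncate(self.object_start_offset);
               self.parent_state
                   .metadata_builder()
                   .truncate_field_names(self.metadata_start_len); // hypothetical rollback API
           }
       }
   }
   ```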



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);

Review Comment:
   aside: Rather than pay for a traversal over the list of fields, I would just use the metadata builder's current length as a _very_ cheap-to-compute upper bound. It will almost always be a tight upper bound as well -- it would take a pretty carefully crafted object to use only the early/small field ids of a large dictionary, such that the dictionary length needs more encoding bytes than the largest field id of this object.
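   i.e. something like this (assuming the `MetadataBuilder` exposes its dictionary length; `num_field_names` is a made-up name):
   ```rust
   // Every field id in `self.fields` was handed out by the metadata builder, so the
   // dictionary length is a cheap upper bound on the largest id -- no need to scan.
   let max_id = metadata_builder.num_field_names().saturating_sub(1);
   let id_size = int_size(max_id);
   ```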



##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
             }
         }
     }
+
+    // Returns the starting offset of the parent's buffer if the parent is an object builder, else 0.
+    // An object builder reuses its parent's buffer, so `finish` needs the offset
+    // relative to the start of the current variant.
+    fn object_start_offset(&self) -> usize {
+        match self {
+            ParentState::Object {
+                object_start_offset,
+                ..
+            } => *object_start_offset,
+            _ => 0,
+        }
+    }
+
+    /// Return mutable references to the buffer and metadata builder that this
+    /// parent state is using.
+    fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) {
+        match self {
+            ParentState::Variant {
+                buffer,
+                metadata_builder,
+            } => (buffer, metadata_builder),
+            ParentState::List {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),
+            ParentState::Object {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),
+        }
+    }
+
+    // return the offset of the underlying buffer at the time of calling this method.
+    fn buffer_current_offset(&self) -> usize {
+        match self {
+            ParentState::Variant { buffer, .. } => buffer.offset(),
+            ParentState::Object { buffer, .. } => buffer.offset(),
+            ParentState::List { buffer, .. } => buffer.offset(),
+        }

Review Comment:
   As above, can reduce redundancy by 
   ```suggestion
           match self {
               ParentState::Variant { buffer, .. }
                   | ParentState::Object { buffer, .. } 
                   | ParentState::List { buffer, .. } => buffer.offset(),
           }
   ```



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size
+
+        let starting_offset = self.object_start_offset;
+
+        // Shift existing data to make room for the header
+        let buffer = parent_buffer.inner_mut();
+        buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]);

Review Comment:
   Can avoid even this allocation:
   ```suggestion
           buffer.splice(starting_offset..starting_offset, std::iter::repeat_n(0u8, header_size));
   ```
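   For what it's worth, `repeat_n` also reports an exact size hint, so the splice knows up front how many bytes are coming:
   ```rust
   fn main() {
       let zeros = std::iter::repeat_n(0u8, 5);
       assert_eq!(zeros.size_hint(), (5, Some(5)));
   
       // Splicing a sized iterator into an empty range behaves like a plain insert.
       let mut buf = vec![1u8, 2, 3];
       buf.splice(1..1, std::iter::repeat_n(0u8, 2));
       assert_eq!(buf, [1, 0, 0, 2, 3]);
   }
   ```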



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size

Review Comment:
   ... but again, I worry that wrapping up all those for-loops inside an 
iterator for direct splicing will turn out to be a lot slower than the 
splice-then-overwrite approach the code currently takes.



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size

Review Comment:
   Something like this might _almost_ work?
   ```rust
   let field_ids = self.fields.keys().flat_map(|field_id| {
       (*field_id as usize).to_le_bytes().into_iter().take(id_size as usize)
   });
   
   let offsets = self.fields.values().flat_map(|offset| {
       offset.to_le_bytes().into_iter().take(offset_size as usize)
   });
   
   let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 });
   
   let header_and_num_fields = std::iter::once(header_byte).chain(num_fields);
   let field_ids_and_offsets = field_ids.chain(offsets);
   let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets);
   buffer.splice(starting_offset..starting_offset, bytes_to_splice);
   ```
   ... but unfortunately `Iterator::flat_map` does _not_ (honestly, cannot) 
compute an accurate lower bound size hint. So the `splice` call would end up 
having to allocate an internal temp buffer.



##########
parquet-variant/src/builder.rs:
##########
@@ -598,6 +599,49 @@ impl ParentState<'_> {
             }
         }
     }
+
+    // Returns the starting offset of the parent's buffer if the parent is an object builder, else 0.
+    // An object builder reuses its parent's buffer, so `finish` needs the offset
+    // relative to the start of the current variant.
+    fn object_start_offset(&self) -> usize {
+        match self {
+            ParentState::Object {
+                object_start_offset,
+                ..
+            } => *object_start_offset,
+            _ => 0,
+        }
+    }
+
+    /// Return mutable references to the buffer and metadata builder that this
+    /// parent state is using.
+    fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) {
+        match self {
+            ParentState::Variant {
+                buffer,
+                metadata_builder,
+            } => (buffer, metadata_builder),
+            ParentState::List {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),
+            ParentState::Object {
+                buffer,
+                metadata_builder,
+                ..
+            } => (buffer, metadata_builder),
+        }
+    }
+
+    // return the offset of the underlying buffer at the time of calling this method.
+    fn buffer_current_offset(&self) -> usize {
+        match self {
+            ParentState::Variant { buffer, .. } => buffer.offset(),
+            ParentState::Object { buffer, .. } => buffer.offset(),
+            ParentState::List { buffer, .. } => buffer.offset(),
+        }

Review Comment:
   Actually... we can just invoke `ParentState::buffer`?
   ```suggestion
           self.buffer().offset()
   ```



##########
parquet-variant/src/builder.rs:
##########
@@ -1275,38 +1330,80 @@ impl<'a> ObjectBuilder<'a> {
             )));
         }
 
-        let data_size = self.buffer.offset();
-        let num_fields = self.fields.len();
-        let is_large = num_fields > u8::MAX as usize;
+        let metadata_builder = self.parent_state.metadata_builder();
 
         self.fields.sort_by(|&field_a_id, _, &field_b_id, _| {
-            let key_a = &metadata_builder.field_name(field_a_id as usize);
-            let key_b = &metadata_builder.field_name(field_b_id as usize);
-            key_a.cmp(key_b)
+            let field_a_name = metadata_builder.field_name(field_a_id as usize);
+            let field_b_name = metadata_builder.field_name(field_b_id as usize);
+            field_a_name.cmp(field_b_name)
         });
-
         let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0);
-
         let id_size = int_size(max_id as usize);
-        let offset_size = int_size(data_size);
 
-        // Get parent's buffer
         let parent_buffer = self.parent_state.buffer();
-        let starting_offset = parent_buffer.offset();
+        let current_offset = parent_buffer.offset();
+        // current object starts from `object_start_offset`
+        let data_size = current_offset - self.object_start_offset;
+        let offset_size = int_size(data_size);
 
-        // Write header
+        let num_fields = self.fields.len();
+        let is_large = num_fields > u8::MAX as usize;
+
+        let header_size = 1 + // header byte
+            (if is_large { 4 } else { 1 }) + // num_fields
+            (num_fields * id_size as usize) + // field IDs
+            ((num_fields + 1) * offset_size as usize); // field offsets + data_size

Review Comment:
   I just realized -- a custom iterator that computes its size hint is still safer than the current code, because an incorrect size hint won't corrupt any data. It will just require extra allocations and/or byte shifting. There's still the question of performance, though. If it's drastically slower, the extra safety probably wouldn't be worth it.
   
   Another possibility is to take a hybrid approach -- rely on 
`Iterator::chain` for most of the heavy lifting, but define a custom iterator 
for emitting offset/field_id arrays:
   
   ```rust
   let num_fields = num_fields.to_le_bytes().into_iter().take(if is_large { 4 } else { 1 });
   let header_and_num_fields = std::iter::once(header_byte).chain(num_fields);
   
   let field_ids = PackedU32Iterator::new(id_size as usize, self.fields.keys().map(|id| id.to_le_bytes()));
   let offsets = PackedU32Iterator::new(offset_size as usize, self.fields.values().map(|offset| (*offset as u32).to_le_bytes()));
   let field_ids_and_offsets = field_ids.chain(offsets);
   
   let bytes_to_splice = header_and_num_fields.chain(field_ids_and_offsets);
   buffer.splice(starting_offset..starting_offset, bytes_to_splice);
   ```
   
   <details>
   <summary>PackedU32Iterator</summary>
   
   ```rust
   struct PackedU32Iterator<T: Iterator<Item = [u8; 4]>> {
       packed_bytes: usize,
       iterator: T,
       current_item: [u8; 4],
       current_byte: usize, // 0..=packed_bytes
   }
   
   impl<T: Iterator<Item = [u8; 4]>> PackedU32Iterator<T> {
       fn new(packed_bytes: usize, iterator: T) -> Self {
           // eliminate corner cases in `next` by initializing with a fake already-consumed "first" item
           Self {
               packed_bytes,
               iterator,
               current_item: [0; 4],
               current_byte: packed_bytes,
           }
       }
   }
   
   impl<T: Iterator<Item = [u8; 4]>> Iterator for PackedU32Iterator<T> {
       type Item = u8;
   
       fn size_hint(&self) -> (usize, Option<usize>) {
           let lower = (self.packed_bytes - self.current_byte)
               + self.packed_bytes * self.iterator.size_hint().0;
           (lower, None)
       }
   
       fn next(&mut self) -> Option<u8> {
           if self.current_byte >= self.packed_bytes {
               let Some(next_item) = self.iterator.next() else {
                   return None;
               };
               self.current_item = next_item;
               self.current_byte = 0;
           }
           let rval = self.current_item[self.current_byte];
           self.current_byte += 1;
           Some(rval)
       }
   }
   ```
   
   </details>
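   A quick sanity check of the behavior I'd expect from the iterator above (untested):
   ```rust
   fn main() {
       // Pack two u32 values into 2 little-endian bytes each.
       let ids = [1u32.to_le_bytes(), 0x0302u32.to_le_bytes()];
       let packed = PackedU32Iterator::new(2, ids.into_iter());
       assert_eq!(packed.size_hint().0, 4);
       assert_eq!(packed.collect::<Vec<u8>>(), [0x01, 0x00, 0x02, 0x03]);
   }
   ```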



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
