This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1222996 ARROW-3398: [Rust] Update existing Builder to use MutableBuffer internally
1222996 is described below
commit 1222996c6d4353067a3f9178fd27834e438dcf6e
Author: Paddy Horan <[email protected]>
AuthorDate: Wed Oct 10 19:32:16 2018 +0200
ARROW-3398: [Rust] Update existing Builder to use MutableBuffer internally
I also fixed a lint issue in `buffer.rs` that was caught by nightly
rustfmt. It's the same issue that was addressed in #2658, so I don't know how
it got reverted.
There may be other lint issues, which I will address as CI flags them (I hope
there are none); locally I am running into [this
issue](https://github.com/rust-lang-nursery/rustfmt/issues/2916) after running
`rustup update`.
cc @kszucs @sunchao @andygrove
Author: Paddy Horan <[email protected]>
Closes #2700 from paddyhoran/ARROW-3398 and squashes the following commits:
4e05ff03 <Paddy Horan> Re-phrased old doc-comment
c1bff9e3 <Paddy Horan> Addressed review
c670d07f <Paddy Horan> Added tests for `write_bytes`
8dc5ed37 <Paddy Horan> Fixed lint issues
4cf5e376 <Paddy Horan> Updated `Builder` to use `MutableBuffer` internally
---
rust/src/builder.rs | 275 +++++++++++++++++++++-------------------------------
1 file changed, 112 insertions(+), 163 deletions(-)
diff --git a/rust/src/builder.rs b/rust/src/builder.rs
index 8b5438e..2584ae9 100644
--- a/rust/src/builder.rs
+++ b/rust/src/builder.rs
@@ -15,158 +15,99 @@
// specific language governing permissions and limitations
// under the License.
-use libc;
-use std::cmp;
+//! Defines a `BufferBuilder` capable of creating a `Buffer` which can be used as an internal
+//! buffer in an `ArrayData` object.
+
+use std::io::Write;
+use std::marker::PhantomData;
use std::mem;
-use std::ptr;
-use std::slice;
use super::buffer::*;
use super::datatypes::*;
-use super::memory::*;
+use error::{ArrowError, Result};
/// Buffer builder with zero-copy build method
-pub struct Builder<T>
+pub struct BufferBuilder<T>
where
T: ArrowPrimitiveType,
{
- data: *mut T,
- len: usize,
- capacity: usize,
+ buffer: MutableBuffer,
+ len: i64,
+ _marker: PhantomData<T>,
}
-impl<T> Builder<T>
+impl<T> BufferBuilder<T>
where
T: ArrowPrimitiveType,
{
- /// Creates a builder with a default capacity
- pub fn new() -> Self {
- Builder::with_capacity(64)
- }
-
- /// Creates a builder with a fixed capacity
- pub fn with_capacity(capacity: usize) -> Self {
- let sz = mem::size_of::<T>();
- let buffer = allocate_aligned((capacity * sz) as i64).unwrap();
- Builder {
+ /// Creates a builder with a fixed initial capacity
+ pub fn new(capacity: i64) -> Self {
+ let buffer = MutableBuffer::new(capacity as usize *
mem::size_of::<T>());
+ Self {
+ buffer,
len: 0,
- capacity,
- data: buffer as *mut T,
+ _marker: PhantomData,
}
}
- /// Get the number of elements in the builder
- pub fn len(&self) -> usize {
+ /// Returns the number of array elements (slots) in the builder
+ pub fn len(&self) -> i64 {
self.len
}
- /// Get the capacity of the builder (number of elements)
- pub fn capacity(&self) -> usize {
- self.capacity
- }
-
- /// Get the internal byte-aligned memory buffer as a mutable slice
- pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] {
- assert!(
- end <= self.capacity as usize,
- "the end of the slice must be within the capacity"
- );
- assert!(
- start <= end,
- "the start of the slice cannot exceed the end of the slice"
- );
- unsafe {
- slice::from_raw_parts_mut(self.data.offset(start as isize), (end -
start) as usize)
- }
- }
-
- /// Override the length
- pub fn set_len(&mut self, len: usize) {
- self.len = len;
+ /// Returns the current capacity of the builder (number of elements)
+ pub fn capacity(&self) -> i64 {
+ let byte_capacity = self.buffer.capacity();
+ (byte_capacity / mem::size_of::<T>()) as i64
}
/// Push a value into the builder, growing the internal buffer as needed
- pub fn push(&mut self, v: T) {
- assert!(!self.data.is_null(), "cannot push onto uninitialized data");
- if self.len == self.capacity {
- // grow capacity by 64 bytes or double the current capacity, whichever is greater
- let new_capacity = cmp::max(64, self.capacity * 2);
- self.grow(new_capacity);
- }
- assert!(self.len < self.capacity, "new length exceeds capacity");
- unsafe {
- *self.data.offset(self.len as isize) = v;
- }
- self.len += 1;
- }
-
- /// Set a value at a slot in the allocated memory without adjusting the length
- pub fn set(&mut self, i: usize, v: T) {
- assert!(
- !self.data.is_null(),
- "cannot set value if data is uninitialized"
- );
- assert!(i < self.capacity, "index exceeds capacity");
- unsafe {
- *self.data.offset(i as isize) = v;
- }
+ pub fn push(&mut self, v: T) -> Result<()> {
+ self.reserve(1)?;
+ self.write_bytes(v.to_byte_slice(), 1)
}
- /// push a slice of type T, growing the internal buffer as needed
- pub fn push_slice(&mut self, slice: &[T]) {
- self.reserve(slice.len());
- let sz = mem::size_of::<T>();
- unsafe {
- libc::memcpy(
- self.data.offset(self.len() as isize) as *mut libc::c_void,
- slice.as_ptr() as *const libc::c_void,
- slice.len() * sz,
- );
- }
- self.len += slice.len();
+ /// Push a slice of type T, growing the internal buffer as needed
+ pub fn push_slice(&mut self, slice: &[T]) -> Result<()> {
+ let array_slots = slice.len() as i64;
+ self.reserve(array_slots)?;
+ self.write_bytes(slice.to_byte_slice(), array_slots)
}
/// Reserve memory for n elements of type T
- pub fn reserve(&mut self, n: usize) {
- if self.len + n > self.capacity {
- let new_capacity = cmp::max(self.capacity * 2, n);
- self.grow(new_capacity);
- }
- }
-
- /// Grow the buffer to the new size n (number of elements of type T)
- fn grow(&mut self, new_capacity: usize) {
- let sz = mem::size_of::<T>();
- let old_buffer = self.data;
- let new_buffer = allocate_aligned((new_capacity * sz) as i64).unwrap();
- unsafe {
- libc::memcpy(
- new_buffer as *mut libc::c_void,
- old_buffer as *const libc::c_void,
- self.len * sz,
- );
+ pub fn reserve(&mut self, n: i64) -> Result<()> {
+ let new_capacity = self.len + n;
+ if new_capacity > self.capacity() {
+ return self.grow(new_capacity);
}
- self.capacity = new_capacity;
- self.data = new_buffer as *mut T;
- free_aligned(old_buffer as *const u8);
- }
-
- /// Build a Buffer from the existing memory
- pub fn finish(&mut self) -> Buffer {
- assert!(!self.data.is_null(), "data has not been initialized");
- let p = self.data;
- self.data = ptr::null_mut(); // ensure builder cannot be re-used
- Buffer::from_raw_parts(p as *mut u8, self.len)
- }
-}
-
-impl<T> Drop for Builder<T>
-where
- T: ArrowPrimitiveType,
-{
- fn drop(&mut self) {
- if !self.data.is_null() {
- free_aligned(self.data as *const u8);
+ Ok(())
+ }
+
+ /// Grow the internal buffer to `new_capacity`, where `new_capacity` is the capacity in
+ /// elements of type T
+ fn grow(&mut self, new_capacity: i64) -> Result<()> {
+ let byte_capacity = mem::size_of::<T>() * new_capacity as usize;
+ self.buffer.resize(byte_capacity)
+ }
+
+ /// Build an immutable `Buffer` from the existing internal `MutableBuffer`'s memory
+ pub fn finish(self) -> Buffer {
+ self.buffer.freeze()
+ }
+
+ /// Writes a byte slice to the underlying buffer and updates the `len`, i.e. the number array
+ /// elements in the builder. Also, converts the `io::Result` required by the `Write` trait
+ /// to the Arrow `Result` type.
+ fn write_bytes(&mut self, bytes: &[u8], len_added: i64) -> Result<()> {
+ let write_result = self.buffer.write(bytes);
+ // `io::Result` has many options one of which we use, so pattern matching is overkill here
+ if write_result.is_err() {
+ Err(ArrowError::MemoryError(
+ "Could not write to Buffer, not big enough".to_string(),
+ ))
+ } else {
+ self.len += len_added;
+ Ok(())
}
}
}
@@ -177,83 +118,91 @@ mod tests {
#[test]
fn test_builder_i32_empty() {
- let mut b: Builder<i32> = Builder::with_capacity(5);
+ let b = BufferBuilder::<i32>::new(5);
+ assert_eq!(0, b.len());
+ assert_eq!(16, b.capacity());
let a = b.finish();
assert_eq!(0, a.len());
}
#[test]
fn test_builder_i32_alloc_zero_bytes() {
- let mut b: Builder<i32> = Builder::with_capacity(0);
- b.push(123);
+ let mut b = BufferBuilder::<i32>::new(0);
+ b.push(123).unwrap();
let a = b.finish();
- assert_eq!(1, a.len());
+ assert_eq!(4, a.len());
}
#[test]
fn test_builder_i32() {
- let mut b: Builder<i32> = Builder::with_capacity(5);
+ let mut b = BufferBuilder::<i32>::new(5);
for i in 0..5 {
- b.push(i);
+ b.push(i).unwrap();
}
+ assert_eq!(16, b.capacity());
let a = b.finish();
- assert_eq!(5, a.len());
+ assert_eq!(20, a.len());
}
#[test]
fn test_builder_i32_grow_buffer() {
- let mut b: Builder<i32> = Builder::with_capacity(2);
- for i in 0..5 {
- b.push(i);
+ let mut b = BufferBuilder::<i32>::new(2);
+ assert_eq!(16, b.capacity());
+ for i in 0..20 {
+ b.push(i).unwrap();
}
+ assert_eq!(32, b.capacity());
let a = b.finish();
- assert_eq!(5, a.len());
+ assert_eq!(80, a.len());
}
#[test]
fn test_reserve() {
- let mut b: Builder<u8> = Builder::with_capacity(2);
- assert_eq!(2, b.capacity());
- b.reserve(2);
- assert_eq!(2, b.capacity());
- b.reserve(3);
- assert_eq!(4, b.capacity());
+ let mut b = BufferBuilder::<u8>::new(2);
+ assert_eq!(64, b.capacity());
+ b.reserve(64).unwrap();
+ assert_eq!(64, b.capacity());
+ b.reserve(65).unwrap();
+ assert_eq!(128, b.capacity());
+
+ let mut b = BufferBuilder::<i32>::new(2);
+ assert_eq!(16, b.capacity());
+ b.reserve(16).unwrap();
+ assert_eq!(16, b.capacity());
+ b.reserve(17).unwrap();
+ assert_eq!(32, b.capacity());
}
#[test]
fn test_push_slice() {
- let mut b: Builder<u8> = Builder::new();
- b.push_slice("Hello, ".as_bytes());
- b.push_slice("World!".as_bytes());
+ let mut b = BufferBuilder::<u8>::new(0);
+ b.push_slice("Hello, ".as_bytes()).unwrap();
+ b.push_slice("World!".as_bytes()).unwrap();
let buffer = b.finish();
assert_eq!(13, buffer.len());
- }
-
- #[test]
- fn test_slice_empty_at_end() {
- let mut b: Builder<u8> = Builder::with_capacity(2);
- let s = b.slice_mut(2, 2);
- assert_eq!(0, s.len());
- }
- #[test]
- #[should_panic(expected = "the end of the slice must be within the
capacity")]
- fn test_slice_start_out_of_bounds() {
- let mut b: Builder<u8> = Builder::with_capacity(2);
- b.slice_mut(3, 3); // should panic
+ let mut b = BufferBuilder::<i32>::new(0);
+ b.push_slice(&[32, 54]).unwrap();
+ let buffer = b.finish();
+ assert_eq!(8, buffer.len());
}
#[test]
- #[should_panic(expected = "the end of the slice must be within the
capacity")]
- fn test_slice_end_out_of_bounds() {
- let mut b: Builder<u8> = Builder::with_capacity(2);
- b.slice_mut(0, 3); // should panic
+ fn test_write_bytes() {
+ let mut b = BufferBuilder::<bool>::new(4);
+ let bytes = [false, true, false, true].to_byte_slice();
+ b.write_bytes(bytes, 4).unwrap();
+ assert_eq!(4, b.len());
+ assert_eq!(64, b.capacity());
+ let buffer = b.finish();
+ assert_eq!(4, buffer.len());
}
#[test]
- #[should_panic(expected = "the start of the slice cannot exceed the end of
the slice")]
- fn test_slice_end_before_start() {
- let mut b: Builder<u8> = Builder::with_capacity(2);
- b.slice_mut(1, 0); // should panic
+ #[should_panic(expected = "Could not write to Buffer, not big enough")]
+ fn test_write_too_many_bytes() {
+ let mut b = BufferBuilder::<bool>::new(0);
+ let bytes = [false, true, false, true].to_byte_slice();
+ b.write_bytes(bytes, 4).unwrap();
}
}