Repository: arrow Updated Branches: refs/heads/master 222628c9d -> 608b89e16
ARROW-1073: C++: Adaptive integer builder Author: Uwe L. Korn <uw...@xhochy.com> Closes #723 from xhochy/ARROW-1073 and squashes the following commits: 5bab9c2f [Uwe L. Korn] ARROW-1073: C++: Adaptive integer builder
+}; + +TEST_F(TestAdaptiveIntBuilder, TestInt8) { + builder_->Append(0); + builder_->Append(127); + builder_->Append(-128); + + Done(); + + std::vector<int8_t> expected_values({0, 127, -128}); + ArrayFromVector<Int8Type, int8_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt16) { + builder_->Append(0); + builder_->Append(128); + Done(); + + std::vector<int16_t> expected_values({0, 128}); + ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(-129); + expected_values = {-129}; + Done(); + + ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<int16_t>::max()); + builder_->Append(std::numeric_limits<int16_t>::min()); + expected_values = { + std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()}; + Done(); + + ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt32) { + builder_->Append(0); + builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::max()) + 1); + Done(); + + std::vector<int32_t> expected_values( + {0, static_cast<int32_t>(std::numeric_limits<int16_t>::max()) + 1}); + ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::min()) - 1); + expected_values = {static_cast<int32_t>(std::numeric_limits<int16_t>::min()) - 1}; + Done(); + + ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<int32_t>::max()); + builder_->Append(std::numeric_limits<int32_t>::min()); + expected_values = { + std::numeric_limits<int32_t>::max(), 
std::numeric_limits<int32_t>::min()}; + Done(); + + ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt64) { + builder_->Append(0); + builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1); + Done(); + + std::vector<int64_t> expected_values( + {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1}); + ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1); + expected_values = {static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1}; + Done(); + + ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<int64_t>::max()); + builder_->Append(std::numeric_limits<int64_t>::min()); + expected_values = { + std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::min()}; + Done(); + + ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestAppendVector) { + std::vector<int64_t> expected_values( + {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1}); + builder_->Append(expected_values.data(), expected_values.size()); + Done(); + + ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +class TestAdaptiveUIntBuilder : public TestBuilder { + public: + void SetUp() { + TestBuilder::SetUp(); + builder_ = std::make_shared<AdaptiveUIntBuilder>(pool_); + } + + void Done() { EXPECT_OK(builder_->Finish(&result_)); } + + protected: + std::shared_ptr<AdaptiveUIntBuilder> builder_; + + std::shared_ptr<Array> expected_; + std::shared_ptr<Array> result_; +}; + +TEST_F(TestAdaptiveUIntBuilder, TestUInt8) { + 
builder_->Append(0); + builder_->Append(255); + + Done(); + + std::vector<uint8_t> expected_values({0, 255}); + ArrayFromVector<UInt8Type, uint8_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt16) { + builder_->Append(0); + builder_->Append(256); + Done(); + + std::vector<uint16_t> expected_values({0, 256}); + ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<uint16_t>::max()); + expected_values = {std::numeric_limits<uint16_t>::max()}; + Done(); + + ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt32) { + builder_->Append(0); + builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint16_t>::max()) + 1); + Done(); + + std::vector<uint32_t> expected_values( + {0, static_cast<uint32_t>(std::numeric_limits<uint16_t>::max()) + 1}); + ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<uint32_t>::max()); + expected_values = {std::numeric_limits<uint32_t>::max()}; + Done(); + + ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt64) { + builder_->Append(0); + builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + Done(); + + std::vector<uint64_t> expected_values( + {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1}); + ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits<uint64_t>::max()); + expected_values = {std::numeric_limits<uint64_t>::max()}; + Done(); + + ArrayFromVector<UInt64Type, 
uint64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) { + std::vector<uint64_t> expected_values( + {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1}); + builder_->Append(expected_values.data(), expected_values.size()); + Done(); + + ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +// ---------------------------------------------------------------------- // List tests class TestListBuilder : public TestBuilder { http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder-benchmark.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc index b0c3cd1..62f2fd6 100644 --- a/cpp/src/arrow/builder-benchmark.cc +++ b/cpp/src/arrow/builder-benchmark.cc @@ -61,4 +61,70 @@ static void BM_BuildVectorNoNulls( BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); +static void BM_BuildAdaptiveIntNoNulls( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256; + int64_t chunk_size = size / 8; + std::vector<int64_t> data; + for (int64_t i = 0; i < size; i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i += chunk_size) { + // Build up an array of 512 MiB in size + builder.Append(data.data() + i, chunk_size, nullptr); + } + std::shared_ptr<Array> out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); + +static void BM_BuildAdaptiveIntNoNullsScalarAppend( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = 
static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256; + std::vector<int64_t> data; + for (int64_t i = 0; i < size; i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i++) { + builder.Append(data[i]); + } + std::shared_ptr<Array> out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend) + ->Repetitions(3) + ->Unit(benchmark::kMillisecond); + +static void BM_BuildAdaptiveUIntNoNulls( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = static_cast<int64_t>(std::numeric_limits<uint16_t>::max()) * 256; + int64_t chunk_size = size / 8; + std::vector<uint64_t> data; + for (uint64_t i = 0; i < static_cast<uint64_t>(size); i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveUIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i += chunk_size) { + // Build up an array of 512 MiB in size + builder.Append(data.data() + i, chunk_size, nullptr); + } + std::shared_ptr<Array> out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveUIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index ab43c2a..6762e17 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -17,6 +17,7 @@ #include "arrow/builder.h" +#include <algorithm> #include <cstdint> #include <cstring> #include <limits> @@ -248,6 +249,336 @@ template class PrimitiveBuilder<HalfFloatType>; template class PrimitiveBuilder<FloatType>; template class PrimitiveBuilder<DoubleType>; 
+AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool) + : ArrayBuilder(pool, int64()), data_(nullptr), raw_data_(nullptr), int_size_(1) {} + +Status AdaptiveIntBuilderBase::Init(int64_t capacity) { + RETURN_NOT_OK(ArrayBuilder::Init(capacity)); + data_ = std::make_shared<PoolBuffer>(pool_); + + int64_t nbytes = capacity * int_size_; + RETURN_NOT_OK(data_->Resize(nbytes)); + // TODO(emkornfield) valgrind complains without this + memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes)); + + raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); + return Status::OK(); +} + +Status AdaptiveIntBuilderBase::Resize(int64_t capacity) { + // XXX: Set floor size for now + if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + + if (capacity_ == 0) { + RETURN_NOT_OK(Init(capacity)); + } else { + RETURN_NOT_OK(ArrayBuilder::Resize(capacity)); + const int64_t old_bytes = data_->size(); + const int64_t new_bytes = capacity * int_size_; + RETURN_NOT_OK(data_->Resize(new_bytes)); + raw_data_ = data_->mutable_data(); + // TODO(emkornfield) valgrind complains without this + memset( + data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes)); + } + return Status::OK(); +} + +AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {} + +Status AdaptiveIntBuilder::Finish(std::shared_ptr<Array>* out) { + const int64_t bytes_required = length_ * int_size_; + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + switch (int_size_) { + case 1: + *out = + std::make_shared<Int8Array>(int8(), length_, data_, null_bitmap_, null_count_); + break; + case 2: + *out = std::make_shared<Int16Array>( + int16(), length_, data_, null_bitmap_, null_count_); + break; + case 4: + *out = std::make_shared<Int32Array>( + int32(), length_, data_, null_bitmap_, null_count_); + break; + case 8: + *out = std::make_shared<Int64Array>( + 
int64(), length_, data_, null_bitmap_, null_count_); + break; + default: + DCHECK(false); + return Status::NotImplemented("Only ints of size 1,2,4,8 are supported"); + } + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +Status AdaptiveIntBuilder::Append( + const int64_t* values, int64_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); + + if (length > 0) { + if (int_size_ < 8) { + uint8_t new_int_size = int_size_; + for (int64_t i = 0; i < length; i++) { + if (valid_bytes == nullptr || valid_bytes[i]) { + new_int_size = expanded_int_size(values[i], new_int_size); + } + } + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + } + } + + if (int_size_ == 8) { + std::memcpy(reinterpret_cast<int64_t*>(raw_data_) + length_, values, + sizeof(int64_t) * length); + } else { + // int_size_ may have changed, so we need to recheck + switch (int_size_) { + case 1: { + int8_t* data_ptr = reinterpret_cast<int8_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast<int8_t>(x); }); + } break; + case 2: { + int16_t* data_ptr = reinterpret_cast<int16_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast<int16_t>(x); }); + } break; + case 4: { + int32_t* data_ptr = reinterpret_cast<int32_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast<int32_t>(x); }); + } break; + default: + DCHECK(false); + } + } + + // length_ is update by these + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + + return Status::OK(); +} + +template <typename new_type, typename old_type> +typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type +AdaptiveIntBuilder::ExpandIntSizeInternal() { + return Status::OK(); +} + +#define __LESS(a, b) (a) < (b) +template <typename new_type, typename old_type> 
+typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type +AdaptiveIntBuilder::ExpandIntSizeInternal() { + int_size_ = sizeof(new_type); + RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); + + old_type* src = reinterpret_cast<old_type*>(raw_data_); + new_type* dst = reinterpret_cast<new_type*>(raw_data_); + // By doing the backward copy, we ensure that no element is overriden during + // the copy process and the copy stays in-place. + std::copy_backward(src, src + length_, dst + length_); + + return Status::OK(); +} +#undef __LESS + +template <typename new_type> +Status AdaptiveIntBuilder::ExpandIntSizeN() { + switch (int_size_) { + case 1: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int8_t>())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int16_t>())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int32_t>())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int64_t>())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) { + switch (new_int_size) { + case 1: + RETURN_NOT_OK((ExpandIntSizeN<int8_t>())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeN<int16_t>())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeN<int32_t>())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeN<int64_t>())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool) + : AdaptiveIntBuilderBase(pool) {} + +Status AdaptiveUIntBuilder::Finish(std::shared_ptr<Array>* out) { + const int64_t bytes_required = length_ * int_size_; + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + switch (int_size_) { + case 1: + *out = std::make_shared<UInt8Array>( + uint8(), length_, data_, null_bitmap_, null_count_); + break; + case 2: + *out = 
std::make_shared<UInt16Array>( + uint16(), length_, data_, null_bitmap_, null_count_); + break; + case 4: + *out = std::make_shared<UInt32Array>( + uint32(), length_, data_, null_bitmap_, null_count_); + break; + case 8: + *out = std::make_shared<UInt64Array>( + uint64(), length_, data_, null_bitmap_, null_count_); + break; + default: + DCHECK(false); + return Status::NotImplemented("Only ints of size 1,2,4,8 are supported"); + } + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +Status AdaptiveUIntBuilder::Append( + const uint64_t* values, int64_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); + + if (length > 0) { + if (int_size_ < 8) { + uint8_t new_int_size = int_size_; + for (int64_t i = 0; i < length; i++) { + if (valid_bytes == nullptr || valid_bytes[i]) { + new_int_size = expanded_uint_size(values[i], new_int_size); + } + } + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + } + } + + if (int_size_ == 8) { + std::memcpy(reinterpret_cast<uint64_t*>(raw_data_) + length_, values, + sizeof(uint64_t) * length); + } else { + // int_size_ may have changed, so we need to recheck + switch (int_size_) { + case 1: { + uint8_t* data_ptr = reinterpret_cast<uint8_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast<uint8_t>(x); }); + } break; + case 2: { + uint16_t* data_ptr = reinterpret_cast<uint16_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast<uint16_t>(x); }); + } break; + case 4: { + uint32_t* data_ptr = reinterpret_cast<uint32_t*>(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast<uint32_t>(x); }); + } break; + default: + DCHECK(false); + } + } + + // length_ is update by these + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + + return Status::OK(); +} + 
+template <typename new_type, typename old_type> +typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type +AdaptiveUIntBuilder::ExpandIntSizeInternal() { + return Status::OK(); +} + +#define __LESS(a, b) (a) < (b) +template <typename new_type, typename old_type> +typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type +AdaptiveUIntBuilder::ExpandIntSizeInternal() { + int_size_ = sizeof(new_type); + RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); + + old_type* src = reinterpret_cast<old_type*>(raw_data_); + new_type* dst = reinterpret_cast<new_type*>(raw_data_); + // By doing the backward copy, we ensure that no element is overriden during + // the copy process and the copy stays in-place. + std::copy_backward(src, src + length_, dst + length_); + + return Status::OK(); +} +#undef __LESS + +template <typename new_type> +Status AdaptiveUIntBuilder::ExpandIntSizeN() { + switch (int_size_) { + case 1: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint8_t>())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint16_t>())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint32_t>())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint64_t>())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) { + switch (new_int_size) { + case 1: + RETURN_NOT_OK((ExpandIntSizeN<uint8_t>())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeN<uint16_t>())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeN<uint32_t>())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeN<uint64_t>())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + BooleanBuilder::BooleanBuilder(MemoryPool* pool) : ArrayBuilder(pool, boolean()), data_(nullptr), raw_data_(nullptr) {} http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.h 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 6876916..d77223e 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -20,6 +20,7 @@ #include <cstdint> #include <functional> +#include <limits> #include <memory> #include <string> #include <vector> @@ -247,6 +248,193 @@ using HalfFloatBuilder = NumericBuilder<HalfFloatType>; using FloatBuilder = NumericBuilder<FloatType>; using DoubleBuilder = NumericBuilder<DoubleType>; +class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder { + public: + explicit AdaptiveIntBuilderBase(MemoryPool* pool); + + /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory + Status AppendNulls(const uint8_t* valid_bytes, int64_t length) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); + } + + Status AppendNull() { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(false); + return Status::OK(); + } + + std::shared_ptr<Buffer> data() const { return data_; } + + Status Init(int64_t capacity) override; + + /// Increase the capacity of the builder to accommodate at least the indicated + /// number of elements + Status Resize(int64_t capacity) override; + + protected: + std::shared_ptr<PoolBuffer> data_; + uint8_t* raw_data_; + + uint8_t int_size_; +}; + +// Check if we would need to expand the underlying storage type +inline uint8_t expanded_uint_size(uint64_t val, uint8_t current_int_size) { + if (current_int_size == 8 || + (current_int_size < 8 && + (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) { + return 8; + } else if (current_int_size == 4 || + (current_int_size < 4 && + (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) { + return 4; + } else if (current_int_size == 2 || + (current_int_size == 1 && + (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) { + return 2; + } else { + return 1; 
+ } +} + +class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase { + public: + explicit AdaptiveUIntBuilder(MemoryPool* pool); + + using ArrayBuilder::Advance; + + /// Scalar append + Status Append(uint64_t val) { + RETURN_NOT_OK(Reserve(1)); + BitUtil::SetBit(null_bitmap_data_, length_); + + uint8_t new_int_size = expanded_uint_size(val, int_size_); + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + + switch (int_size_) { + case 1: + reinterpret_cast<uint8_t*>(raw_data_)[length_++] = static_cast<uint8_t>(val); + break; + case 2: + reinterpret_cast<uint16_t*>(raw_data_)[length_++] = static_cast<uint16_t>(val); + break; + case 4: + reinterpret_cast<uint32_t*>(raw_data_)[length_++] = static_cast<uint32_t>(val); + break; + case 8: + reinterpret_cast<uint64_t*>(raw_data_)[length_++] = val; + break; + default: + return Status::NotImplemented("This code shall never be reached"); + } + return Status::OK(); + } + + /// Vector append + /// + /// If passed, valid_bytes is of equal length to values, and any zero byte + /// will be considered as a null for that slot + Status Append( + const uint64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + + Status ExpandIntSize(uint8_t new_int_size); + Status Finish(std::shared_ptr<Array>* out) override; + + protected: + template <typename new_type, typename old_type> + typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type + ExpandIntSizeInternal(); +#define __LESS(a, b) (a) < (b) + template <typename new_type, typename old_type> + typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type + ExpandIntSizeInternal(); +#undef __LESS + + template <typename new_type> + Status ExpandIntSizeN(); +}; + +// Check if we would need to expand the underlying storage type +inline uint8_t expanded_int_size(int64_t val, uint8_t current_int_size) { + if (current_int_size == 8 || + (current_int_size < 8 && + (val > 
static_cast<int64_t>(std::numeric_limits<int32_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) { + return 8; + } else if (current_int_size == 4 || + (current_int_size < 4 && + (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) { + return 4; + } else if (current_int_size == 2 || + (current_int_size == 1 && + (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) { + return 2; + } else { + return 1; + } +} + +class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase { + public: + explicit AdaptiveIntBuilder(MemoryPool* pool); + + using ArrayBuilder::Advance; + + /// Scalar append + Status Append(int64_t val) { + RETURN_NOT_OK(Reserve(1)); + BitUtil::SetBit(null_bitmap_data_, length_); + + uint8_t new_int_size = expanded_int_size(val, int_size_); + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + + switch (int_size_) { + case 1: + reinterpret_cast<int8_t*>(raw_data_)[length_++] = static_cast<int8_t>(val); + break; + case 2: + reinterpret_cast<int16_t*>(raw_data_)[length_++] = static_cast<int16_t>(val); + break; + case 4: + reinterpret_cast<int32_t*>(raw_data_)[length_++] = static_cast<int32_t>(val); + break; + case 8: + reinterpret_cast<int64_t*>(raw_data_)[length_++] = val; + break; + default: + return Status::NotImplemented("This code shall never be reached"); + } + return Status::OK(); + } + + /// Vector append + /// + /// If passed, valid_bytes is of equal length to values, and any zero byte + /// will be considered as a null for that slot + Status Append( + const int64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + + Status ExpandIntSize(uint8_t new_int_size); + Status Finish(std::shared_ptr<Array>* out) override; + + protected: + template <typename new_type, typename old_type> + typename 
std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type + ExpandIntSizeInternal(); +#define __LESS(a, b) (a) < (b) + template <typename new_type, typename old_type> + typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type + ExpandIntSizeInternal(); +#undef __LESS + + template <typename new_type> + Status ExpandIntSizeN(); +}; + class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { public: explicit BooleanBuilder(MemoryPool* pool); @@ -271,7 +459,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { /// Scalar append Status Append(bool val) { - Reserve(1); + RETURN_NOT_OK(Reserve(1)); BitUtil::SetBit(null_bitmap_data_, length_); if (val) { BitUtil::SetBit(raw_data_, length_);