This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ff94702eee MINOR: [C++] Make the utilities used to implement
run_end_encode available to other compute kernels (#35002)
ff94702eee is described below
commit ff94702eee6ee419329d32d86c14bf0bf0c244f3
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Thu Apr 13 15:02:51 2023 -0300
MINOR: [C++] Make the utilities used to implement run_end_encode available
to other compute kernels (#35002)
### Rationale for this change
Make it easier to implement correct kernels dealing with run-end encoded
data.
### What changes are included in this PR?
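Extract the `ReadWriteValue` helpers and the REE preallocation utilities from
`vector_run_end_encode.cc` into an internal header and source file
(`ree_util_internal.h` / `ree_util_internal.cc`) so that other kernels, such as
the REE filter kernels (see #35001), can use them.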
### Are these changes tested?
Indirectly via the `run_end_encode` and `run_end_decode` tests.
### Are there any user-facing changes?
No, these are internal compute headers.
Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/compute/kernels/ree_util_internal.cc | 139 +++++++
cpp/src/arrow/compute/kernels/ree_util_internal.h | 339 ++++++++++++++++
.../arrow/compute/kernels/vector_run_end_encode.cc | 444 +++------------------
4 files changed, 531 insertions(+), 392 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4c91dc57b4..98ab2cc6d4 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -402,6 +402,7 @@ list(APPEND
compute/registry.cc
compute/kernels/codegen_internal.cc
compute/kernels/row_encoder.cc
+ compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
compute/kernels/scalar_cast_dictionary.cc
compute/kernels/scalar_cast_extension.cc
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.cc
b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
new file mode 100644
index 0000000000..edfefd324f
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/kernels/ree_util_internal.h"
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace ree_util {
+
+/// \brief Allocate the buffer that stores the values of an array of `type`
+///
+/// Types with a bit width of 1 get a bitmap able to hold `length` bits, other
+/// fixed-width types get `length * byte_width` bytes, and every remaining type
+/// (expected to be binary-like) gets `data_buffer_size` bytes.
+Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const DataType& type,
+                                                     MemoryPool* pool,
+                                                     int64_t data_buffer_size) {
+  const bool is_bit_packed = type.bit_width() == 1;
+  if (is_bit_packed) {
+    return AllocateBitmap(length, pool);
+  }
+  if (is_fixed_width(type.id())) {
+    const int64_t size_in_bytes = length * type.byte_width();
+    return AllocateBuffer(size_in_bytes, pool);
+  }
+  // Only variable-length binary-like types are expected to reach this point
+  DCHECK(is_base_binary_like(type.id()));
+  return AllocateBuffer(data_buffer_size, pool);
+}
+
+/// \brief Allocate a run-ends ArrayData with room for `physical_length` run ends
+///
+/// The array is created without a validity buffer and with a null count of 0.
+/// This function does not write any run end into the allocated buffer.
+Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
+    MemoryPool* pool) {
+  DCHECK(is_run_end_type(run_end_type->id()));
+  const int64_t size_in_bytes = physical_length * run_end_type->byte_width();
+  ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(size_in_bytes, pool));
+  // Slot 0 is the (absent) validity bitmap, slot 1 holds the run ends
+  return ArrayData::Make(run_end_type, physical_length, {NULLPTR, std::move(buffer)},
+                         /*null_count=*/0);
+}
+
+/// \brief Allocate the ArrayData of a values array with `length` slots
+///
+/// The buffers are laid out as {validity, values} or, for binary-like value
+/// types, {validity, offsets, values}. The validity slot is null unless
+/// `has_validity_buffer` is true. Only the first offset (set to zero) and the
+/// padding of the offsets buffer are initialized here.
+Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
+    const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, int64_t length,
+    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
+  const bool is_binary_like = is_base_binary_like(value_type->id());
+
+  std::shared_ptr<Buffer> validity;  // stays null when no validity buffer is wanted
+  if (has_validity_buffer) {
+    ARROW_ASSIGN_OR_RAISE(validity, AllocateBitmap(length, pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto values,
+                        AllocateValuesBuffer(length, *value_type, pool, data_buffer_size));
+
+  std::vector<std::shared_ptr<Buffer>> buffers;
+  buffers.reserve(is_binary_like ? 3 : 2);
+  buffers.emplace_back(std::move(validity));
+  if (is_binary_like) {
+    const int offset_byte_width = offset_bit_width(value_type->id()) / 8;
+    ARROW_ASSIGN_OR_RAISE(auto offsets,
+                          AllocateBuffer((length + 1) * offset_byte_width, pool));
+    // Ensure the first offset is zero
+    memset(offsets->mutable_data(), 0, offset_byte_width);
+    offsets->ZeroPadding();
+    buffers.emplace_back(std::move(offsets));
+  }
+  buffers.emplace_back(std::move(values));
+  return ArrayData::Make(value_type, length, std::move(buffers), null_count);
+}
+
+/// \brief Allocate a run-end encoded ArrayData together with both of its children
+///
+/// The run-ends child and the values child are both allocated with
+/// `physical_length` slots; the parent has `logical_length` slots, a single
+/// null buffer and a null count of 0.
+Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
+    std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
+    int64_t logical_length, int64_t physical_length, int64_t physical_null_count,
+    MemoryPool* pool, int64_t data_buffer_size) {
+  const auto& run_end_type = ree_type->run_end_type();
+  ARROW_ASSIGN_OR_RAISE(auto run_ends,
+                        PreallocateRunEndsArray(run_end_type, physical_length, pool));
+
+  const auto& value_type = ree_type->value_type();
+  ARROW_ASSIGN_OR_RAISE(
+      auto values,
+      PreallocateValuesArray(value_type, has_validity_buffer, physical_length,
+                             physical_null_count, pool, data_buffer_size));
+
+  // ree_type is only moved from here, after both children have been built
+  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
+                         {std::move(run_ends), std::move(values)},
+                         /*null_count=*/0);
+}
+
+/// \brief Write `run_end` into the first slot of the run-ends array
+///
+/// Pre-conditions (checked in debug builds only where noted):
+/// - run_ends_data is of a valid run-end type (debug-checked)
+/// - run_ends_data has at least one slot
+/// - run_end > 0 (debug-checked)
+/// - run_end fits in the run-end type; it is narrowed with a static_cast
+void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end) {
+  DCHECK_GT(run_end, 0);
+  DCHECK(is_run_end_type(run_ends_data->type->id()));
+  // The output pointer is fetched with the run-end C type rather than as raw
+  // bytes, so that any array offset is applied in units of run-end slots (and
+  // not in bytes) and no reinterpret_cast is needed.
+  switch (run_ends_data->type->id()) {
+    case Type::INT16:
+      *run_ends_data->GetMutableValues<int16_t>(1) = static_cast<int16_t>(run_end);
+      break;
+    case Type::INT32:
+      *run_ends_data->GetMutableValues<int32_t>(1) = static_cast<int32_t>(run_end);
+      break;
+    default:
+      DCHECK_EQ(run_ends_data->type->id(), Type::INT64);
+      *run_ends_data->GetMutableValues<int64_t>(1) = run_end;
+      break;
+  }
+}
+
+/// \brief Build a run-end encoded array of `logical_length` nulls
+///
+/// The values child is of the null type. When `logical_length` is positive the
+/// array has exactly one run whose run end is `logical_length`; otherwise both
+/// children are empty.
+Result<std::shared_ptr<ArrayData>> MakeNullREEArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t logical_length,
+    MemoryPool* pool) {
+  auto ree_type = std::make_shared<RunEndEncodedType>(run_end_type, null());
+  const bool is_empty = !(logical_length > 0);
+  const int64_t physical_length = is_empty ? 0 : 1;
+  ARROW_ASSIGN_OR_RAISE(auto run_ends,
+                        PreallocateRunEndsArray(run_end_type, physical_length, pool));
+  if (!is_empty) {
+    WriteSingleRunEnd(run_ends.get(), logical_length);
+  }
+  // Every physical value of the null-typed child counts as a null
+  auto null_values = ArrayData::Make(null(), physical_length, {NULLPTR},
+                                     /*null_count=*/physical_length);
+  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
+                         {std::move(run_ends), std::move(null_values)},
+                         /*null_count=*/0);
+}
+
+} // namespace ree_util
+} // namespace internal
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h
b/cpp/src/arrow/compute/kernels/ree_util_internal.h
new file mode 100644
index 0000000000..8e06427b8d
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.h
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Useful operations to implement kernels handling run-end encoded arrays
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <string_view>
+#include <type_traits>
+
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace ree_util {
+
+/// \brief Helper that reads values from an input values array and writes
+/// values to an output values array
+///
+/// \tparam ArrowType the Arrow type of the values
+/// \tparam in_has_validity_buffer whether the input validity bitmap is read
+/// \tparam out_has_validity_buffer whether the output validity bitmap is
+/// written (defaults to in_has_validity_buffer)
+/// \tparam Enable hook used to select one of the partial specializations
+///
+/// The primary template is empty: the behavior is provided by the partial
+/// specializations that follow it in this header.
+template <typename ArrowType, bool in_has_validity_buffer,
+ bool out_has_validity_buffer = in_has_validity_buffer, typename
Enable = void>
+struct ReadWriteValue {};
+
+// Numeric and primitive C-compatible types
+//
+// Values are represented by ArrowType::c_type. Booleans are read from and
+// written to bit-packed buffers, every other type is accessed as a plain
+// array of c_type. Offsets passed to the functions index the buffers from
+// their start: no array offset is applied by this class.
+template <typename ArrowType, bool in_has_validity_buffer, bool out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
+                     enable_if_has_c_type<ArrowType>> {
+ public:
+  using ValueRepr = typename ArrowType::c_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
+
+ public:
+  /// \param input_values_array the array values are read from
+  /// \param output_values_array_data the array values are written to. It may
+  /// be NULLPTR, in which case the writing functions must not be called.
+  explicit ReadWriteValue(const ArraySpan& input_values_array,
+                          ArrayData* output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR) {}
+
+  /// \brief Read the value at read_offset into *out
+  ///
+  /// *out is written even when the value is not valid.
+  ///
+  /// \return whether the value is valid (always true when the input has no
+  /// validity buffer)
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+      *out = bit_util::GetBit(input_values_, read_offset);
+    } else {
+      *out = (reinterpret_cast<const ValueRepr*>(input_values_))[read_offset];
+    }
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  ///
+  /// The whole last byte of a bitmap of `length` bits is cleared, so this
+  /// should be called before the validity bits stored in that byte are
+  /// written. It is a no-op when `length` is 0, as an empty bitmap has no
+  /// last byte to clear.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      // Guard against writing at index -1 when the bitmap is empty
+      if (validity_buffer_size > 0) {
+        output_validity_[validity_buffer_size - 1] = 0;
+      }
+    }
+  }
+
+  /// \brief Write one value (and its validity bit, if the output has a
+  /// validity buffer) at write_offset
+  ///
+  /// The values buffer is left untouched when `valid` is false.
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    if (valid) {
+      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+        bit_util::SetBitTo(output_values_, write_offset, value);
+      } else {
+        (reinterpret_cast<ValueRepr*>(output_values_))[write_offset] = value;
+      }
+    }
+  }
+
+  /// \brief Write `run_length` copies of a value starting at write_offset
+  ///
+  /// The values buffer is left untouched when `valid` is false.
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+        bit_util::SetBitsTo(output_values_, write_offset, run_length, value);
+      } else {
+        auto* output_values_c = reinterpret_cast<ValueRepr*>(output_values_);
+        std::fill(output_values_c + write_offset,
+                  output_values_c + write_offset + run_length, value);
+      }
+    }
+  }
+
+  /// \brief Whether two values are equal according to operator==
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
+};
+
+// FixedSizeBinary, Decimal128
+//
+// Values are represented by a pointer to their first byte; the number of
+// bytes per value is taken from the type of the input array. Offsets passed
+// to the functions index the buffers from their start: no array offset is
+// applied by this class.
+template <typename ArrowType, bool in_has_validity_buffer, bool out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
+                     enable_if_fixed_size_binary<ArrowType>> {
+ public:
+  // Every value is represented as a pointer to byte_width_ bytes
+  using ValueRepr = uint8_t const*;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
+
+  const size_t byte_width_;
+
+ public:
+  /// \param input_values_array the array values are read from
+  /// \param output_values_array_data the array values are written to. It may
+  /// be NULLPTR, in which case the writing functions must not be called.
+  ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR),
+        byte_width_(input_values_array.type->byte_width()) {}
+
+  /// \brief Point *out to the bytes of the value at read_offset
+  ///
+  /// *out is written even when the value is not valid, and it points into the
+  /// input values buffer (nothing is copied).
+  ///
+  /// \return whether the value is valid (always true when the input has no
+  /// validity buffer)
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    *out = input_values_ + (read_offset * byte_width_);
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  ///
+  /// The whole last byte of a bitmap of `length` bits is cleared, so this
+  /// should be called before the validity bits stored in that byte are
+  /// written. It is a no-op when `length` is 0, as an empty bitmap has no
+  /// last byte to clear.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      // Guard against writing at index -1 when the bitmap is empty
+      if (validity_buffer_size > 0) {
+        output_validity_[validity_buffer_size - 1] = 0;
+      }
+    }
+  }
+
+  /// \brief Copy one value (and write its validity bit, if the output has a
+  /// validity buffer) at write_offset
+  ///
+  /// The values buffer is left untouched when `valid` is false.
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    if (valid) {
+      memcpy(output_values_ + (write_offset * byte_width_), value, byte_width_);
+    }
+  }
+
+  /// \brief Write `run_length` copies of a value starting at write_offset
+  ///
+  /// The values buffer is left untouched when `valid` is false.
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      uint8_t* ptr = output_values_ + (write_offset * byte_width_);
+      for (int64_t i = 0; i < run_length; ++i) {
+        memcpy(ptr, value, byte_width_);
+        ptr += byte_width_;
+      }
+    }
+  }
+
+  /// \brief Whether the byte_width_ bytes pointed to by lhs and rhs are equal
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const {
+    return memcmp(lhs, rhs, byte_width_) == 0;
+  }
+};
+
+// Binary, String...
+//
+// Values are represented by a std::string_view over the input data buffer.
+// Offsets passed to the functions index the validity and offsets buffers from
+// their start: no array offset is applied by this class.
+template <typename ArrowType, bool in_has_validity_buffer, bool out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
+                     enable_if_base_binary<ArrowType>> {
+ public:
+  using ValueRepr = std::string_view;
+  using offset_type = typename ArrowType::offset_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const offset_type* input_offsets_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  offset_type* output_offsets_;
+  uint8_t* output_values_;
+
+ public:
+  /// \param input_values_array the array values are read from
+  /// \param output_values_array_data the array values are written to. It may
+  /// be NULLPTR, in which case the writing functions must not be called.
+  ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_offsets_(input_values_array.template GetValues<offset_type>(1, 0)),
+        input_values_(input_values_array.buffers[2].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_offsets_(
+            output_values_array_data
+                ? output_values_array_data->template GetMutableValues<offset_type>(1, 0)
+                : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[2]->mutable_data()
+                           : NULLPTR) {}
+
+  /// \brief Make *out a view of the value at read_offset
+  ///
+  /// *out is only written when the value is valid, and it views the input data
+  /// buffer (nothing is copied).
+  ///
+  /// \return whether the value is valid (always true when the input has no
+  /// validity buffer)
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    if (valid) {
+      const offset_type offset0 = input_offsets_[read_offset];
+      const offset_type offset1 = input_offsets_[read_offset + 1];
+      *out = std::string_view(reinterpret_cast<const char*>(input_values_ + offset0),
+                              offset1 - offset0);
+    }
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  ///
+  /// The whole last byte of a bitmap of `length` bits is cleared, so this
+  /// should be called before the validity bits stored in that byte are
+  /// written. It is a no-op when `length` is 0, as an empty bitmap has no
+  /// last byte to clear.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      // Guard against writing at index -1 when the bitmap is empty
+      if (validity_buffer_size > 0) {
+        output_validity_[validity_buffer_size - 1] = 0;
+      }
+    }
+  }
+
+  /// \brief Append one value (and write its validity bit, if the output has a
+  /// validity buffer) at write_offset
+  ///
+  /// The start of the data is read from output_offsets_[write_offset], which
+  /// must therefore already be set, and the end is stored in
+  /// output_offsets_[write_offset + 1]. A value that is not valid occupies
+  /// zero bytes.
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    const offset_type offset0 = output_offsets_[write_offset];
+    const offset_type offset1 =
+        offset0 + (valid ? static_cast<offset_type>(value.size()) : 0);
+    output_offsets_[write_offset + 1] = offset1;
+    if (valid) {
+      memcpy(output_values_ + offset0, value.data(), value.size());
+    }
+  }
+
+  /// \brief Append `run_length` copies of a value starting at write_offset
+  ///
+  /// Like WriteValue, this relies on output_offsets_[write_offset] being
+  /// already set. A run that is not valid only repeats that offset, so that
+  /// every value of the run occupies zero bytes.
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      int64_t i = write_offset;
+      offset_type offset = output_offsets_[i];
+      while (i < write_offset + run_length) {
+        memcpy(output_values_ + offset, value.data(), value.size());
+        offset += static_cast<offset_type>(value.size());
+        i += 1;
+        output_offsets_[i] = offset;
+      }
+    } else {
+      offset_type offset = output_offsets_[write_offset];
+      offset_type* begin = output_offsets_ + write_offset + 1;
+      std::fill(begin, begin + run_length, offset);
+    }
+  }
+
+  /// \brief Whether two values have the same length and the same bytes
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
+};
+
+/// \brief Allocate the buffer that stores the values of an array of `type`
+///
+/// \param length the number of values the buffer must hold (used for types
+/// with a bit width of 1 and for other fixed-width types)
+/// \param type the type of the values
+/// \param pool the memory pool to allocate from
+/// \param data_buffer_size the size in bytes of the buffer for types that are
+/// not fixed-width (expected to be binary-like)
+Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const
DataType& type,
+ MemoryPool* pool,
+ int64_t data_buffer_size);
+
+/// \brief Allocate a run-ends ArrayData with room for `physical_length` run ends
+///
+/// The array has no validity buffer and a null count of 0. No run end is
+/// written into the allocated buffer.
+///
+/// \param run_end_type the type of the run ends; must be a valid run-end type
+/// (only checked in debug builds)
+/// \param physical_length the number of run ends to allocate space for
+/// \param pool the memory pool to allocate from
+Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
+ const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
+ MemoryPool* pool);
+
+/// \brief Allocate the ArrayData of a values array with `length` slots
+///
+/// For binary-like value types an offsets buffer of `length + 1` offsets is
+/// allocated as well, with its first offset set to zero.
+///
+/// \param value_type the type of the values
+/// \param has_validity_buffer whether a validity bitmap should be allocated
+/// \param length the number of values
+/// \param null_count the null count stored in the returned ArrayData
+/// \param pool the memory pool to allocate from
+/// \param data_buffer_size the size of the data buffer for binary-like types
+Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
+ const std::shared_ptr<DataType>& value_type, bool has_validity_buffer,
int64_t length,
+ int64_t null_count, MemoryPool* pool, int64_t data_buffer_size);
+
+/// \brief Preallocate the ArrayData for the run-end encoded version
+/// of the flat input array
+///
+/// \param ree_type the run-end encoded type of the returned array
+/// \param has_validity_buffer whether the values child gets a validity bitmap
+/// \param logical_length the length of the returned run-end encoded array
+/// \param physical_length the length of both the run-ends and values children
+/// \param physical_null_count the null count stored in the values child
+/// \param pool the memory pool to allocate from
+/// \param data_buffer_size the size of the data buffer for string and binary
+/// types
+Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
+ std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
+ int64_t logical_length, int64_t physical_length, int64_t
physical_null_count,
+ MemoryPool* pool, int64_t data_buffer_size);
+
+/// \brief Writes a single run-end to the first slot of the pre-allocated
+/// run-ends array in run_ends_data
+///
+/// Pre-conditions:
+/// - run_ends_data is of a valid run-ends type
+/// - run_ends_data has at least one slot
+/// - run_end > 0
+/// - run_end fits in the run-end type without overflow
+void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end);
+
+/// \brief Make a run-end encoded array of `logical_length` nulls
+///
+/// The values child is of the null type. When `logical_length` is positive
+/// the array has a single run whose run end is `logical_length`; otherwise
+/// both children are empty.
+///
+/// \param run_end_type the type of the run ends
+/// \param logical_length the length of the returned run-end encoded array
+/// \param pool the memory pool used to allocate the run-ends buffer
+Result<std::shared_ptr<ArrayData>> MakeNullREEArray(
+ const std::shared_ptr<DataType>& run_end_type, int64_t logical_length,
+ MemoryPool* pool);
+
+} // namespace ree_util
+} // namespace internal
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
index 2fa260321c..c0f54fbf5d 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -20,6 +20,7 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/ree_util_internal.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/ree_util.h"
@@ -28,282 +29,6 @@ namespace arrow {
namespace compute {
namespace internal {
-template <typename ArrowType, bool has_validity_buffer, typename Enable = void>
-struct ReadWriteValueImpl {};
-
-// Numeric and primitive C-compatible types
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
- enable_if_has_c_type<ArrowType>> {
- public:
- using ValueRepr = typename ArrowType::c_type;
-
- private:
- const uint8_t* input_validity_;
- const uint8_t* input_values_;
-
- // Needed only by the writing functions
- uint8_t* output_validity_;
- uint8_t* output_values_;
-
- public:
- explicit ReadWriteValueImpl(const ArraySpan& input_values_array,
- ArrayData* output_values_array_data)
- : input_validity_(has_validity_buffer ?
input_values_array.buffers[0].data
- : NULLPTR),
- input_values_(input_values_array.buffers[1].data),
- output_validity_((has_validity_buffer && output_values_array_data)
- ?
output_values_array_data->buffers[0]->mutable_data()
- : NULLPTR),
- output_values_(output_values_array_data
- ?
output_values_array_data->buffers[1]->mutable_data()
- : NULLPTR) {}
-
- [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
- bool valid = true;
- if constexpr (has_validity_buffer) {
- valid = bit_util::GetBit(input_validity_, read_offset);
- }
- if constexpr (std::is_same_v<ArrowType, BooleanType>) {
- *out = bit_util::GetBit(input_values_, read_offset);
- } else {
- *out = (reinterpret_cast<const ValueRepr*>(input_values_))[read_offset];
- }
- return valid;
- }
-
- /// \brief Ensure padding is zeroed in validity bitmap.
- void ZeroValidityPadding(int64_t length) const {
- DCHECK(output_values_);
- if constexpr (has_validity_buffer) {
- DCHECK(output_validity_);
- const int64_t validity_buffer_size = bit_util::BytesForBits(length);
- output_validity_[validity_buffer_size - 1] = 0;
- }
- }
-
- void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitTo(output_validity_, write_offset, valid);
- }
- if (valid) {
- if constexpr (std::is_same_v<ArrowType, BooleanType>) {
- bit_util::SetBitTo(output_values_, write_offset, value);
- } else {
- (reinterpret_cast<ValueRepr*>(output_values_))[write_offset] = value;
- }
- }
- }
-
- void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
- ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
- }
- if (valid) {
- if constexpr (std::is_same_v<ArrowType, BooleanType>) {
- bit_util::SetBitsTo(reinterpret_cast<uint8_t*>(output_values_),
write_offset,
- run_length, value);
- } else {
- auto* output_values_c = reinterpret_cast<ValueRepr*>(output_values_);
- std::fill(output_values_c + write_offset,
- output_values_c + write_offset + run_length, value);
- }
- }
- }
-
- bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
-};
-
-// FixedSizeBinary, Decimal128
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
- enable_if_fixed_size_binary<ArrowType>> {
- public:
- // Every value is represented as a pointer to byte_width_ bytes
- using ValueRepr = uint8_t const*;
-
- private:
- const uint8_t* input_validity_;
- const uint8_t* input_values_;
-
- // Needed only by the writing functions
- uint8_t* output_validity_;
- uint8_t* output_values_;
-
- const size_t byte_width_;
-
- public:
- ReadWriteValueImpl(const ArraySpan& input_values_array,
- ArrayData* output_values_array_data)
- : input_validity_(has_validity_buffer ?
input_values_array.buffers[0].data
- : NULLPTR),
- input_values_(input_values_array.buffers[1].data),
- output_validity_((has_validity_buffer && output_values_array_data)
- ?
output_values_array_data->buffers[0]->mutable_data()
- : NULLPTR),
- output_values_(output_values_array_data
- ?
output_values_array_data->buffers[1]->mutable_data()
- : NULLPTR),
- byte_width_(input_values_array.type->byte_width()) {}
-
- [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
- bool valid = true;
- if constexpr (has_validity_buffer) {
- valid = bit_util::GetBit(input_validity_, read_offset);
- }
- *out = input_values_ + (read_offset * byte_width_);
- return valid;
- }
-
- /// \brief Ensure padding is zeroed in validity bitmap.
- void ZeroValidityPadding(int64_t length) const {
- DCHECK(output_values_);
- if constexpr (has_validity_buffer) {
- DCHECK(output_validity_);
- const int64_t validity_buffer_size = bit_util::BytesForBits(length);
- output_validity_[validity_buffer_size - 1] = 0;
- }
- }
-
- void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitTo(output_validity_, write_offset, valid);
- }
- if (valid) {
- memcpy(output_values_ + (write_offset * byte_width_), value,
byte_width_);
- }
- }
-
- void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
- ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
- }
- if (valid) {
- uint8_t* ptr = output_values_ + (write_offset * byte_width_);
- for (int64_t i = 0; i < run_length; ++i) {
- memcpy(ptr, value, byte_width_);
- ptr += byte_width_;
- }
- }
- }
-
- bool Compare(ValueRepr lhs, ValueRepr rhs) const {
- return memcmp(lhs, rhs, byte_width_) == 0;
- }
-};
-
-// Binary, String...
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
- enable_if_base_binary<ArrowType>> {
- public:
- using ValueRepr = std::string_view;
- using offset_type = typename ArrowType::offset_type;
-
- private:
- const uint8_t* input_validity_;
- const offset_type* input_offsets_;
- const uint8_t* input_values_;
-
- // Needed only by the writing functions
- uint8_t* output_validity_;
- offset_type* output_offsets_;
- uint8_t* output_values_;
-
- public:
- ReadWriteValueImpl(const ArraySpan& input_values_array,
- ArrayData* output_values_array_data)
- : input_validity_(has_validity_buffer ?
input_values_array.buffers[0].data
- : NULLPTR),
- input_offsets_(input_values_array.template GetValues<offset_type>(1,
0)),
- input_values_(input_values_array.buffers[2].data),
- output_validity_((has_validity_buffer && output_values_array_data)
- ?
output_values_array_data->buffers[0]->mutable_data()
- : NULLPTR),
- output_offsets_(
- output_values_array_data
- ? output_values_array_data->template
GetMutableValues<offset_type>(1, 0)
- : NULLPTR),
- output_values_(output_values_array_data
- ?
output_values_array_data->buffers[2]->mutable_data()
- : NULLPTR) {}
-
- [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
- bool valid = true;
- if constexpr (has_validity_buffer) {
- valid = bit_util::GetBit(input_validity_, read_offset);
- }
- if (valid) {
- const offset_type offset0 = input_offsets_[read_offset];
- const offset_type offset1 = input_offsets_[read_offset + 1];
- *out = std::string_view(reinterpret_cast<const char*>(input_values_ +
offset0),
- offset1 - offset0);
- }
- return valid;
- }
-
- /// \brief Ensure padding is zeroed in validity bitmap.
- void ZeroValidityPadding(int64_t length) const {
- DCHECK(output_values_);
- if constexpr (has_validity_buffer) {
- DCHECK(output_validity_);
- const int64_t validity_buffer_size = bit_util::BytesForBits(length);
- output_validity_[validity_buffer_size - 1] = 0;
- }
- }
-
- void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitTo(output_validity_, write_offset, valid);
- }
- const offset_type offset0 = output_offsets_[write_offset];
- const offset_type offset1 =
- offset0 + (valid ? static_cast<offset_type>(value.size()) : 0);
- output_offsets_[write_offset + 1] = offset1;
- if (valid) {
- memcpy(output_values_ + offset0, value.data(), value.size());
- }
- }
-
- void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
- ValueRepr value) const {
- if constexpr (has_validity_buffer) {
- bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
- }
- if (valid) {
- int64_t i = write_offset;
- offset_type offset = output_offsets_[i];
- while (i < write_offset + run_length) {
- memcpy(output_values_ + offset, value.data(), value.size());
- offset += static_cast<offset_type>(value.size());
- i += 1;
- output_offsets_[i] = offset;
- }
- } else {
- offset_type offset = output_offsets_[write_offset];
- offset_type* begin = output_offsets_ + write_offset + 1;
- std::fill(begin, begin + run_length, offset);
- }
- }
-
- bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
-};
-
-Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const
DataType& type,
- MemoryPool* pool,
- int64_t data_buffer_size)
{
- if (type.bit_width() == 1) {
- return AllocateBitmap(length, pool);
- } else if (is_fixed_width(type.id())) {
- return AllocateBuffer(length * type.byte_width(), pool);
- } else {
- DCHECK(is_base_binary_like(type.id()));
- return AllocateBuffer(data_buffer_size, pool);
- }
-}
-
struct RunEndEncondingState : public KernelState {
explicit RunEndEncondingState(std::shared_ptr<DataType> run_end_type)
: run_end_type{std::move(run_end_type)} {}
@@ -319,7 +44,7 @@ class RunEndEncodingLoop {
using RunEndCType = typename RunEndType::c_type;
private:
- using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+ using ReadWriteValue = ree_util::ReadWriteValue<ValueType,
has_validity_buffer>;
using ValueRepr = typename ReadWriteValue::ValueRepr;
private:
@@ -407,75 +132,29 @@ class RunEndEncodingLoop {
}
};
-template <typename RunEndType>
-Status ValidateRunEndType(int64_t input_length) {
- using RunEndCType = typename RunEndType::c_type;
- constexpr int64_t kRunEndMax = std::numeric_limits<RunEndCType>::max();
- if (input_length < 0 || input_length > kRunEndMax) {
+ARROW_NOINLINE Status ValidateRunEndType(const std::shared_ptr<DataType>&
run_end_type,
+ int64_t input_length) {
+ int64_t run_end_max = std::numeric_limits<int64_t>::max();
+ switch (run_end_type->id()) {
+ case Type::INT16:
+ run_end_max = std::numeric_limits<int16_t>::max();
+ break;
+ case Type::INT32:
+ run_end_max = std::numeric_limits<int32_t>::max();
+ break;
+ default:
+ DCHECK_EQ(run_end_type->id(), Type::INT64);
+ break;
+ }
+ if (input_length < 0 || input_length > run_end_max) {
return Status::Invalid(
"Cannot run-end encode Arrays with more elements than the "
"run end type can hold: ",
- kRunEndMax);
+ run_end_max);
}
return Status::OK();
}
-Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
- const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
- MemoryPool* pool) {
- ARROW_ASSIGN_OR_RAISE(
- auto run_ends_buffer,
- AllocateBuffer(physical_length * run_end_type->byte_width(), pool));
- return ArrayData::Make(run_end_type, physical_length,
- {NULLPTR, std::move(run_ends_buffer)},
/*null_count=*/0);
-}
-
-ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
- const std::shared_ptr<DataType>& value_type, bool has_validity_buffer,
int64_t length,
- int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
- std::vector<std::shared_ptr<Buffer>> values_data_buffers;
- std::shared_ptr<Buffer> validity_buffer = NULLPTR;
- if (has_validity_buffer) {
- ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(length, pool));
- }
- ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length,
*value_type,
- pool,
data_buffer_size));
- if (is_base_binary_like(value_type->id())) {
- const int offset_byte_width = offset_bit_width(value_type->id()) / 8;
- ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
- AllocateBuffer((length + 1) * offset_byte_width,
pool));
- // Ensure the first offset is zero
- memset(offsets_buffer->mutable_data(), 0, offset_byte_width);
- offsets_buffer->ZeroPadding();
- values_data_buffers = {std::move(validity_buffer),
std::move(offsets_buffer),
- std::move(values_buffer)};
- } else {
- values_data_buffers = {std::move(validity_buffer),
std::move(values_buffer)};
- }
- return ArrayData::Make(value_type, length, std::move(values_data_buffers),
null_count);
-}
-
-/// \brief Preallocate the ArrayData for the run-end encoded version
-/// of the flat input array
-///
-/// \param data_buffer_size the size of the data buffer for string and binary
types
-ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
- std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
- int64_t logical_length, int64_t physical_length, int64_t
physical_null_count,
- MemoryPool* pool, int64_t data_buffer_size) {
- ARROW_ASSIGN_OR_RAISE(
- auto run_ends_data,
- PreallocateRunEndsArray(ree_type->run_end_type(), physical_length,
pool));
- ARROW_ASSIGN_OR_RAISE(
- auto values_data,
- PreallocateValuesArray(ree_type->value_type(), has_validity_buffer,
physical_length,
- physical_null_count, pool, data_buffer_size));
-
- return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
- {std::move(run_ends_data), std::move(values_data)},
- /*null_count=*/0);
-}
-
template <typename RunEndType, typename ValueType, bool has_validity_buffer>
class RunEndEncodeImpl {
private:
@@ -492,13 +171,14 @@ class RunEndEncodeImpl {
Status Exec() {
const int64_t input_length = input_array_.length;
+ auto run_end_type = TypeTraits<RunEndType>::type_singleton();
auto ree_type = std::make_shared<RunEndEncodedType>(
- TypeTraits<RunEndType>::type_singleton(),
input_array_.type->GetSharedPtr());
+ run_end_type, input_array_.type->GetSharedPtr());
if (input_length == 0) {
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
- PreallocateREEArray(std::move(ree_type), has_validity_buffer,
input_length, 0,
- 0, ctx_->memory_pool(), 0));
+ ree_util::PreallocateREEArray(std::move(ree_type),
has_validity_buffer,
+ input_length, 0, 0,
ctx_->memory_pool(), 0));
output_->value = std::move(output_array_data);
return Status::OK();
}
@@ -507,7 +187,7 @@ class RunEndEncodeImpl {
int64_t num_valid_runs = 0;
int64_t num_output_runs = 0;
int64_t data_buffer_size = 0; // for string and binary types
- RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_length));
+ RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_length));
RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer>
counting_loop(
input_array_,
@@ -518,9 +198,9 @@ class RunEndEncodeImpl {
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
- PreallocateREEArray(std::move(ree_type), has_validity_buffer,
input_length,
- num_output_runs, num_output_runs - num_valid_runs,
- ctx_->memory_pool(), data_buffer_size));
+ ree_util::PreallocateREEArray(
+ std::move(ree_type), has_validity_buffer, input_length,
num_output_runs,
+ num_output_runs - num_valid_runs, ctx_->memory_pool(),
data_buffer_size));
// Initialize the output pointers
auto* output_run_ends =
@@ -538,51 +218,27 @@ class RunEndEncodeImpl {
}
};
-template <typename RunEndType>
-Result<std::shared_ptr<ArrayData>> PreallocateNullREEArray(int64_t
logical_length,
- int64_t
physical_length,
- MemoryPool* pool) {
- ARROW_ASSIGN_OR_RAISE(
- auto run_ends_buffer,
- AllocateBuffer(TypeTraits<RunEndType>::bytes_required(physical_length),
pool));
-
- auto ree_type =
- std::make_shared<RunEndEncodedType>(std::make_shared<RunEndType>(),
null());
- auto run_ends_data = ArrayData::Make(std::make_shared<RunEndType>(),
physical_length,
- {NULLPTR, std::move(run_ends_buffer)},
- /*null_count=*/0);
- auto values_data = ArrayData::Make(null(), physical_length, {NULLPTR},
- /*null_count=*/physical_length);
- return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
- {std::move(run_ends_data), std::move(values_data)},
- /*null_count=*/0);
-}
-
-template <typename RunEndType>
-Status RunEndEncodeNullArray(KernelContext* ctx, const ArraySpan& input_array,
- ExecResult* output) {
- using RunEndCType = typename RunEndType::c_type;
-
+ARROW_NOINLINE Status RunEndEncodeNullArray(const std::shared_ptr<DataType>&
run_end_type,
+ KernelContext* ctx,
+ const ArraySpan& input_array,
+ ExecResult* output) {
const int64_t input_length = input_array.length;
DCHECK(input_array.type->id() == Type::NA);
if (input_length == 0) {
- ARROW_ASSIGN_OR_RAISE(auto output_array_data,
- PreallocateNullREEArray<RunEndType>(0, 0,
ctx->memory_pool()));
+ ARROW_ASSIGN_OR_RAISE(
+ auto output_array_data,
+ ree_util::MakeNullREEArray(run_end_type, 0, ctx->memory_pool()));
output->value = std::move(output_array_data);
return Status::OK();
}
// Abort if run-end type cannot hold the input length
- RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_array.length));
-
- ARROW_ASSIGN_OR_RAISE(auto output_array_data,
PreallocateNullREEArray<RunEndType>(
- input_length, 1,
ctx->memory_pool()));
+ RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_array.length));
- // Write the single run-end this REE has
- auto* output_run_ends =
- output_array_data->child_data[0]->template
GetMutableValues<RunEndCType>(1, 0);
- output_run_ends[0] = static_cast<RunEndCType>(input_length);
+ ARROW_ASSIGN_OR_RAISE(
+ auto output_array_data,
+ ree_util::MakeNullREEArray(run_end_type, input_length,
ctx->memory_pool()));
output->value = std::move(output_array_data);
return Status::OK();
@@ -594,7 +250,8 @@ struct RunEndEncodeExec {
DCHECK(span.values[0].is_array());
const auto& input_array = span.values[0].array;
if constexpr (ValueType::type_id == Type::NA) {
- return RunEndEncodeNullArray<RunEndType>(ctx, input_array, result);
+ return RunEndEncodeNullArray(TypeTraits<RunEndType>::type_singleton(),
ctx,
+ input_array, result);
} else {
const bool has_validity_buffer = input_array.MayHaveNulls();
if (has_validity_buffer) {
@@ -645,7 +302,7 @@ class RunEndDecodingLoop {
using RunEndCType = typename RunEndType::c_type;
private:
- using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+ using ReadWriteValue = ree_util::ReadWriteValue<ValueType,
has_validity_buffer>;
using ValueRepr = typename ReadWriteValue::ValueRepr;
const ArraySpan& input_array_;
@@ -660,19 +317,20 @@ class RunEndDecodingLoop {
public:
RunEndDecodingLoop(const ArraySpan& input_array, ArrayData*
output_array_data)
- : RunEndDecodingLoop(input_array, ree_util::ValuesArray(input_array),
+ : RunEndDecodingLoop(input_array,
arrow::ree_util::ValuesArray(input_array),
output_array_data) {}
/// \brief For variable-length types, calculate the total length of the data
/// buffer needed to store the expanded values.
int64_t CalculateOutputDataBufferSize() const {
- auto& input_array_values = ree_util::ValuesArray(input_array_);
+ auto& input_array_values = arrow::ree_util::ValuesArray(input_array_);
DCHECK_EQ(input_array_values.type->id(), ValueType::type_id);
if constexpr (is_base_binary_like(ValueType::type_id)) {
using offset_type = typename ValueType::offset_type;
int64_t data_buffer_size = 0;
- const ree_util::RunEndEncodedArraySpan<RunEndCType>
ree_array_span(input_array_);
+ const arrow::ree_util::RunEndEncodedArraySpan<RunEndCType>
ree_array_span(
+ input_array_);
const auto* offsets_buffer =
input_array_values.template GetValues<offset_type>(1, 0);
auto it = ree_array_span.begin();
@@ -695,7 +353,8 @@ class RunEndDecodingLoop {
ARROW_NOINLINE int64_t ExpandAllRuns() {
read_write_value_.ZeroValidityPadding(input_array_.length);
- const ree_util::RunEndEncodedArraySpan<RunEndCType>
ree_array_span(input_array_);
+ const arrow::ree_util::RunEndEncodedArraySpan<RunEndCType> ree_array_span(
+ input_array_);
int64_t write_offset = 0;
int64_t output_valid_count = 0;
for (auto it = ree_array_span.begin(); !it.is_end(ree_array_span); ++it) {
@@ -738,10 +397,10 @@ class RunEndDecodeImpl {
}
}
- ARROW_ASSIGN_OR_RAISE(
- auto output_array_data,
- PreallocateValuesArray(ree_type->value_type(), has_validity_buffer,
length,
- kUnknownNullCount, ctx_->memory_pool(),
data_buffer_size));
+ ARROW_ASSIGN_OR_RAISE(auto output_array_data,
+ ree_util::PreallocateValuesArray(
+ ree_type->value_type(), has_validity_buffer,
length,
+ kUnknownNullCount, ctx_->memory_pool(),
data_buffer_size));
int64_t output_null_count = 0;
if (length > 0) {
@@ -774,7 +433,8 @@ struct RunEndDecodeExec {
if constexpr (ValueType::type_id == Type::NA) {
return RunEndDecodeNullREEArray(ctx, input_array, result);
} else {
- const bool has_validity_buffer =
ree_util::ValuesArray(input_array).MayHaveNulls();
+ const bool has_validity_buffer =
+ arrow::ree_util::ValuesArray(input_array).MayHaveNulls();
if (has_validity_buffer) {
return RunEndDecodeImpl<RunEndType, ValueType, true>(ctx, input_array,
result)
.Exec();