This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ff94702eee MINOR: [C++] Make the utilities used to implement run_end_encode available to other compute kernels (#35002)
ff94702eee is described below

commit ff94702eee6ee419329d32d86c14bf0bf0c244f3
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Thu Apr 13 15:02:51 2023 -0300

    MINOR: [C++] Make the utilities used to implement run_end_encode available to other compute kernels (#35002)
    
    ### Rationale for this change
    
    Make it easier to implement correct kernels dealing with run-end encoded data.
    
    ### What changes are included in this PR?
    
    Extraction of code to a header so kernels like REE filter kernels (see #35001) can use them.
    
    ### Are these changes tested?
    
    Indirectly via the `run_end_encode` and `run_end_decode` tests.
    
    ### Are there any user-facing changes?
    
    No, these are internal compute headers.
    
    Authored-by: Felipe Oliveira Carvalho <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                       |   1 +
 cpp/src/arrow/compute/kernels/ree_util_internal.cc | 139 +++++++
 cpp/src/arrow/compute/kernels/ree_util_internal.h  | 339 ++++++++++++++++
 .../arrow/compute/kernels/vector_run_end_encode.cc | 444 +++------------------
 4 files changed, 531 insertions(+), 392 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4c91dc57b4..98ab2cc6d4 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -402,6 +402,7 @@ list(APPEND
      compute/registry.cc
      compute/kernels/codegen_internal.cc
      compute/kernels/row_encoder.cc
+     compute/kernels/ree_util_internal.cc
      compute/kernels/scalar_cast_boolean.cc
      compute/kernels/scalar_cast_dictionary.cc
      compute/kernels/scalar_cast_extension.cc
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.cc b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
new file mode 100644
index 0000000000..edfefd324f
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/kernels/ree_util_internal.h"
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace ree_util {
+
+Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const 
DataType& type,
+                                                     MemoryPool* pool,
+                                                     int64_t data_buffer_size) 
{
+  if (type.bit_width() == 1) {
+    return AllocateBitmap(length, pool);
+  } else if (is_fixed_width(type.id())) {
+    return AllocateBuffer(length * type.byte_width(), pool);
+  } else {
+    DCHECK(is_base_binary_like(type.id()));
+    return AllocateBuffer(data_buffer_size, pool);
+  }
+}
+
+Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
+    MemoryPool* pool) {
+  DCHECK(is_run_end_type(run_end_type->id()));
+  ARROW_ASSIGN_OR_RAISE(
+      auto run_ends_buffer,
+      AllocateBuffer(physical_length * run_end_type->byte_width(), pool));
+  return ArrayData::Make(run_end_type, physical_length,
+                         {NULLPTR, std::move(run_ends_buffer)}, 
/*null_count=*/0);
+}
+
+Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
+    const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, 
int64_t length,
+    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
+  std::vector<std::shared_ptr<Buffer>> values_data_buffers;
+  std::shared_ptr<Buffer> validity_buffer = NULLPTR;
+  if (has_validity_buffer) {
+    ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(length, pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length, 
*value_type,
+                                                                 pool, 
data_buffer_size));
+  if (is_base_binary_like(value_type->id())) {
+    const int offset_byte_width = offset_bit_width(value_type->id()) / 8;
+    ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
+                          AllocateBuffer((length + 1) * offset_byte_width, 
pool));
+    // Ensure the first offset is zero
+    memset(offsets_buffer->mutable_data(), 0, offset_byte_width);
+    offsets_buffer->ZeroPadding();
+    values_data_buffers = {std::move(validity_buffer), 
std::move(offsets_buffer),
+                           std::move(values_buffer)};
+  } else {
+    values_data_buffers = {std::move(validity_buffer), 
std::move(values_buffer)};
+  }
+  return ArrayData::Make(value_type, length, std::move(values_data_buffers), 
null_count);
+}
+
+Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
+    std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
+    int64_t logical_length, int64_t physical_length, int64_t 
physical_null_count,
+    MemoryPool* pool, int64_t data_buffer_size) {
+  ARROW_ASSIGN_OR_RAISE(
+      auto run_ends_data,
+      PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, 
pool));
+  ARROW_ASSIGN_OR_RAISE(
+      auto values_data,
+      PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, 
physical_length,
+                             physical_null_count, pool, data_buffer_size));
+
+  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
+                         {std::move(run_ends_data), std::move(values_data)},
+                         /*null_count=*/0);
+}
+
+void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end) {
+  DCHECK_GT(run_end, 0);
+  DCHECK(is_run_end_type(run_ends_data->type->id()));
+  auto* output_run_ends = run_ends_data->template GetMutableValues<uint8_t>(1);
+  switch (run_ends_data->type->id()) {
+    case Type::INT16:
+      *reinterpret_cast<int16_t*>(output_run_ends) = 
static_cast<int16_t>(run_end);
+      break;
+    case Type::INT32:
+      *reinterpret_cast<int32_t*>(output_run_ends) = 
static_cast<int32_t>(run_end);
+      break;
+    default:
+      DCHECK_EQ(run_ends_data->type->id(), Type::INT64);
+      *reinterpret_cast<int64_t*>(output_run_ends) = 
static_cast<int64_t>(run_end);
+      break;
+  }
+}
+
+Result<std::shared_ptr<ArrayData>> MakeNullREEArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t logical_length,
+    MemoryPool* pool) {
+  auto ree_type = std::make_shared<RunEndEncodedType>(run_end_type, null());
+  const int64_t physical_length = logical_length > 0 ? 1 : 0;
+  ARROW_ASSIGN_OR_RAISE(auto run_ends_data,
+                        PreallocateRunEndsArray(run_end_type, physical_length, 
pool));
+  if (logical_length > 0) {
+    WriteSingleRunEnd(run_ends_data.get(), logical_length);
+  }
+  auto values_data = ArrayData::Make(null(), physical_length, {NULLPTR},
+                                     /*null_count=*/physical_length);
+  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
+                         {std::move(run_ends_data), std::move(values_data)},
+                         /*null_count=*/0);
+}
+
+}  // namespace ree_util
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h b/cpp/src/arrow/compute/kernels/ree_util_internal.h
new file mode 100644
index 0000000000..8e06427b8d
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.h
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Useful operations to implement kernels handling run-end encoded arrays
+
+#include <algorithm>
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace ree_util {
+
+template <typename ArrowType, bool in_has_validity_buffer,
+          bool out_has_validity_buffer = in_has_validity_buffer, typename 
Enable = void>
+struct ReadWriteValue {};
+
+// Numeric and primitive C-compatible types
+template <typename ArrowType, bool in_has_validity_buffer, bool 
out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, 
out_has_validity_buffer,
+                     enable_if_has_c_type<ArrowType>> {
+ public:
+  using ValueRepr = typename ArrowType::c_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
+
+ public:
+  explicit ReadWriteValue(const ArraySpan& input_values_array,
+                          ArrayData* output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? 
input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? 
output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? 
output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+      *out = bit_util::GetBit(input_values_, read_offset);
+    } else {
+      *out = (reinterpret_cast<const ValueRepr*>(input_values_))[read_offset];
+    }
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
+
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    if (valid) {
+      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+        bit_util::SetBitTo(output_values_, write_offset, value);
+      } else {
+        (reinterpret_cast<ValueRepr*>(output_values_))[write_offset] = value;
+      }
+    }
+  }
+
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+        bit_util::SetBitsTo(reinterpret_cast<uint8_t*>(output_values_), 
write_offset,
+                            run_length, value);
+      } else {
+        auto* output_values_c = reinterpret_cast<ValueRepr*>(output_values_);
+        std::fill(output_values_c + write_offset,
+                  output_values_c + write_offset + run_length, value);
+      }
+    }
+  }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
+};
+
+// FixedSizeBinary, Decimal128
+template <typename ArrowType, bool in_has_validity_buffer, bool 
out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, 
out_has_validity_buffer,
+                     enable_if_fixed_size_binary<ArrowType>> {
+ public:
+  // Every value is represented as a pointer to byte_width_ bytes
+  using ValueRepr = uint8_t const*;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
+
+  const size_t byte_width_;
+
+ public:
+  ReadWriteValue(const ArraySpan& input_values_array, ArrayData* 
output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? 
input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? 
output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? 
output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR),
+        byte_width_(input_values_array.type->byte_width()) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    *out = input_values_ + (read_offset * byte_width_);
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
+
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    if (valid) {
+      memcpy(output_values_ + (write_offset * byte_width_), value, 
byte_width_);
+    }
+  }
+
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      uint8_t* ptr = output_values_ + (write_offset * byte_width_);
+      for (int64_t i = 0; i < run_length; ++i) {
+        memcpy(ptr, value, byte_width_);
+        ptr += byte_width_;
+      }
+    }
+  }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const {
+    return memcmp(lhs, rhs, byte_width_) == 0;
+  }
+};
+
+// Binary, String...
+template <typename ArrowType, bool in_has_validity_buffer, bool 
out_has_validity_buffer>
+class ReadWriteValue<ArrowType, in_has_validity_buffer, 
out_has_validity_buffer,
+                     enable_if_base_binary<ArrowType>> {
+ public:
+  using ValueRepr = std::string_view;
+  using offset_type = typename ArrowType::offset_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const offset_type* input_offsets_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  offset_type* output_offsets_;
+  uint8_t* output_values_;
+
+ public:
+  ReadWriteValue(const ArraySpan& input_values_array, ArrayData* 
output_values_array_data)
+      : input_validity_(in_has_validity_buffer ? 
input_values_array.buffers[0].data
+                                               : NULLPTR),
+        input_offsets_(input_values_array.template GetValues<offset_type>(1, 
0)),
+        input_values_(input_values_array.buffers[2].data),
+        output_validity_((out_has_validity_buffer && output_values_array_data)
+                             ? 
output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_offsets_(
+            output_values_array_data
+                ? output_values_array_data->template 
GetMutableValues<offset_type>(1, 0)
+                : NULLPTR),
+        output_values_(output_values_array_data
+                           ? 
output_values_array_data->buffers[2]->mutable_data()
+                           : NULLPTR) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (in_has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    if (valid) {
+      const offset_type offset0 = input_offsets_[read_offset];
+      const offset_type offset1 = input_offsets_[read_offset + 1];
+      *out = std::string_view(reinterpret_cast<const char*>(input_values_ + 
offset0),
+                              offset1 - offset0);
+    }
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (out_has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
+
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    const offset_type offset0 = output_offsets_[write_offset];
+    const offset_type offset1 =
+        offset0 + (valid ? static_cast<offset_type>(value.size()) : 0);
+    output_offsets_[write_offset + 1] = offset1;
+    if (valid) {
+      memcpy(output_values_ + offset0, value.data(), value.size());
+    }
+  }
+
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (out_has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      int64_t i = write_offset;
+      offset_type offset = output_offsets_[i];
+      while (i < write_offset + run_length) {
+        memcpy(output_values_ + offset, value.data(), value.size());
+        offset += static_cast<offset_type>(value.size());
+        i += 1;
+        output_offsets_[i] = offset;
+      }
+    } else {
+      offset_type offset = output_offsets_[write_offset];
+      offset_type* begin = output_offsets_ + write_offset + 1;
+      std::fill(begin, begin + run_length, offset);
+    }
+  }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
+};
+
+Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const 
DataType& type,
+                                                     MemoryPool* pool,
+                                                     int64_t data_buffer_size);
+
+Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
+    MemoryPool* pool);
+
+Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
+    const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, 
int64_t length,
+    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size);
+
+/// \brief Preallocate the ArrayData for the run-end encoded version
+/// of the flat input array
+///
+/// \param data_buffer_size the size of the data buffer for string and binary types
+Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
+    std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
+    int64_t logical_length, int64_t physical_length, int64_t 
physical_null_count,
+    MemoryPool* pool, int64_t data_buffer_size);
+
+/// \brief Writes a single run-end to the first slot of the pre-allocated
+/// run-end encoded array in out
+///
+/// Pre-conditions:
+/// - run_ends_data is of a valid run-ends type
+/// - run_ends_data has at least one slot
+/// - run_end > 0
+/// - run_ends fits in the run-end type without overflow
+void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end);
+
+Result<std::shared_ptr<ArrayData>> MakeNullREEArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t logical_length,
+    MemoryPool* pool);
+
+}  // namespace ree_util
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
index 2fa260321c..c0f54fbf5d 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -20,6 +20,7 @@
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/kernel.h"
 #include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/ree_util_internal.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/ree_util.h"
@@ -28,282 +29,6 @@ namespace arrow {
 namespace compute {
 namespace internal {
 
-template <typename ArrowType, bool has_validity_buffer, typename Enable = void>
-struct ReadWriteValueImpl {};
-
-// Numeric and primitive C-compatible types
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
-                         enable_if_has_c_type<ArrowType>> {
- public:
-  using ValueRepr = typename ArrowType::c_type;
-
- private:
-  const uint8_t* input_validity_;
-  const uint8_t* input_values_;
-
-  // Needed only by the writing functions
-  uint8_t* output_validity_;
-  uint8_t* output_values_;
-
- public:
-  explicit ReadWriteValueImpl(const ArraySpan& input_values_array,
-                              ArrayData* output_values_array_data)
-      : input_validity_(has_validity_buffer ? 
input_values_array.buffers[0].data
-                                            : NULLPTR),
-        input_values_(input_values_array.buffers[1].data),
-        output_validity_((has_validity_buffer && output_values_array_data)
-                             ? 
output_values_array_data->buffers[0]->mutable_data()
-                             : NULLPTR),
-        output_values_(output_values_array_data
-                           ? 
output_values_array_data->buffers[1]->mutable_data()
-                           : NULLPTR) {}
-
-  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
-    bool valid = true;
-    if constexpr (has_validity_buffer) {
-      valid = bit_util::GetBit(input_validity_, read_offset);
-    }
-    if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-      *out = bit_util::GetBit(input_values_, read_offset);
-    } else {
-      *out = (reinterpret_cast<const ValueRepr*>(input_values_))[read_offset];
-    }
-    return valid;
-  }
-
-  /// \brief Ensure padding is zeroed in validity bitmap.
-  void ZeroValidityPadding(int64_t length) const {
-    DCHECK(output_values_);
-    if constexpr (has_validity_buffer) {
-      DCHECK(output_validity_);
-      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
-      output_validity_[validity_buffer_size - 1] = 0;
-    }
-  }
-
-  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitTo(output_validity_, write_offset, valid);
-    }
-    if (valid) {
-      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-        bit_util::SetBitTo(output_values_, write_offset, value);
-      } else {
-        (reinterpret_cast<ValueRepr*>(output_values_))[write_offset] = value;
-      }
-    }
-  }
-
-  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
-                ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
-    }
-    if (valid) {
-      if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-        bit_util::SetBitsTo(reinterpret_cast<uint8_t*>(output_values_), 
write_offset,
-                            run_length, value);
-      } else {
-        auto* output_values_c = reinterpret_cast<ValueRepr*>(output_values_);
-        std::fill(output_values_c + write_offset,
-                  output_values_c + write_offset + run_length, value);
-      }
-    }
-  }
-
-  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
-};
-
-// FixedSizeBinary, Decimal128
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
-                         enable_if_fixed_size_binary<ArrowType>> {
- public:
-  // Every value is represented as a pointer to byte_width_ bytes
-  using ValueRepr = uint8_t const*;
-
- private:
-  const uint8_t* input_validity_;
-  const uint8_t* input_values_;
-
-  // Needed only by the writing functions
-  uint8_t* output_validity_;
-  uint8_t* output_values_;
-
-  const size_t byte_width_;
-
- public:
-  ReadWriteValueImpl(const ArraySpan& input_values_array,
-                     ArrayData* output_values_array_data)
-      : input_validity_(has_validity_buffer ? 
input_values_array.buffers[0].data
-                                            : NULLPTR),
-        input_values_(input_values_array.buffers[1].data),
-        output_validity_((has_validity_buffer && output_values_array_data)
-                             ? 
output_values_array_data->buffers[0]->mutable_data()
-                             : NULLPTR),
-        output_values_(output_values_array_data
-                           ? 
output_values_array_data->buffers[1]->mutable_data()
-                           : NULLPTR),
-        byte_width_(input_values_array.type->byte_width()) {}
-
-  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
-    bool valid = true;
-    if constexpr (has_validity_buffer) {
-      valid = bit_util::GetBit(input_validity_, read_offset);
-    }
-    *out = input_values_ + (read_offset * byte_width_);
-    return valid;
-  }
-
-  /// \brief Ensure padding is zeroed in validity bitmap.
-  void ZeroValidityPadding(int64_t length) const {
-    DCHECK(output_values_);
-    if constexpr (has_validity_buffer) {
-      DCHECK(output_validity_);
-      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
-      output_validity_[validity_buffer_size - 1] = 0;
-    }
-  }
-
-  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitTo(output_validity_, write_offset, valid);
-    }
-    if (valid) {
-      memcpy(output_values_ + (write_offset * byte_width_), value, 
byte_width_);
-    }
-  }
-
-  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
-                ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
-    }
-    if (valid) {
-      uint8_t* ptr = output_values_ + (write_offset * byte_width_);
-      for (int64_t i = 0; i < run_length; ++i) {
-        memcpy(ptr, value, byte_width_);
-        ptr += byte_width_;
-      }
-    }
-  }
-
-  bool Compare(ValueRepr lhs, ValueRepr rhs) const {
-    return memcmp(lhs, rhs, byte_width_) == 0;
-  }
-};
-
-// Binary, String...
-template <typename ArrowType, bool has_validity_buffer>
-class ReadWriteValueImpl<ArrowType, has_validity_buffer,
-                         enable_if_base_binary<ArrowType>> {
- public:
-  using ValueRepr = std::string_view;
-  using offset_type = typename ArrowType::offset_type;
-
- private:
-  const uint8_t* input_validity_;
-  const offset_type* input_offsets_;
-  const uint8_t* input_values_;
-
-  // Needed only by the writing functions
-  uint8_t* output_validity_;
-  offset_type* output_offsets_;
-  uint8_t* output_values_;
-
- public:
-  ReadWriteValueImpl(const ArraySpan& input_values_array,
-                     ArrayData* output_values_array_data)
-      : input_validity_(has_validity_buffer ? 
input_values_array.buffers[0].data
-                                            : NULLPTR),
-        input_offsets_(input_values_array.template GetValues<offset_type>(1, 
0)),
-        input_values_(input_values_array.buffers[2].data),
-        output_validity_((has_validity_buffer && output_values_array_data)
-                             ? 
output_values_array_data->buffers[0]->mutable_data()
-                             : NULLPTR),
-        output_offsets_(
-            output_values_array_data
-                ? output_values_array_data->template 
GetMutableValues<offset_type>(1, 0)
-                : NULLPTR),
-        output_values_(output_values_array_data
-                           ? 
output_values_array_data->buffers[2]->mutable_data()
-                           : NULLPTR) {}
-
-  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
-    bool valid = true;
-    if constexpr (has_validity_buffer) {
-      valid = bit_util::GetBit(input_validity_, read_offset);
-    }
-    if (valid) {
-      const offset_type offset0 = input_offsets_[read_offset];
-      const offset_type offset1 = input_offsets_[read_offset + 1];
-      *out = std::string_view(reinterpret_cast<const char*>(input_values_ + 
offset0),
-                              offset1 - offset0);
-    }
-    return valid;
-  }
-
-  /// \brief Ensure padding is zeroed in validity bitmap.
-  void ZeroValidityPadding(int64_t length) const {
-    DCHECK(output_values_);
-    if constexpr (has_validity_buffer) {
-      DCHECK(output_validity_);
-      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
-      output_validity_[validity_buffer_size - 1] = 0;
-    }
-  }
-
-  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitTo(output_validity_, write_offset, valid);
-    }
-    const offset_type offset0 = output_offsets_[write_offset];
-    const offset_type offset1 =
-        offset0 + (valid ? static_cast<offset_type>(value.size()) : 0);
-    output_offsets_[write_offset + 1] = offset1;
-    if (valid) {
-      memcpy(output_values_ + offset0, value.data(), value.size());
-    }
-  }
-
-  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
-                ValueRepr value) const {
-    if constexpr (has_validity_buffer) {
-      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
-    }
-    if (valid) {
-      int64_t i = write_offset;
-      offset_type offset = output_offsets_[i];
-      while (i < write_offset + run_length) {
-        memcpy(output_values_ + offset, value.data(), value.size());
-        offset += static_cast<offset_type>(value.size());
-        i += 1;
-        output_offsets_[i] = offset;
-      }
-    } else {
-      offset_type offset = output_offsets_[write_offset];
-      offset_type* begin = output_offsets_ + write_offset + 1;
-      std::fill(begin, begin + run_length, offset);
-    }
-  }
-
-  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
-};
-
-Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const 
DataType& type,
-                                                     MemoryPool* pool,
-                                                     int64_t data_buffer_size) 
{
-  if (type.bit_width() == 1) {
-    return AllocateBitmap(length, pool);
-  } else if (is_fixed_width(type.id())) {
-    return AllocateBuffer(length * type.byte_width(), pool);
-  } else {
-    DCHECK(is_base_binary_like(type.id()));
-    return AllocateBuffer(data_buffer_size, pool);
-  }
-}
-
 struct RunEndEncondingState : public KernelState {
   explicit RunEndEncondingState(std::shared_ptr<DataType> run_end_type)
       : run_end_type{std::move(run_end_type)} {}
@@ -319,7 +44,7 @@ class RunEndEncodingLoop {
   using RunEndCType = typename RunEndType::c_type;
 
  private:
-  using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+  using ReadWriteValue = ree_util::ReadWriteValue<ValueType, 
has_validity_buffer>;
   using ValueRepr = typename ReadWriteValue::ValueRepr;
 
  private:
@@ -407,75 +132,29 @@ class RunEndEncodingLoop {
   }
 };
 
-template <typename RunEndType>
-Status ValidateRunEndType(int64_t input_length) {
-  using RunEndCType = typename RunEndType::c_type;
-  constexpr int64_t kRunEndMax = std::numeric_limits<RunEndCType>::max();
-  if (input_length < 0 || input_length > kRunEndMax) {
+ARROW_NOINLINE Status ValidateRunEndType(const std::shared_ptr<DataType>& 
run_end_type,
+                                         int64_t input_length) {
+  int64_t run_end_max = std::numeric_limits<int64_t>::max();
+  switch (run_end_type->id()) {
+    case Type::INT16:
+      run_end_max = std::numeric_limits<int16_t>::max();
+      break;
+    case Type::INT32:
+      run_end_max = std::numeric_limits<int32_t>::max();
+      break;
+    default:
+      DCHECK_EQ(run_end_type->id(), Type::INT64);
+      break;
+  }
+  if (input_length < 0 || input_length > run_end_max) {
     return Status::Invalid(
         "Cannot run-end encode Arrays with more elements than the "
         "run end type can hold: ",
-        kRunEndMax);
+        run_end_max);
   }
   return Status::OK();
 }
 
-Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
-    const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
-    MemoryPool* pool) {
-  ARROW_ASSIGN_OR_RAISE(
-      auto run_ends_buffer,
-      AllocateBuffer(physical_length * run_end_type->byte_width(), pool));
-  return ArrayData::Make(run_end_type, physical_length,
-                         {NULLPTR, std::move(run_ends_buffer)}, 
/*null_count=*/0);
-}
-
-ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
-    const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, 
int64_t length,
-    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
-  std::vector<std::shared_ptr<Buffer>> values_data_buffers;
-  std::shared_ptr<Buffer> validity_buffer = NULLPTR;
-  if (has_validity_buffer) {
-    ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(length, pool));
-  }
-  ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length, 
*value_type,
-                                                                 pool, 
data_buffer_size));
-  if (is_base_binary_like(value_type->id())) {
-    const int offset_byte_width = offset_bit_width(value_type->id()) / 8;
-    ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
-                          AllocateBuffer((length + 1) * offset_byte_width, 
pool));
-    // Ensure the first offset is zero
-    memset(offsets_buffer->mutable_data(), 0, offset_byte_width);
-    offsets_buffer->ZeroPadding();
-    values_data_buffers = {std::move(validity_buffer), 
std::move(offsets_buffer),
-                           std::move(values_buffer)};
-  } else {
-    values_data_buffers = {std::move(validity_buffer), 
std::move(values_buffer)};
-  }
-  return ArrayData::Make(value_type, length, std::move(values_data_buffers), 
null_count);
-}
-
-/// \brief Preallocate the ArrayData for the run-end encoded version
-/// of the flat input array
-///
-/// \param data_buffer_size the size of the data buffer for string and binary 
types
-ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
-    std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
-    int64_t logical_length, int64_t physical_length, int64_t 
physical_null_count,
-    MemoryPool* pool, int64_t data_buffer_size) {
-  ARROW_ASSIGN_OR_RAISE(
-      auto run_ends_data,
-      PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, 
pool));
-  ARROW_ASSIGN_OR_RAISE(
-      auto values_data,
-      PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, 
physical_length,
-                             physical_null_count, pool, data_buffer_size));
-
-  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
-                         {std::move(run_ends_data), std::move(values_data)},
-                         /*null_count=*/0);
-}
-
 template <typename RunEndType, typename ValueType, bool has_validity_buffer>
 class RunEndEncodeImpl {
  private:
@@ -492,13 +171,14 @@ class RunEndEncodeImpl {
   Status Exec() {
     const int64_t input_length = input_array_.length;
 
+    auto run_end_type = TypeTraits<RunEndType>::type_singleton();
     auto ree_type = std::make_shared<RunEndEncodedType>(
-        TypeTraits<RunEndType>::type_singleton(), 
input_array_.type->GetSharedPtr());
+        run_end_type, input_array_.type->GetSharedPtr());
     if (input_length == 0) {
       ARROW_ASSIGN_OR_RAISE(
           auto output_array_data,
-          PreallocateREEArray(std::move(ree_type), has_validity_buffer, 
input_length, 0,
-                              0, ctx_->memory_pool(), 0));
+          ree_util::PreallocateREEArray(std::move(ree_type), 
has_validity_buffer,
+                                        input_length, 0, 0, 
ctx_->memory_pool(), 0));
       output_->value = std::move(output_array_data);
       return Status::OK();
     }
@@ -507,7 +187,7 @@ class RunEndEncodeImpl {
     int64_t num_valid_runs = 0;
     int64_t num_output_runs = 0;
     int64_t data_buffer_size = 0;  // for string and binary types
-    RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_length));
+    RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_length));
 
     RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer> 
counting_loop(
         input_array_,
@@ -518,9 +198,9 @@ class RunEndEncodeImpl {
 
     ARROW_ASSIGN_OR_RAISE(
         auto output_array_data,
-        PreallocateREEArray(std::move(ree_type), has_validity_buffer, 
input_length,
-                            num_output_runs, num_output_runs - num_valid_runs,
-                            ctx_->memory_pool(), data_buffer_size));
+        ree_util::PreallocateREEArray(
+            std::move(ree_type), has_validity_buffer, input_length, 
num_output_runs,
+            num_output_runs - num_valid_runs, ctx_->memory_pool(), 
data_buffer_size));
 
     // Initialize the output pointers
     auto* output_run_ends =
@@ -538,51 +218,27 @@ class RunEndEncodeImpl {
   }
 };
 
-template <typename RunEndType>
-Result<std::shared_ptr<ArrayData>> PreallocateNullREEArray(int64_t 
logical_length,
-                                                           int64_t 
physical_length,
-                                                           MemoryPool* pool) {
-  ARROW_ASSIGN_OR_RAISE(
-      auto run_ends_buffer,
-      AllocateBuffer(TypeTraits<RunEndType>::bytes_required(physical_length), 
pool));
-
-  auto ree_type =
-      std::make_shared<RunEndEncodedType>(std::make_shared<RunEndType>(), 
null());
-  auto run_ends_data = ArrayData::Make(std::make_shared<RunEndType>(), 
physical_length,
-                                       {NULLPTR, std::move(run_ends_buffer)},
-                                       /*null_count=*/0);
-  auto values_data = ArrayData::Make(null(), physical_length, {NULLPTR},
-                                     /*null_count=*/physical_length);
-  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
-                         {std::move(run_ends_data), std::move(values_data)},
-                         /*null_count=*/0);
-}
-
-template <typename RunEndType>
-Status RunEndEncodeNullArray(KernelContext* ctx, const ArraySpan& input_array,
-                             ExecResult* output) {
-  using RunEndCType = typename RunEndType::c_type;
-
+ARROW_NOINLINE Status RunEndEncodeNullArray(const std::shared_ptr<DataType>& 
run_end_type,
+                                            KernelContext* ctx,
+                                            const ArraySpan& input_array,
+                                            ExecResult* output) {
   const int64_t input_length = input_array.length;
   DCHECK(input_array.type->id() == Type::NA);
 
   if (input_length == 0) {
-    ARROW_ASSIGN_OR_RAISE(auto output_array_data,
-                          PreallocateNullREEArray<RunEndType>(0, 0, 
ctx->memory_pool()));
+    ARROW_ASSIGN_OR_RAISE(
+        auto output_array_data,
+        ree_util::MakeNullREEArray(run_end_type, 0, ctx->memory_pool()));
     output->value = std::move(output_array_data);
     return Status::OK();
   }
 
   // Abort if run-end type cannot hold the input length
-  RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_array.length));
-
-  ARROW_ASSIGN_OR_RAISE(auto output_array_data, 
PreallocateNullREEArray<RunEndType>(
-                                                    input_length, 1, 
ctx->memory_pool()));
+  RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_array.length));
 
-  // Write the single run-end this REE has
-  auto* output_run_ends =
-      output_array_data->child_data[0]->template 
GetMutableValues<RunEndCType>(1, 0);
-  output_run_ends[0] = static_cast<RunEndCType>(input_length);
+  ARROW_ASSIGN_OR_RAISE(
+      auto output_array_data,
+      ree_util::MakeNullREEArray(run_end_type, input_length, 
ctx->memory_pool()));
 
   output->value = std::move(output_array_data);
   return Status::OK();
@@ -594,7 +250,8 @@ struct RunEndEncodeExec {
     DCHECK(span.values[0].is_array());
     const auto& input_array = span.values[0].array;
     if constexpr (ValueType::type_id == Type::NA) {
-      return RunEndEncodeNullArray<RunEndType>(ctx, input_array, result);
+      return RunEndEncodeNullArray(TypeTraits<RunEndType>::type_singleton(), 
ctx,
+                                   input_array, result);
     } else {
       const bool has_validity_buffer = input_array.MayHaveNulls();
       if (has_validity_buffer) {
@@ -645,7 +302,7 @@ class RunEndDecodingLoop {
   using RunEndCType = typename RunEndType::c_type;
 
  private:
-  using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+  using ReadWriteValue = ree_util::ReadWriteValue<ValueType, 
has_validity_buffer>;
   using ValueRepr = typename ReadWriteValue::ValueRepr;
 
   const ArraySpan& input_array_;
@@ -660,19 +317,20 @@ class RunEndDecodingLoop {
 
  public:
   RunEndDecodingLoop(const ArraySpan& input_array, ArrayData* 
output_array_data)
-      : RunEndDecodingLoop(input_array, ree_util::ValuesArray(input_array),
+      : RunEndDecodingLoop(input_array, 
arrow::ree_util::ValuesArray(input_array),
                            output_array_data) {}
 
   /// \brief For variable-length types, calculate the total length of the data
   /// buffer needed to store the expanded values.
   int64_t CalculateOutputDataBufferSize() const {
-    auto& input_array_values = ree_util::ValuesArray(input_array_);
+    auto& input_array_values = arrow::ree_util::ValuesArray(input_array_);
     DCHECK_EQ(input_array_values.type->id(), ValueType::type_id);
     if constexpr (is_base_binary_like(ValueType::type_id)) {
       using offset_type = typename ValueType::offset_type;
       int64_t data_buffer_size = 0;
 
-      const ree_util::RunEndEncodedArraySpan<RunEndCType> 
ree_array_span(input_array_);
+      const arrow::ree_util::RunEndEncodedArraySpan<RunEndCType> 
ree_array_span(
+          input_array_);
       const auto* offsets_buffer =
           input_array_values.template GetValues<offset_type>(1, 0);
       auto it = ree_array_span.begin();
@@ -695,7 +353,8 @@ class RunEndDecodingLoop {
   ARROW_NOINLINE int64_t ExpandAllRuns() {
     read_write_value_.ZeroValidityPadding(input_array_.length);
 
-    const ree_util::RunEndEncodedArraySpan<RunEndCType> 
ree_array_span(input_array_);
+    const arrow::ree_util::RunEndEncodedArraySpan<RunEndCType> ree_array_span(
+        input_array_);
     int64_t write_offset = 0;
     int64_t output_valid_count = 0;
     for (auto it = ree_array_span.begin(); !it.is_end(ree_array_span); ++it) {
@@ -738,10 +397,10 @@ class RunEndDecodeImpl {
       }
     }
 
-    ARROW_ASSIGN_OR_RAISE(
-        auto output_array_data,
-        PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, 
length,
-                               kUnknownNullCount, ctx_->memory_pool(), 
data_buffer_size));
+    ARROW_ASSIGN_OR_RAISE(auto output_array_data,
+                          ree_util::PreallocateValuesArray(
+                              ree_type->value_type(), has_validity_buffer, 
length,
+                              kUnknownNullCount, ctx_->memory_pool(), 
data_buffer_size));
 
     int64_t output_null_count = 0;
     if (length > 0) {
@@ -774,7 +433,8 @@ struct RunEndDecodeExec {
     if constexpr (ValueType::type_id == Type::NA) {
       return RunEndDecodeNullREEArray(ctx, input_array, result);
     } else {
-      const bool has_validity_buffer = 
ree_util::ValuesArray(input_array).MayHaveNulls();
+      const bool has_validity_buffer =
+          arrow::ree_util::ValuesArray(input_array).MayHaveNulls();
       if (has_validity_buffer) {
         return RunEndDecodeImpl<RunEndType, ValueType, true>(ctx, input_array, 
result)
             .Exec();


Reply via email to