This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new fcbcb7dab7 GH-34569: [C++] Diffing of Run-End Encoded arrays (#35003)
fcbcb7dab7 is described below
commit fcbcb7dab71b14ff3436af7b21bd23132877e2fa
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Thu Oct 26 18:35:05 2023 -0300
GH-34569: [C++] Diffing of Run-End Encoded arrays (#35003)
### What changes are included in this PR?
Ability to diff run-end encoded arrays without expanding them first. This
is useful for debugging and understanding of unit test failures.
Many successive random accesses to REE arrays by logical indices incur many
successive binary searches on the run-ends array to find the physical index of
the values array, so this PR implements a few interesting optimizations -- the
most complicated one is encapsulated in a new utility available via
`ree_util.h` as there are many potential uses for it other than diffing.
+-------------+--------------------------------------------------------------------------------+
|             | Optimizations                                                                  |
+-------------+---------------------------+-----------------------+----------------------------+
| # bsearches | Null checks on Comparator | RunLengthOfEqualsFrom | Cached FindPhysicalIndex() |
+-------------+---------------------------+-----------------------+----------------------------+
| 3798        |                           |                       |                            |
+-------------+---------------------------+-----------------------+----------------------------+
| 1914        | X                         |                       |                            |
+-------------+---------------------------+-----------------------+----------------------------+
| 180         | X                         | X                     |                            |
+-------------+---------------------------+-----------------------+----------------------------+
| 153         | X                         |                       | X                          |
+-------------+---------------------------+-----------------------+----------------------------+
| 75          | X                         | X                     | X                          |
+-------------+---------------------------+-----------------------+----------------------------+
### Are these changes tested?
Yes. By existing and newly added REE-specific tests.
### Are there any user-facing changes?
No.
* Closes: #34569
Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/array/diff.cc | 319 +++++++++++++++++++++++++++++++--------
cpp/src/arrow/array/diff_test.cc | 107 +++++++++++++
cpp/src/arrow/util/ree_util.cc | 56 +++++++
cpp/src/arrow/util/ree_util.h | 68 +++++++++
4 files changed, 491 insertions(+), 59 deletions(-)
diff --git a/cpp/src/arrow/array/diff.cc b/cpp/src/arrow/array/diff.cc
index f267c52c0d..be9597e59b 100644
--- a/cpp/src/arrow/array/diff.cc
+++ b/cpp/src/arrow/array/diff.cc
@@ -21,6 +21,7 @@
#include <chrono>
#include <functional>
#include <memory>
+#include <optional>
#include <sstream>
#include <string>
#include <string_view>
@@ -32,6 +33,7 @@
#include "arrow/array/array_decimal.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
+#include "arrow/array/array_run_end.h"
#include "arrow/buffer.h"
#include "arrow/buffer_builder.h"
#include "arrow/extension_type.h"
@@ -43,7 +45,9 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
+#include "arrow/util/ree_util.h"
#include "arrow/util/string.h"
+#include "arrow/util/unreachable.h"
#include "arrow/vendored/datetime.h"
#include "arrow/visit_type_inline.h"
@@ -96,45 +100,247 @@ static UnitSlice GetView(const UnionArray& array, int64_t
index) {
return UnitSlice{&array, index};
}
-using ValueComparator = std::function<bool(const Array&, int64_t, const
Array&, int64_t)>;
+/// \brief A simple virtual comparator interface for two arrays.
+///
+/// The base and target arrays are bound at construction time. Then
+/// Equals(base_index, target_index) should return true if the values
+/// at the given indices are equal.
+struct ValueComparator {
+ virtual ~ValueComparator() = default;
+
+ /// \brief Compare the validity and values at the given indices in the base
and target
+ /// arrays.
+ ///
+ /// \param base_index The index in the base array.
+ /// \param target_index The index in the target array.
+ /// \return true if the values at the given indices are equal, false
otherwise.
+ /// \pre base_index and target_index are valid indices in their respective
arrays.
+ virtual bool Equals(int64_t base_index, int64_t target_index) = 0;
+
+ /// \brief Return the run length of equal values starting at the given
indices in the
+ /// base and target arrays.
+ ///
+ /// \param base_index The starting index in the base array.
+ /// \param base_length The length of the base array.
+ /// \param target_index The starting index in the target array.
+ /// \param target_length The length of the target array.
+ /// \return The run length of equal values starting at the given indices in
the base
+ /// and target arrays.
+ virtual int64_t RunLengthOfEqualsFrom(int64_t base_index, int64_t
base_length,
+ int64_t target_index, int64_t
target_length) {
+ int64_t run_length_of_equals = 0;
+ while (base_index < base_length && target_index < target_length) {
+ if (!Equals(base_index, target_index)) {
+ break;
+ }
+ base_index += 1;
+ target_index += 1;
+ run_length_of_equals += 1;
+ }
+ return run_length_of_equals;
+ }
+};
+
+template <typename ArrayType>
+struct DefaultValueComparator : public ValueComparator {
+ const ArrayType& base;
+ const ArrayType& target;
+
+ DefaultValueComparator(const ArrayType& base, const ArrayType& target)
+ : base(base), target(target) {}
+
+ ~DefaultValueComparator() override = default;
+
+ bool Equals(int64_t base_index, int64_t target_index) override {
+ const bool base_valid = base.IsValid(base_index);
+ const bool target_valid = target.IsValid(target_index);
+ if (base_valid && target_valid) {
+ return GetView(base, base_index) == GetView(target, target_index);
+ }
+ return base_valid == target_valid;
+ }
+};
+
+template <typename RunEndCType>
+class REEValueComparator : public ValueComparator {
+ private:
+ const RunEndEncodedArray& base_;
+ const RunEndEncodedArray& target_;
+ std::unique_ptr<ValueComparator> inner_value_comparator_;
+ ree_util::PhysicalIndexFinder<RunEndCType> base_physical_index_finder_;
+ ree_util::PhysicalIndexFinder<RunEndCType> target_physical_index_finder_;
+
+ public:
+ REEValueComparator(const RunEndEncodedArray& base, const RunEndEncodedArray&
target,
+ std::unique_ptr<ValueComparator>&& inner_value_comparator)
+ : base_(base),
+ target_(target),
+ inner_value_comparator_(std::move(inner_value_comparator)),
+ base_physical_index_finder_(*base_.data()),
+ target_physical_index_finder_(*target_.data()) {
+ DCHECK_EQ(*base_.type(), *target_.type());
+ }
+
+ ~REEValueComparator() override = default;
+
+ private:
+ /// \pre 0 <= i < base_.length()
+ inline int64_t FindPhysicalIndexOnBase(int64_t i) {
+ return base_physical_index_finder_.FindPhysicalIndex(i);
+ }
+
+ /// \pre 0 <= i < target_.length()
+ inline int64_t FindPhysicalIndexOnTarget(int64_t i) {
+ return target_physical_index_finder_.FindPhysicalIndex(i);
+ }
+
+ const RunEndCType* base_run_ends() { return
base_physical_index_finder_.run_ends; }
+
+ const RunEndCType* target_run_ends() { return
target_physical_index_finder_.run_ends; }
+
+ public:
+ int64_t RunLengthOfEqualsFrom(int64_t base_index, int64_t base_length,
+ int64_t target_index, int64_t target_length)
override {
+ // Ensure the first search for physical index on the values arrays is safe.
+ if (base_index >= base_length || target_index >= target_length) {
+ // Without values on either side, there is no run of equal values.
+ return 0;
+ }
+
+ // Translate the two logical indices into physical indices.
+ int64_t physical_base_index = FindPhysicalIndexOnBase(base_index);
+ int64_t physical_target_index = FindPhysicalIndexOnTarget(target_index);
+
+ int64_t run_length_of_equals = 0;
+ // The loop invariant (base_index < base_length && target_index <
target_length)
+ // is valid when the loop starts because of the check above.
+ for (;;) {
+ const auto base_run_end =
+ static_cast<int64_t>(base_run_ends()[physical_base_index]) -
base_.offset();
+ const auto target_run_end =
+ static_cast<int64_t>(target_run_ends()[physical_target_index]) -
+ target_.offset();
+ // The end of the runs containing the logical indices, by definition,
ends
+ // after the logical indices.
+ DCHECK_LT(base_index, base_run_end);
+ DCHECK_LT(target_index, target_run_end);
+
+ // Compare the physical values that make up the runs containing
base_index
+ // and target_index.
+ if (!inner_value_comparator_->Equals(physical_base_index,
physical_target_index)) {
+ // First difference found, stop because the run of equal values cannot
+ // be extended further.
+ break;
+ }
+
+ const int64_t base_run = std::min(base_run_end, base_length) -
base_index;
+ const int64_t target_run = std::min(target_run_end, target_length) -
target_index;
+ // Due to the loop-invariant (base_index < base_length && target_index <
+ // target_length) and properties of the run-ends asserted above, both
base_run and
+ // target_run are strictly greater than zero.
+ DCHECK_GT(base_run, 0);
+ DCHECK_GT(target_run, 0);
+
+ // Skip the smallest run (or both runs if they are equal)
+ const int64_t increment = std::min(base_run, target_run);
+ physical_base_index += increment == base_run;
+ physical_target_index += increment == target_run;
+
+ // Since both base_run and target_run are greater than zero,
+ // increment is also greater than zero...
+ DCHECK_GT(increment, 0);
+ // ...which implies that the loop will make progress and eventually
terminate
+ // because base_index or target_index will equal base_length or
target_length,
+ // respectively.
+ base_index += increment;
+ target_index += increment;
+ // The values representing the two runs are equal, so we can assume that
at
+ // least `increment` (size of smallest run) values are equal.
+ run_length_of_equals += increment;
+
+ if (base_index >= base_length || target_index >= target_length) {
+ break;
+ }
+ }
+
+ return run_length_of_equals;
+ }
-struct ValueComparatorVisitor {
+ bool Equals(int64_t base_index, int64_t target_index) override {
+ const int64_t physical_base_index = FindPhysicalIndexOnBase(base_index);
+ const int64_t physical_target_index =
FindPhysicalIndexOnTarget(target_index);
+ return inner_value_comparator_->Equals(physical_base_index,
physical_target_index);
+ }
+};
+
+class ValueComparatorFactory {
+ private:
+ std::unique_ptr<ValueComparator> comparator_;
+
+ public:
template <typename T>
- Status Visit(const T&) {
+ Status Visit(const T&, const Array& base, const Array& target) {
using ArrayType = typename TypeTraits<T>::ArrayType;
- out = [](const Array& base, int64_t base_index, const Array& target,
- int64_t target_index) {
- return (GetView(checked_cast<const ArrayType&>(base), base_index) ==
- GetView(checked_cast<const ArrayType&>(target), target_index));
- };
+ comparator_ = std::make_unique<DefaultValueComparator<ArrayType>>(
+ checked_cast<const ArrayType&>(base), checked_cast<const
ArrayType&>(target));
return Status::OK();
}
- Status Visit(const NullType&) { return Status::NotImplemented("null type"); }
+ Status Visit(const NullType&, const Array&, const Array&) {
+ return Status::NotImplemented("null type");
+ }
- Status Visit(const ExtensionType&) { return
Status::NotImplemented("extension type"); }
+ Status Visit(const ExtensionType&, const Array&, const Array&) {
+ return Status::NotImplemented("extension type");
+ }
- Status Visit(const DictionaryType&) {
+ Status Visit(const DictionaryType&, const Array& base, const Array& target) {
return Status::NotImplemented("dictionary type");
}
- Status Visit(const RunEndEncodedType&) {
- return Status::NotImplemented("run-end encoded type");
+ Status Visit(const RunEndEncodedType& ree_type, const Array& base,
+ const Array& target) {
+ const auto& base_ree = checked_cast<const RunEndEncodedArray&>(base);
+ const auto& target_ree = checked_cast<const RunEndEncodedArray&>(target);
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto inner_values_comparator,
+ Create(*ree_type.value_type(), *base_ree.values(),
*target_ree.values()));
+
+ // Instantiate the specialized comparator types with operator new instead
of
+ // make_unique<T>() to avoid binary bloat. unique_ptr<T>'s constructor is
templated
+ // on the type of the deleter and we're fine with destructor calls being
virtually
+ // dispatched via ValueComparator.
+ ValueComparator* ree_value_comparator = nullptr;
+ switch (ree_type.run_end_type()->id()) {
+ case Type::INT16:
+ ree_value_comparator = new REEValueComparator<int16_t>(
+ base_ree, target_ree, std::move(inner_values_comparator));
+ break;
+ case Type::INT32:
+ ree_value_comparator = new REEValueComparator<int32_t>(
+ base_ree, target_ree, std::move(inner_values_comparator));
+ break;
+ case Type::INT64:
+ ree_value_comparator = new REEValueComparator<int64_t>(
+ base_ree, target_ree, std::move(inner_values_comparator));
+ break;
+ default:
+ Unreachable();
+ }
+ comparator_.reset(ree_value_comparator);
+ return Status::OK();
}
- ValueComparator Create(const DataType& type) {
- DCHECK_OK(VisitTypeInline(type, this));
- return out;
+ static Result<std::unique_ptr<ValueComparator>> Create(const DataType& type,
+ const Array& base,
+ const Array& target) {
+ ValueComparatorFactory self;
+ RETURN_NOT_OK(VisitTypeInline(type, &self, base, target));
+ return std::move(self.comparator_);
}
-
- ValueComparator out;
};
-ValueComparator GetValueComparator(const DataType& type) {
- ValueComparatorVisitor type_visitor;
- return type_visitor.Create(type);
-}
-
// represents an intermediate state in the comparison of two arrays
struct EditPoint {
int64_t base, target;
@@ -161,33 +367,9 @@ struct EditPoint {
class QuadraticSpaceMyersDiff {
public:
QuadraticSpaceMyersDiff(const Array& base, const Array& target, MemoryPool*
pool)
- : base_(base),
- target_(target),
- pool_(pool),
- value_comparator_(GetValueComparator(*base.type())),
- base_begin_(0),
- base_end_(base.length()),
- target_begin_(0),
- target_end_(target.length()),
- endpoint_base_({ExtendFrom({base_begin_, target_begin_}).base}),
- insert_({true}) {
- if ((base_end_ - base_begin_ == target_end_ - target_begin_) &&
- endpoint_base_[0] == base_end_) {
- // trivial case: base == target
- finish_index_ = 0;
- }
- }
-
- bool ValuesEqual(int64_t base_index, int64_t target_index) const {
- bool base_null = base_.IsNull(base_index);
- bool target_null = target_.IsNull(target_index);
- if (base_null || target_null) {
- // If only one is null, then this is false, otherwise true
- return base_null && target_null;
- }
- return value_comparator_(base_, base_index, target_, target_index);
- }
+ : base_(base), target_(target), pool_(pool) {}
+ private:
// increment the position within base (the element pointed to was deleted)
// then extend maximally
EditPoint DeleteOne(EditPoint p) const {
@@ -209,11 +391,10 @@ class QuadraticSpaceMyersDiff {
// increment the position within base and target (the elements skipped in
this way were
// present in both sequences)
EditPoint ExtendFrom(EditPoint p) const {
- for (; p.base != base_end_ && p.target != target_end_; ++p.base,
++p.target) {
- if (!ValuesEqual(p.base, p.target)) {
- break;
- }
- }
+ const int64_t run_length_of_equals =
+ _comparator->RunLengthOfEqualsFrom(p.base, base_end_, p.target,
target_end_);
+ p.base += run_length_of_equals;
+ p.target += run_length_of_equals;
return p;
}
@@ -321,7 +502,24 @@ class QuadraticSpaceMyersDiff {
{field("insert", boolean()), field("run_length", int64())});
}
+ public:
Result<std::shared_ptr<StructArray>> Diff() {
+ base_begin_ = 0;
+ base_end_ = base_.length();
+ target_begin_ = 0;
+ target_end_ = target_.length();
+ ARROW_ASSIGN_OR_RAISE(_comparator,
+ ValueComparatorFactory::Create(*base_.type(), base_,
target_));
+
+ finish_index_ = -1;
+ edit_count_ = 0;
+ endpoint_base_ = {ExtendFrom({base_begin_, target_begin_}).base};
+ insert_ = {true};
+ if ((base_end_ - base_begin_ == target_end_ - target_begin_) &&
+ endpoint_base_[0] == base_end_) {
+ // trivial case: base == target
+ finish_index_ = 0;
+ }
while (!Done()) {
Next();
}
@@ -329,14 +527,19 @@ class QuadraticSpaceMyersDiff {
}
private:
+ // Constructor-injected references
const Array& base_;
const Array& target_;
MemoryPool* pool_;
- ValueComparator value_comparator_;
+
+ // Initialized on Diff() and immutable thereafter
+ int64_t base_begin_ = 0, base_end_ = -1;
+ int64_t target_begin_ = 0, target_end_ = -1;
+ std::unique_ptr<ValueComparator> _comparator;
+
+ // Initialized on Next() and mutated throughout the diffing process
int64_t finish_index_ = -1;
int64_t edit_count_ = 0;
- int64_t base_begin_, base_end_;
- int64_t target_begin_, target_end_;
// each element of endpoint_base_ is the furthest position in base reachable
given an
// edit_count and (# insertions) - (# deletions). Each bit of insert_
records whether
// the corresponding furthest position was reached via an insertion or a
deletion
@@ -386,8 +589,6 @@ Result<std::shared_ptr<StructArray>> Diff(const Array&
base, const Array& target
return Diff(*base_storage, *target_storage, pool);
} else if (base.type()->id() == Type::DICTIONARY) {
return Status::NotImplemented("diffing arrays of type ", *base.type());
- } else if (base.type()->id() == Type::RUN_END_ENCODED) {
- return Status::NotImplemented("diffing arrays of type ", *base.type());
} else {
return QuadraticSpaceMyersDiff(base, target, pool).Diff();
}
diff --git a/cpp/src/arrow/array/diff_test.cc b/cpp/src/arrow/array/diff_test.cc
index 4c2f39eddf..145978a91a 100644
--- a/cpp/src/arrow/array/diff_test.cc
+++ b/cpp/src/arrow/array/diff_test.cc
@@ -163,6 +163,107 @@ class DiffTest : public ::testing::Test {
AssertRunLengthIs("[2, 0, 0]");
}
+ std::shared_ptr<RunEndEncodedArray> RunEndEncodedArrayFromJSON(
+ int64_t logical_length, const std::shared_ptr<DataType>& ree_type,
+ std::string_view run_ends_json, std::string_view values_json,
+ int64_t logical_offset = 0) {
+ auto& ree_type_ref = checked_cast<const RunEndEncodedType&>(*ree_type);
+ auto run_ends = ArrayFromJSON(ree_type_ref.run_end_type(), run_ends_json);
+ auto values = ArrayFromJSON(ree_type_ref.value_type(), values_json);
+ return RunEndEncodedArray::Make(logical_length, std::move(run_ends),
+ std::move(values), logical_offset)
+ .ValueOrDie();
+ }
+
+ template <typename RunEndType>
+ void TestBasicsWithREEs() {
+ auto run_end_type = std::make_shared<RunEndType>();
+ auto value_type = utf8();
+ auto ree_type = run_end_encoded(run_end_type, value_type);
+
+ // empty REEs
+ base_ = RunEndEncodedArrayFromJSON(0, ree_type, "[]", "[]");
+ target_ = RunEndEncodedArrayFromJSON(0, ree_type, "[]", "[]");
+ DoDiff();
+ AssertInsertIs("[false]");
+ AssertRunLengthIs("[0]");
+
+ // null REE arrays of different lengths
+ base_ = RunEndEncodedArrayFromJSON(2, ree_type, "[2]", "[null]");
+ target_ = RunEndEncodedArrayFromJSON(4, ree_type, "[4]", "[null]");
+ DoDiff();
+ AssertInsertIs("[false, true, true]");
+ AssertRunLengthIs("[2, 0, 0]");
+
+ // identical REE arrays w/ offsets
+ base_ =
+ RunEndEncodedArrayFromJSON(110, ree_type, R"([20, 120])", R"(["a",
"b"])", 10);
+ target_ =
+ RunEndEncodedArrayFromJSON(110, ree_type, R"([20, 120])", R"(["a",
"b"])", 10);
+ DoDiff();
+ AssertInsertIs("[false]");
+ AssertRunLengthIs("[110]");
+
+ // equivalent REE arrays
+ base_ = RunEndEncodedArrayFromJSON(120, ree_type, R"([10, 20, 120])",
+ R"(["a", "a", "b"])");
+ target_ = RunEndEncodedArrayFromJSON(120, ree_type, R"([20, 30, 120])",
+ R"(["a", "b", "b"])");
+ DoDiff();
+ AssertInsertIs("[false]");
+ AssertRunLengthIs("[120]");
+
+ // slice so last run-end goes beyond length
+ base_ = base_->Slice(5, 105);
+ target_ = target_->Slice(5, 105);
+ DoDiff();
+ AssertInsertIs("[false]");
+ AssertRunLengthIs("[105]");
+
+ // insert one
+ base_ = RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 12])", R"(["a",
"b"])");
+ target_ = RunEndEncodedArrayFromJSON(13, ree_type, R"([3, 13])", R"(["a",
"b"])");
+ DoDiff();
+ AssertInsertIs("[false, true]");
+ AssertRunLengthIs("[2, 10]");
+
+ // delete one
+ base_ =
+ RunEndEncodedArrayFromJSON(13, ree_type, R"([2, 5, 13])", R"(["a",
"b", "c"])");
+ target_ =
+ RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 4, 12])", R"(["a",
"b", "c"])");
+ DoDiff();
+ AssertInsertIs("[false, false]");
+ AssertRunLengthIs("[4, 8]");
+
+ // null out one
+ base_ =
+ RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 5, 12])", R"(["a",
"b", "c"])");
+ target_ = RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 4, 5, 12])",
+ R"(["a", "b", null, "c"])");
+ DoDiff();
+ AssertInsertIs("[false, false, true]");
+ AssertRunLengthIs("[4, 0, 7]");
+
+ // append some
+ base_ = RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 4, 8, 12])",
+ R"(["a", "b", "c", "d"])");
+ target_ = RunEndEncodedArrayFromJSON(15, ree_type, R"([2, 4, 8, 13, 15])",
+ R"(["a", "b", "c", "d", "e"])");
+ DoDiff();
+ AssertInsertIs("[false, true, true, true]");
+ AssertRunLengthIs("[12, 0, 0, 0]");
+
+ // prepend some
+ base_ = RunEndEncodedArrayFromJSON(12, ree_type, R"([2, 4, 8, 12])",
+ R"(["c", "d", "e", "f"])");
+ target_ = RunEndEncodedArrayFromJSON(15, ree_type, R"([1, 3, 5, 7, 11,
15])",
+ R"(["a", "b", "c", "d", "e", "f"])");
+ DoDiff();
+ AssertInsertIs("[false, true, true, true]");
+ AssertRunLengthIs("[0, 0, 0, 12]");
+ }
+
random::RandomArrayGenerator rng_;
std::shared_ptr<StructArray> edits_;
std::shared_ptr<Array> base_, target_;
@@ -415,6 +516,12 @@ TEST_F(DiffTest, BasicsWithSparseUnions) {
TestBasicsWithUnions(UnionMode::SPARS
TEST_F(DiffTest, BasicsWithDenseUnions) {
TestBasicsWithUnions(UnionMode::DENSE); }
+TEST_F(DiffTest, BasicsWithREEs) {
+ TestBasicsWithREEs<Int16Type>();
+ TestBasicsWithREEs<Int32Type>();
+ TestBasicsWithREEs<Int64Type>();
+}
+
TEST_F(DiffTest, UnifiedDiffFormatter) {
// no changes
base_ = ArrayFromJSON(utf8(), R"(["give", "me", "a", "break"])");
diff --git a/cpp/src/arrow/util/ree_util.cc b/cpp/src/arrow/util/ree_util.cc
index fcd6c204e0..819de5eb60 100644
--- a/cpp/src/arrow/util/ree_util.cc
+++ b/cpp/src/arrow/util/ree_util.cc
@@ -61,6 +61,62 @@ int64_t LogicalNullCount(const ArraySpan& span) {
return LogicalNullCount<int64_t>(span);
}
+namespace internal {
+
+/// \pre 0 <= i < array_span.length()
+template <typename RunEndCType>
+int64_t FindPhysicalIndexImpl(PhysicalIndexFinder<RunEndCType>& self, int64_t
i) {
+ DCHECK_LT(i, self.array_span.length);
+ const int64_t run_ends_size = ree_util::RunEndsArray(self.array_span).length;
+ DCHECK_LT(self.last_physical_index, run_ends_size);
+ // This access to self.run_ends[last_physical_index] is always safe because:
+ // 1. 0 <= i < array_span.length() implies there is at least one run and the
initial
+ // value 0 will be safe to index with.
+ // 2. last_physical_index > 0 is always the result of a valid call to
+ // internal::FindPhysicalIndex.
+ if (ARROW_PREDICT_TRUE(self.array_span.offset + i <
+ self.run_ends[self.last_physical_index])) {
+ // The cached value is an upper-bound, but is it the least upper-bound?
+ if (self.last_physical_index == 0 ||
+ self.array_span.offset + i >= self.run_ends[self.last_physical_index -
1]) {
+ return self.last_physical_index;
+ }
+ // last_physical_index - 1 is a candidate for the least upper-bound,
+ // so search for the least upper-bound in the range that includes it.
+ const int64_t j = ree_util::internal::FindPhysicalIndex<RunEndCType>(
+ self.run_ends, /*run_ends_size=*/self.last_physical_index, i,
+ self.array_span.offset);
+ DCHECK_LT(j, self.last_physical_index);
+ return self.last_physical_index = j;
+ }
+
+ // last_physical_index is not an upper-bound, and the logical index i MUST be
+ // in the runs that follow it. Since i is a valid logical index, we know
that at least
+ // one extra run is present.
+ DCHECK_LT(self.last_physical_index + 1, run_ends_size);
+ const int64_t min_physical_index = self.last_physical_index + 1;
+
+ const int64_t j = ree_util::internal::FindPhysicalIndex<RunEndCType>(
+ /*run_ends=*/self.run_ends + min_physical_index,
+ /*run_ends_size=*/run_ends_size - min_physical_index, i,
self.array_span.offset);
+ DCHECK_LT(min_physical_index + j, run_ends_size);
+ return self.last_physical_index = min_physical_index + j;
+}
+
+int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder<int16_t>& self, int64_t i)
{
+ return FindPhysicalIndexImpl(self, i);
+}
+
+int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder<int32_t>& self, int64_t i)
{
+ return FindPhysicalIndexImpl(self, i);
+}
+
+int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder<int64_t>& self, int64_t i)
{
+ return FindPhysicalIndexImpl(self, i);
+}
+
+} // namespace internal
+
int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t
absolute_offset) {
const auto type_id = RunEndsArray(span).type->id();
if (type_id == Type::INT16) {
diff --git a/cpp/src/arrow/util/ree_util.h b/cpp/src/arrow/util/ree_util.h
index a3d3d16c0d..2b7940154a 100644
--- a/cpp/src/arrow/util/ree_util.h
+++ b/cpp/src/arrow/util/ree_util.h
@@ -23,6 +23,7 @@
#include "arrow/array/data.h"
#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
#include "arrow/util/macros.h"
namespace arrow {
@@ -139,6 +140,69 @@ int64_t FindPhysicalLength(const ArraySpan& span) {
/*offset=*/span.offset);
}
+template <typename RunEndCType>
+struct PhysicalIndexFinder;
+
+// non-inline implementations for each run-end type
+ARROW_EXPORT int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder<int16_t>&
self,
+ int64_t i);
+ARROW_EXPORT int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder<int32_t>&
self,
+ int64_t i);
+ARROW_EXPORT int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder<int64_t>&
self,
+ int64_t i);
+
+/// \brief Stateful version of FindPhysicalIndex() that caches the result of
+/// the previous search and uses it to optimize the next search.
+///
+/// When new queries for the physical index of a logical index come in,
+/// binary search is performed again but the first candidate checked is the
+/// result of the previous search (cached physical index) instead of the
+/// midpoint of the run-ends array.
+///
+/// If that test fails, internal::FindPhysicalIndex() is called with one of the
+/// partitions defined by the cached index. If the queried logical indices
+/// follow an increasing or decreasing pattern, this first test is much more
+/// effective in (1) finding the answer right away (close logical indices
belong
+/// to the same runs) or (2) discarding many more candidates than probing
+/// the midpoint would.
+///
+/// The most adversarial case (i.e. alternating between 0 and length-1 queries)
+/// only adds one extra binary search probe when compared to always starting
+/// binary search from the midpoint without any of these optimizations.
+///
+/// \tparam RunEndCType The numeric type of the run-ends array.
+template <typename RunEndCType>
+struct PhysicalIndexFinder {
+ const ArraySpan array_span;
+ const RunEndCType* run_ends;
+ int64_t last_physical_index = 0;
+
+ explicit PhysicalIndexFinder(const ArrayData& data)
+ : array_span(data),
+ run_ends(RunEndsArray(array_span).template GetValues<RunEndCType>(1)) {
+ assert(CTypeTraits<RunEndCType>::ArrowType::type_id ==
+ ::arrow::internal::checked_cast<const
RunEndEncodedType&>(*data.type)
+ .run_end_type()
+ ->id());
+ }
+
+ /// \brief Find the physical index into the values array of the REE array.
+ ///
+ /// \pre 0 <= i < array_span.length()
+ /// \param i the logical index into the REE array
+ /// \return the physical index into the values array
+ int64_t FindPhysicalIndex(int64_t i) {
+ if constexpr (std::is_same_v<RunEndCType, int16_t>) {
+ return FindPhysicalIndexImpl16(*this, i);
+ } else if constexpr (std::is_same_v<RunEndCType, int32_t>) {
+ return FindPhysicalIndexImpl32(*this, i);
+ } else {
+ static_assert(std::is_same_v<RunEndCType, int64_t>, "Unsupported
RunEndCType.");
+ return FindPhysicalIndexImpl64(*this, i);
+ }
+ }
+};
+
} // namespace internal
/// \brief Find the physical index into the values array of the REE ArraySpan
@@ -166,6 +230,10 @@ ARROW_EXPORT std::pair<int64_t, int64_t>
FindPhysicalRange(const ArraySpan& span
int64_t offset,
int64_t length);
+// Publish PhysicalIndexFinder outside of the internal namespace.
+template <typename RunEndCType>
+using PhysicalIndexFinder = internal::PhysicalIndexFinder<RunEndCType>;
+
template <typename RunEndCType>
class RunEndEncodedArraySpan {
private: