This is an automated email from the ASF dual-hosted git repository.
felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ec41209ea0 GH-37055: [C++] Optimize hash kernels for Dictionary
ChunkedArrays (#38394)
ec41209ea0 is described below
commit ec41209ea02bdb410bc7e049cb3100afedf4ba2f
Author: Jin Shang <[email protected]>
AuthorDate: Sat Dec 23 23:50:39 2023 +0800
GH-37055: [C++] Optimize hash kernels for Dictionary ChunkedArrays (#38394)
### Rationale for this change
When merging dictionaries across chunks, the hash kernels unnecessarily
re-unify the already-unified dictionary on every chunk, dragging down performance.
### What changes are included in this PR?
Reuse the dictionary unifier across chunks.
### Are these changes tested?
Yes, with a new benchmark for dictionary chunked arrays.
### Are there any user-facing changes?
No.
* Closes: #37055
Lead-authored-by: Jin Shang <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
cpp/src/arrow/compute/kernels/vector_hash.cc | 55 +++++++++++++++-------
.../arrow/compute/kernels/vector_hash_benchmark.cc | 36 ++++++++++++++
2 files changed, 74 insertions(+), 17 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc
b/cpp/src/arrow/compute/kernels/vector_hash.cc
index 65e59d1a2e..800deba3a5 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -26,17 +26,20 @@
#include "arrow/array/concatenate.h"
#include "arrow/array/dict_internal.h"
#include "arrow/array/util.h"
+#include "arrow/buffer.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/result.h"
#include "arrow/util/hashing.h"
+#include "arrow/util/int_util.h"
#include "arrow/util/unreachable.h"
namespace arrow {
using internal::DictionaryTraits;
using internal::HashTraits;
+using internal::TransposeInts;
namespace compute {
namespace internal {
@@ -448,9 +451,9 @@ class DictionaryHashKernel : public HashKernel {
Status Append(const ArraySpan& arr) override {
auto arr_dict = arr.dictionary().ToArray();
- if (!dictionary_) {
- dictionary_ = arr_dict;
- } else if (!dictionary_->Equals(*arr_dict)) {
+ if (!first_dictionary_) {
+ first_dictionary_ = arr_dict;
+ } else if (!first_dictionary_->Equals(*arr_dict)) {
// NOTE: This approach computes a new dictionary unification per chunk.
// This is in effect O(n*k) where n is the total chunked array length and
// k is the number of chunks (therefore O(n**2) if chunks have a fixed
size).
@@ -458,21 +461,23 @@ class DictionaryHashKernel : public HashKernel {
// A better approach may be to run the kernel over each individual chunk,
// and then hash-aggregate all results (for example sum-group-by for
// the "value_counts" kernel).
- auto out_dict_type = dictionary_->type();
+ if (dictionary_unifier_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(dictionary_unifier_,
+
DictionaryUnifier::Make(first_dictionary_->type()));
+ RETURN_NOT_OK(dictionary_unifier_->Unify(*first_dictionary_));
+ }
+ auto out_dict_type = first_dictionary_->type();
std::shared_ptr<Buffer> transpose_map;
- std::shared_ptr<Array> out_dict;
- ARROW_ASSIGN_OR_RAISE(auto unifier,
DictionaryUnifier::Make(out_dict_type));
- ARROW_CHECK_OK(unifier->Unify(*dictionary_));
- ARROW_CHECK_OK(unifier->Unify(*arr_dict, &transpose_map));
- ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict));
+ RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map));
- dictionary_ = out_dict;
auto transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
- auto in_dict_array = arr.ToArray();
+ auto in_array = arr.ToArray();
+ const auto& in_dict_array =
+ arrow::internal::checked_cast<const DictionaryArray&>(*in_array);
ARROW_ASSIGN_OR_RAISE(
- auto tmp, arrow::internal::checked_cast<const
DictionaryArray&>(*in_dict_array)
- .Transpose(arr.type->GetSharedPtr(), out_dict,
transpose));
+ auto tmp, in_dict_array.Transpose(arr.type->GetSharedPtr(),
+ in_dict_array.dictionary(),
transpose));
return indices_kernel_->Append(*tmp->data());
}
@@ -495,12 +500,27 @@ class DictionaryHashKernel : public HashKernel {
return dictionary_value_type_;
}
- std::shared_ptr<Array> dictionary() const { return dictionary_; }
+ /// This can't be called more than once because
DictionaryUnifier::GetResult()
+ /// can't be called more than once and produce the same output.
+ Result<std::shared_ptr<Array>> dictionary() const {
+ if (!first_dictionary_) { // Append was never called
+ return nullptr;
+ }
+ if (!dictionary_unifier_) { // Append was called only once
+ return first_dictionary_;
+ }
+
+ auto out_dict_type = first_dictionary_->type();
+ std::shared_ptr<Array> out_dict;
+ RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict));
+ return out_dict;
+ }
private:
std::unique_ptr<HashKernel> indices_kernel_;
- std::shared_ptr<Array> dictionary_;
+ std::shared_ptr<Array> first_dictionary_;
std::shared_ptr<DataType> dictionary_value_type_;
+ std::unique_ptr<DictionaryUnifier> dictionary_unifier_;
};
// ----------------------------------------------------------------------
@@ -630,8 +650,9 @@ Status ValueCountsFinalize(KernelContext* ctx,
std::vector<Datum>* out) {
// hence have no dictionary.
Result<std::shared_ptr<ArrayData>> EnsureHashDictionary(KernelContext* ctx,
DictionaryHashKernel*
hash) {
- if (hash->dictionary()) {
- return hash->dictionary()->data();
+ ARROW_ASSIGN_OR_RAISE(auto dict, hash->dictionary());
+ if (dict) {
+ return dict->data();
}
ARROW_ASSIGN_OR_RAISE(auto null,
MakeArrayOfNull(hash->dictionary_value_type(),
/*length=*/0,
ctx->memory_pool()));
diff --git a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
index e9548e133a..472f50db8c 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
@@ -25,6 +25,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
#include "arrow/compute/api.h"
@@ -226,6 +227,33 @@ static void UniqueString100bytes(benchmark::State& state) {
BenchUnique(state,
HashParams<StringType>{general_bench_cases[state.range(0)], 100});
}
+template <typename ParamType>
+void BenchValueCountsDictionaryChunks(benchmark::State& state, const
ParamType& params) {
+ std::shared_ptr<Array> arr;
+ params.GenerateTestData(&arr);
+ // chunk arr to 100 slices
+ std::vector<std::shared_ptr<Array>> chunks;
+ const int64_t chunk_size = arr->length() / 100;
+ for (int64_t i = 0; i < 100; ++i) {
+ auto slice = arr->Slice(i * chunk_size, chunk_size);
+ auto datum = DictionaryEncode(slice).ValueOrDie();
+ ARROW_CHECK(datum.is_array());
+ chunks.push_back(datum.make_array());
+ }
+ auto chunked_array = std::make_shared<ChunkedArray>(chunks);
+
+ while (state.KeepRunning()) {
+ ABORT_NOT_OK(ValueCounts(chunked_array).status());
+ }
+ params.SetMetadata(state);
+}
+
+static void ValueCountsDictionaryChunks(benchmark::State& state) {
+ // Dictionary of byte strings with 10 bytes each
+ BenchValueCountsDictionaryChunks(
+ state, HashParams<StringType>{general_bench_cases[state.range(0)], 10});
+}
+
void HashSetArgs(benchmark::internal::Benchmark* bench) {
for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
bench->Arg(i);
@@ -239,6 +267,14 @@ BENCHMARK(UniqueInt64)->Apply(HashSetArgs);
BENCHMARK(UniqueString10bytes)->Apply(HashSetArgs);
BENCHMARK(UniqueString100bytes)->Apply(HashSetArgs);
+void DictionaryChunksHashSetArgs(benchmark::internal::Benchmark* bench) {
+ for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
+ bench->Arg(i);
+ }
+}
+
+BENCHMARK(ValueCountsDictionaryChunks)->Apply(DictionaryChunksHashSetArgs);
+
void UInt8SetArgs(benchmark::internal::Benchmark* bench) {
for (int i = 0; i < static_cast<int>(uint8_bench_cases.size()); ++i) {
bench->Arg(i);