Alex-PLACET commented on code in PR #49679:
URL: https://github.com/apache/arrow/pull/49679#discussion_r3182091672


##########
cpp/src/arrow/compute/kernels/vector_search_sorted.cc:
##########
@@ -0,0 +1,1158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_vector.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <optional>
+#include <ranges>
+#include <span>
+#include <type_traits>
+#include <utility>
+
+#include "arrow/array/array_primitive.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/array/array_run_end.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/array/util.h"
+#include "arrow/chunk_resolver.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/vector_sort_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/registry_internal.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging_internal.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute::internal {
+namespace {
+
+const SearchSortedOptions* GetDefaultSearchSortedOptions() {
+  static const auto kDefaultSearchSortedOptions = 
SearchSortedOptions::Defaults();
+  return &kDefaultSearchSortedOptions;
+}
+
+const FunctionDoc search_sorted_doc(
+    "Find insertion indices for sorted input",
+    ("Return the index where each needle should be inserted in a sorted input 
array\n"
+     "to maintain ascending order.\n"
+     "\n"
+     "With side='left', returns the first suitable index (lower bound).\n"
+     "With side='right', returns the last suitable index (upper bound).\n"
+     "\n"
+     "The searched values may be provided as an array or chunked array and 
must\n"
+     "already be sorted in ascending order. Null values in the searched array 
are\n"
+     "supported when clustered entirely at the start or\n"
+     "entirely at the end. Non-null needles are matched only against the 
non-null\n"
+     "portion of the searched array. Needles may be a scalar, array, or 
chunked\n"
+     "array. Null needles emit nulls in the output."),
+    {"values", "needles"}, "SearchSortedOptions");
+
+// This file implements search_sorted as a small pipeline that first normalizes
+// Arrow input shapes and then runs one typed binary-search core on logical
+// values.
+//
+// Plain arrays, run-end encoded arrays, chunked arrays, and scalar needles are
+// all adapted into a common accessor and run-visitor model so the search logic
+// does not care about physical layout.
+//
+// After validation, the kernel isolates the contiguous non-null window of the
+// searched values, because nulls are only supported when clustered at one end.
+// That window uses logical null counting for run-end encoded inputs, whose
+// nulls live in the values child rather than in a top-level validity bitmap.
+//
+// Needles then follow one of two paths. Scalars and plain arrays go through a
+// shared logical-run visitor: scalars become a single run, plain arrays become
+// one-element runs, and chunked inputs recurse chunk by chunk. Run-end encoded
+// needles take a simpler physical-run path: search each physical needle once,
+// rebuild a temporary run-end encoded uint64 result with the same run ends,
+// and run-end decode it back to the dense output shape.
+//
+// Output materialization is unified behind small output sinks. Non-null-only
+// needles write directly into a preallocated uint64 buffer, while nullable
+// needles append null and non-null runs through a UInt64Builder. The builder
+// path is optimized for repeated runs by bulk-filling reserved memory instead
+// of appending one insertion index at a time.
+//
+// High-level flow:
+//
+//   values datum
+//       |
+//       +--> ValidateSortedValuesInput
+//       |
+//       +--> LogicalType / FindNonNullValuesRange
+//       |
+//       +--> VisitValuesAccessor
+//             |
+//             +--> PlainArrayAccessor
+//             |
+//             +--> RunEndEncodedValuesAccessor
+//             |
+//             +--> ChunkedArrayAccessor
+//             |
+//             `--> ChunkedRunEndEncodedValuesAccessor
+//
+//   needles datum
+//       |
+//       +--> ValidateNeedleInput
+//       |
+//       +--> DatumHasNulls
+//       |
+//       +--> REE needles
+//       |     +--> search physical runs once
+//       |     +--> rebuild temporary REE uint64 result
+//       |     `--> RunEndDecode back to dense output
+//       |
+//       `--> VisitNeedleRuns
+//             |
+//             +--> scalar needle  -> one logical run
+//             |
+//             +--> plain array    -> one-element runs
+//             |
+//             `--> chunked input  -> recurse chunk by chunk
+//
+//   normalized values accessor + normalized needle runs
+//       |
+//       `--> FindInsertionPoint<T>
+//             |
+//             +--> side = left  -> lower_bound semantics
+//             |
+//             `--> side = right -> upper_bound semantics
+//
+//   result materialization
+//       |
+//       +--> no needle nulls
+//       |     +--> MakeMutableUInt64Array
+//       |     `--> PreallocatedInsertionIndexOutput
+//       |           `--> fill output buffer directly
+//       |
+//       `--> nullable needles
+//             +--> UInt64Builder
+//             `--> BuilderInsertionIndexOutput
+//                   +--> AppendNulls for null runs
+//                   `--> bulk fill + UnsafeAdvance for repeated indices
+//
+// A rough map of the file:
+//
+//   [validation + type helpers]
+//           |
+//   [value accessors]
+//           |
+//   [needle visitors]
+//           |
+//   [typed search + output helpers]
+//           |
+//   [meta-function dispatch]
+//
+
+#define VISIT_SEARCH_SORTED_TYPES(VISIT) \
+  VISIT(BooleanType)                     \
+  VISIT(Int8Type)                        \
+  VISIT(Int16Type)                       \
+  VISIT(Int32Type)                       \
+  VISIT(Int64Type)                       \
+  VISIT(UInt8Type)                       \
+  VISIT(UInt16Type)                      \
+  VISIT(UInt32Type)                      \
+  VISIT(UInt64Type)                      \
+  VISIT(FloatType)                       \

Review Comment:
   done



##########
cpp/src/arrow/compute/kernels/vector_search_sorted_benchmark.cc:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/datum.h"
+#include "arrow/scalar.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+constexpr auto kSeed = 0x5EA4C42;
+constexpr int64_t kNeedleToValueRatio = 4;
+constexpr int64_t kValuesRunLength = 16;
+constexpr int64_t kNeedlesRunLength = 8;
+constexpr int32_t kStringMinLength = 8;
+constexpr int32_t kStringMaxLength = 24;
+
+void SetSearchSortedQuickArgs(benchmark::internal::Benchmark* bench) {
+  bench->Unit(benchmark::kMicrosecond);
+  for (const auto size : std::vector<int64_t>{kL1Size, kL2Size}) {
+    bench->Arg(size);
+  }
+}
+
+void SetSearchSortedArgs(benchmark::internal::Benchmark* bench) {
+  bench->Unit(benchmark::kMicrosecond);
+  for (const auto size : {kL1Size, kL2Size}) {
+    bench->Arg(size);
+  }
+}
+
+int64_t Int64LengthFromBytes(int64_t size_bytes) {
+  return std::max<int64_t>(1, size_bytes / 
static_cast<int64_t>(sizeof(int64_t)));
+}
+
+int64_t NeedleLengthFromBytes(int64_t size_bytes) {
+  return std::max<int64_t>(1, Int64LengthFromBytes(size_bytes) / 
kNeedleToValueRatio);
+}
+
+int64_t StringLengthFromBytes(int64_t size_bytes) {
+  const int64_t average_length = (kStringMinLength + kStringMaxLength) / 2;
+  return std::max<int64_t>(1, size_bytes / average_length);
+}
+
+std::shared_ptr<Int64Array> BuildSortedInt64Values(int64_t size_bytes) {
+  random::RandomArrayGenerator rand(kSeed);
+  const auto length = Int64LengthFromBytes(size_bytes);
+  const auto max_value = std::max<int64_t>(length / 8, 1);
+
+  auto values =
+      std::static_pointer_cast<Int64Array>(rand.Int64(length, 0, max_value, 
0.0));
+  std::vector<int64_t> data(values->raw_values(),
+                            values->raw_values() + values->length());
+  std::ranges::sort(data);
+
+  Int64Builder builder;
+  ABORT_NOT_OK(builder.AppendValues(data));
+  return std::static_pointer_cast<Int64Array>(builder.Finish().ValueOrDie());
+}
+
+std::shared_ptr<Int64Array> BuildInt64Needles(int64_t size_bytes) {
+  random::RandomArrayGenerator rand(kSeed + 1);
+  const auto length = NeedleLengthFromBytes(size_bytes);
+  const auto max_value = std::max<int64_t>(Int64LengthFromBytes(size_bytes) / 
8, 1);
+  return std::static_pointer_cast<Int64Array>(rand.Int64(length, 0, max_value, 
0.0));
+}
+
+std::shared_ptr<StringArray> BuildSortedStringValues(int64_t size_bytes) {
+  random::RandomArrayGenerator rand(kSeed + 2);
+  const auto length = StringLengthFromBytes(size_bytes);
+  auto values = std::static_pointer_cast<StringArray>(
+      rand.String(length, kStringMinLength, kStringMaxLength, 0.0));
+
+  std::vector<std::string> data;
+  data.reserve(static_cast<size_t>(values->length()));
+  for (int64_t index = 0; index < values->length(); ++index) {
+    data.push_back(values->GetString(index));
+  }
+  std::ranges::sort(data);
+
+  StringBuilder builder;
+  ABORT_NOT_OK(builder.AppendValues(data));
+  return std::static_pointer_cast<StringArray>(builder.Finish().ValueOrDie());
+}
+
+std::shared_ptr<StringArray> BuildStringNeedles(int64_t size_bytes) {
+  random::RandomArrayGenerator rand(kSeed + 3);
+  const auto length = std::max<int64_t>(1, StringLengthFromBytes(size_bytes) / 
4);
+  return std::static_pointer_cast<StringArray>(
+      rand.String(length, kStringMinLength, kStringMaxLength, 0.0));
+}
+
+std::shared_ptr<Int64Array> BuildRunHeavyInt64Values(int64_t logical_length,
+                                                     int64_t run_length) {
+  Int64Builder builder;
+  ABORT_NOT_OK(builder.Reserve(logical_length));
+  for (int64_t index = 0; index < logical_length; ++index) {
+    builder.UnsafeAppend(index / run_length);
+  }
+  return std::static_pointer_cast<Int64Array>(builder.Finish().ValueOrDie());
+}
+
+std::shared_ptr<Int64Array> BuildRunHeavyInt64NeedlesWithNullRuns(int64_t 
logical_length,
+                                                                  int64_t 
run_length) {
+  std::vector<int64_t> data(static_cast<size_t>(logical_length), 0);
+  std::vector<bool> is_valid(static_cast<size_t>(logical_length), true);
+
+  for (int64_t index = 0; index < logical_length; ++index) {
+    const int64_t run_index = index / run_length;
+    if (run_index % 4 == 0) {
+      is_valid[static_cast<size_t>(index)] = false;
+      continue;
+    }
+    data[static_cast<size_t>(index)] = run_index / 2;
+  }
+
+  Int64Builder builder;
+  ABORT_NOT_OK(builder.AppendValues(data, is_valid));
+  return std::static_pointer_cast<Int64Array>(builder.Finish().ValueOrDie());
+}
+
+std::shared_ptr<Int64Array> BuildInt64NeedlesWithNullRuns(int64_t size_bytes,
+                                                          int64_t run_length) {
+  return 
BuildRunHeavyInt64NeedlesWithNullRuns(NeedleLengthFromBytes(size_bytes),
+                                               run_length);
+}
+
+std::shared_ptr<Array> BuildRunEndEncodedInt64Values(int64_t size_bytes,
+                                                     int64_t run_length) {
+  auto values = BuildRunHeavyInt64Values(Int64LengthFromBytes(size_bytes), 
run_length);
+  return RunEndEncode(Datum(values), RunEndEncodeOptions{int32()})
+      .ValueOrDie()
+      .make_array();
+}
+
+std::shared_ptr<Array> BuildRunEndEncodedInt64Needles(int64_t size_bytes,
+                                                      int64_t run_length) {
+  auto needles = BuildRunHeavyInt64Values(NeedleLengthFromBytes(size_bytes), 
run_length);
+  return RunEndEncode(Datum(needles), RunEndEncodeOptions{int32()})
+      .ValueOrDie()
+      .make_array();
+}
+
+std::shared_ptr<Array> BuildRunEndEncodedInt64NeedlesWithNullRuns(int64_t 
size_bytes,
+                                                                  int64_t 
run_length) {
+  auto needles = 
BuildRunHeavyInt64NeedlesWithNullRuns(NeedleLengthFromBytes(size_bytes),
+                                                       run_length);
+  return RunEndEncode(Datum(needles), RunEndEncodeOptions{int32()})
+      .ValueOrDie()
+      .make_array();
+}
+
+void SetBenchmarkCounters(benchmark::State& state, const Datum& values,
+                          const Datum& needles) {
+  const auto values_length = values.length();
+  const auto needles_length = needles.length();
+  state.counters["values_length"] = static_cast<double>(values_length);
+  state.counters["needles_length"] = static_cast<double>(needles_length);
+  state.SetItemsProcessed(state.iterations() * needles_length);
+}
+
+void RunSearchSortedBenchmark(benchmark::State& state, const Datum& values,
+                              const Datum& needles, SearchSortedOptions::Side 
side) {
+  const SearchSortedOptions options(side);
+  for (auto _ : state) {
+    auto result = SearchSorted(values, needles, options);
+    ABORT_NOT_OK(result.status());
+    benchmark::DoNotOptimize(result.ValueUnsafe());
+  }
+  SetBenchmarkCounters(state, values, needles);
+}
+
+static void BM_SearchSortedInt64ArrayNeedles(benchmark::State& state,
+                                             SearchSortedOptions::Side side) {
+  const Datum values(BuildSortedInt64Values(state.range(0)));
+  const Datum needles(BuildInt64Needles(state.range(0)));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedInt64ScalarNeedle(benchmark::State& state,
+                                             SearchSortedOptions::Side side) {
+  const auto values_array = BuildSortedInt64Values(state.range(0));
+  const auto scalar_index = values_array->length() / 2;
+  const Datum values(values_array);
+  const Datum 
needles(std::make_shared<Int64Scalar>(values_array->Value(scalar_index)));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedRunEndEncodedValues(benchmark::State& state,
+                                               SearchSortedOptions::Side side) 
{
+  const Datum values(BuildRunEndEncodedInt64Values(state.range(0), 
kValuesRunLength));
+  const Datum needles(BuildInt64Needles(state.range(0)));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedRunEndEncodedValuesAndNeedles(benchmark::State& 
state,
+                                                         
SearchSortedOptions::Side side) {
+  const Datum values(BuildRunEndEncodedInt64Values(state.range(0), 
kValuesRunLength));
+  const Datum needles(BuildRunEndEncodedInt64Needles(state.range(0), 
kNeedlesRunLength));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedInt64NeedlesWithNullRuns(benchmark::State& state,
+                                                    SearchSortedOptions::Side 
side) {
+  const Datum values(BuildSortedInt64Values(state.range(0)));
+  const Datum needles(BuildInt64NeedlesWithNullRuns(state.range(0), 
kNeedlesRunLength));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedRunEndEncodedNeedlesWithNullRuns(
+    benchmark::State& state, SearchSortedOptions::Side side) {
+  const Datum values(BuildRunEndEncodedInt64Values(state.range(0), 
kValuesRunLength));
+  const Datum needles(
+      BuildRunEndEncodedInt64NeedlesWithNullRuns(state.range(0), 
kNeedlesRunLength));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedStringArrayNeedles(benchmark::State& state,
+                                              SearchSortedOptions::Side side) {
+  const Datum values(BuildSortedStringValues(state.range(0)));
+  const Datum needles(BuildStringNeedles(state.range(0)));
+  RunSearchSortedBenchmark(state, values, needles, side);
+}
+
+static void BM_SearchSortedInt64ArrayNeedlesQuick(benchmark::State& state,

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to