felipecrv commented on code in PR #47377:
URL: https://github.com/apache/arrow/pull/47377#discussion_r2625157933


##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -465,6 +480,23 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
     }
   }
 
+  // Then the selection span
+  if (selection_vector_) {
+    DCHECK_NE(selection_span, nullptr);
+    auto indices_begin = selection_vector_->indices() + selection_position_;
+    auto indices_end = selection_vector_->indices() + 
selection_vector_->length();
+    DCHECK_LE(indices_begin, indices_end);
+    auto chunk_row_id_end = position_ + iteration_size;
+    int64_t num_indices = 0;
+    while (indices_begin + num_indices < indices_end &&
+           *(indices_begin + num_indices) < chunk_row_id_end) {
+      ++num_indices;
+    }

Review Comment:
   This is a slow placeholder, right? You will have to do an Exponential search 
from `n - 1` to `0`.
   
   https://en.wikipedia.org/wiki/Exponential_search



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -811,19 +878,46 @@ class ScalarExecutor : public 
KernelExecutorImpl<ScalarKernel> {
     }
   }
 
-  Datum WrapResults(const std::vector<Datum>& inputs,
-                    const std::vector<Datum>& outputs) override {
-    // If execution yielded multiple chunks (because large arrays were split
-    // based on the ExecContext parameters, then the result is a ChunkedArray
-    if (HaveChunkedArray(inputs) || outputs.size() > 1) {
-      return ToChunkedArray(outputs, output_type_);
-    } else {
-      // Outputs have just one element
-      return outputs[0];
+  // Execute a single batch with a selection vector "densely" for a kernel 
that doesn't
+  // support selective execution. "Densely" here means that we first gather 
the rows
+  // indicated by the selection vector into a contiguous ExecBatch, execute 
that, and
+  // then scatter the result back to the original row positions.

Review Comment:
   ```suggestion
     // then scatter the result back to the original row positions in the 
output.
   ```



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -367,6 +369,12 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, 
int64_t max_chunksize,
   value_offsets_.clear();
   value_offsets_.resize(args_->size(), 0);
   max_chunksize_ = std::min(length_, max_chunksize);
+  selection_vector_ = batch.selection_vector.get();
+  if (selection_vector_) {
+    selection_length_ = selection_vector_->length();
+  } else {
+    selection_length_ = 0;

Review Comment:
   I think it's less confusing if, without a selection, the "length of the 
selection" be the length of the whole array.



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -131,26 +131,60 @@ class ARROW_EXPORT ExecContext {
 /// implementations. This is especially relevant for aggregations but also
 /// applies to scalar operations.
 ///
-/// We are not yet using this so this is mostly a placeholder for now.
-///
 /// [1]: http://cidrdb.org/cidr2005/papers/P19.pdf

Review Comment:
   Nice!



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -165,6 +171,25 @@ Status PropagateNulls(KernelContext* ctx, const ExecSpan& 
batch, ArrayData* out)
 ARROW_EXPORT
 void PropagateNullsSpans(const ExecSpan& batch, ArraySpan* out);
 
+template <typename OnSelectionFn>
+typename ::arrow::internal::call_traits::enable_if_return<OnSelectionFn, 
Status>::type
+VisitSelectionVectorSpanInline(const SelectionVectorSpan& selection,
+                               OnSelectionFn&& on_selection) {
+  for (int64_t i = 0; i < selection.length(); ++i) {
+    RETURN_NOT_OK(on_selection(selection[i]));

Review Comment:
   In theory, returning a `Status` is a cheap and simple (to the compiler) 
operation, but in practice it's not. Consider requiring a function that returns 
`bool`. If you always returns true, the inlines will remove the branches for 
early-return inside the loop.



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -29,6 +29,7 @@
 #include "arrow/compute/kernel.h"
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
+#include "arrow/visit_data_inline.h"

Review Comment:
   Is there a more specific header for 
`::arrow::internal::call_traits::enable_if_return`?



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -555,26 +555,51 @@ struct ARROW_EXPORT Kernel {
 /// employed this may not be possible.
 using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, 
ExecResult*);
 
+/// \brief The optional scalar kernel selective execution API for SCALAR 
kernel types.
+/// It's like ArrayKernelExec but with an additional SelectionVectorSpan 
argument. When a
+/// selection vector is specified in the batch, this API will be preferred, if 
provided,
+/// over ArrayKernelExec.
+using ArrayKernelSelectiveExec = Status (*)(KernelContext*, const ExecSpan&,
+                                            const SelectionVectorSpan&, 
ExecResult*);

Review Comment:
   I don't have a suggestion yet, but if we plan to support bitmaps as well, it 
would probably be better to pass something here that can be either a selection 
vector or a bitmap mask. The alternative being yet another `KernelExec` -- 
`ArrayKernelMaskedExec`.



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -555,26 +555,51 @@ struct ARROW_EXPORT Kernel {
 /// employed this may not be possible.
 using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, 
ExecResult*);
 
+/// \brief The optional scalar kernel selective execution API for SCALAR 
kernel types.
+/// It's like ArrayKernelExec but with an additional SelectionVectorSpan 
argument. When a
+/// selection vector is specified in the batch, this API will be preferred, if 
provided,
+/// over ArrayKernelExec.
+using ArrayKernelSelectiveExec = Status (*)(KernelContext*, const ExecSpan&,
+                                            const SelectionVectorSpan&, 
ExecResult*);
+
 /// \brief Kernel data structure for implementations of ScalarFunction. In
 /// addition to the members found in Kernel, contains the null handling
 /// and memory pre-allocation preferences.
 struct ARROW_EXPORT ScalarKernel : public Kernel {
   ScalarKernel() = default;
 
+  ScalarKernel(std::shared_ptr<KernelSignature> sig, ArrayKernelExec exec,
+               ArrayKernelSelectiveExec selective_exec, KernelInit init = 
NULLPTR)
+      : Kernel(std::move(sig), std::move(init)),
+        exec(std::move(exec)),
+        selective_exec(std::move(selective_exec)) {}
+
+  ScalarKernel(std::vector<InputType> in_types, OutputType out_type, 
ArrayKernelExec exec,
+               ArrayKernelSelectiveExec selective_exec, KernelInit init = 
NULLPTR)
+      : Kernel(std::move(in_types), std::move(out_type), std::move(init)),
+        exec(std::move(exec)),
+        selective_exec(std::move(selective_exec)) {}
+
   ScalarKernel(std::shared_ptr<KernelSignature> sig, ArrayKernelExec exec,
                KernelInit init = NULLPTR)
-      : Kernel(std::move(sig), init), exec(exec) {}
+      : ScalarKernel(std::move(sig), std::move(exec), NULLPTR, 
std::move(init)) {}
 
   ScalarKernel(std::vector<InputType> in_types, OutputType out_type, 
ArrayKernelExec exec,
                KernelInit init = NULLPTR)
-      : Kernel(std::move(in_types), std::move(out_type), std::move(init)), 
exec(exec) {}
+      : ScalarKernel(std::move(in_types), std::move(out_type), 
std::move(exec), NULLPTR,
+                     std::move(init)) {}
 
   /// \brief Perform a single invocation of this kernel. Depending on the
   /// implementation, it may only write into preallocated memory, while in some
   /// cases it will allocate its own memory. Any required state is managed
   /// through the KernelContext.
   ArrayKernelExec exec;
 
+  /// \brief Perform a single invocation of this kernel with a selection of 
indices. If
+  /// not provided, exec will be used with an enclosing gather/scatter pair 
instead, aka
+  /// "densely".

Review Comment:
   I think a more verbose explanation is warranted here. This is a tricky thing 
:)
   
   ```suggestion
     /// \brief Optional and similar to `exec`, but providing a specialized 
implementation
     /// that takes a selection vector argument and performs the computation 
only on the
     /// selected indices. When this specialized kernel is not provided we 
fallback to
     /// logic that gathers all selected values into a dense array, call `exec` 
on it
     /// and then scather the values on the output array.
   ```



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -811,19 +878,46 @@ class ScalarExecutor : public 
KernelExecutorImpl<ScalarKernel> {
     }
   }
 
-  Datum WrapResults(const std::vector<Datum>& inputs,
-                    const std::vector<Datum>& outputs) override {
-    // If execution yielded multiple chunks (because large arrays were split
-    // based on the ExecContext parameters, then the result is a ChunkedArray
-    if (HaveChunkedArray(inputs) || outputs.size() > 1) {
-      return ToChunkedArray(outputs, output_type_);
-    } else {
-      // Outputs have just one element
-      return outputs[0];
+  // Execute a single batch with a selection vector "densely" for a kernel 
that doesn't
+  // support selective execution. "Densely" here means that we first gather 
the rows
+  // indicated by the selection vector into a contiguous ExecBatch, execute 
that, and
+  // then scatter the result back to the original row positions.
+  Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) 
{
+    DCHECK(batch.selection_vector && !kernel_->selective_exec);
+
+    if (CheckIfAllScalar(batch)) {
+      // For all-scalar batch, we can skip the gather/scatter steps as if 
there is no
+      // selection vector - the result is a scalar anyway.
+      ExecBatch input = batch;
+      input.selection_vector = nullptr;
+      return ExecuteBatch(input, listener);
     }
+
+    std::vector<Datum> values(batch.num_values());
+    for (int i = 0; i < batch.num_values(); ++i) {
+      if (batch[i].is_scalar()) {
+        // XXX: Skip gather for scalars since it is not currently supported by 
Take.

Review Comment:
   The reason for not taking from the scalar is that it's not necessary, right?



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -694,7 +726,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const 
std::vector<Datum>& values,
       // Skip empty chunks
       continue;
     }
-    arrays.emplace_back(val.make_array());
+    if (val.is_chunked_array()) {
+      for (const auto& chunk : val.chunked_array()->chunks()) {
+        arrays.emplace_back(chunk);
+      }
+    } else {
+      DCHECK(val.is_array());
+      arrays.emplace_back(val.make_array());
+    }

Review Comment:
   This seems unrelated. Why is it necessary?



##########
cpp/src/arrow/compute/function.h:
##########
@@ -304,13 +304,21 @@ class ARROW_EXPORT ScalarFunction : public 
detail::FunctionImpl<ScalarKernel> {
                                            std::move(doc), default_options),
         is_pure_(is_pure) {}
 
-  /// \brief Add a kernel with given input/output types, no required state
-  /// initialization, preallocation for fixed-width types, and default null
-  /// handling (intersect validity bitmaps of inputs).
+  /// \brief Add a kernel with given input/output types and exec API, no 
selective exec
+  /// API, no required state initialization, preallocation for fixed-width 
types, and
+  /// default null handling (intersect validity bitmaps of inputs).

Review Comment:
   I think you can keep this one as is.



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -811,19 +878,46 @@ class ScalarExecutor : public 
KernelExecutorImpl<ScalarKernel> {
     }
   }
 
-  Datum WrapResults(const std::vector<Datum>& inputs,
-                    const std::vector<Datum>& outputs) override {
-    // If execution yielded multiple chunks (because large arrays were split
-    // based on the ExecContext parameters, then the result is a ChunkedArray
-    if (HaveChunkedArray(inputs) || outputs.size() > 1) {
-      return ToChunkedArray(outputs, output_type_);
-    } else {
-      // Outputs have just one element
-      return outputs[0];
+  // Execute a single batch with a selection vector "densely" for a kernel 
that doesn't
+  // support selective execution. "Densely" here means that we first gather 
the rows
+  // indicated by the selection vector into a contiguous ExecBatch, execute 
that, and
+  // then scatter the result back to the original row positions.
+  Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) 
{
+    DCHECK(batch.selection_vector && !kernel_->selective_exec);
+
+    if (CheckIfAllScalar(batch)) {
+      // For all-scalar batch, we can skip the gather/scatter steps as if 
there is no
+      // selection vector - the result is a scalar anyway.
+      ExecBatch input = batch;
+      input.selection_vector = nullptr;
+      return ExecuteBatch(input, listener);
     }
+
+    std::vector<Datum> values(batch.num_values());
+    for (int i = 0; i < batch.num_values(); ++i) {
+      if (batch[i].is_scalar()) {
+        // XXX: Skip gather for scalars since it is not currently supported by 
Take.
+        values[i] = batch[i];
+        continue;

Review Comment:
   an else here would be clearer



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -781,17 +820,45 @@ class KernelExecutorImpl : public KernelExecutor {
 class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    RETURN_NOT_OK(span_iterator_.Init(batch, 
exec_context()->exec_chunksize()));
-
     if (batch.length == 0) {
       // For zero-length batches, we do nothing except return a zero-length
       // array of the correct output type
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> result,
                             MakeArrayOfNull(output_type_.GetSharedPtr(), 
/*length=*/0,
                                             exec_context()->memory_pool()));
+      RETURN_NOT_OK(span_iterator_.Init(batch, 
exec_context()->exec_chunksize()));
       return EmitResult(result->data(), listener);
     }
 
+    if (batch.selection_vector && !kernel_->selective_exec) {
+      // If the batch contains a selection vector but the kernel does not 
support
+      // selective execution, we need to execute the batch in a "dense" manner.
+      return ExecuteSelectiveDense(batch, listener);
+    }
+
+    return ExecuteBatch(batch, listener);
+  }
+
+  Datum WrapResults(const std::vector<Datum>& inputs,

Review Comment:
   This one should be in the `protected:` section as well, right?



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -555,26 +555,51 @@ struct ARROW_EXPORT Kernel {
 /// employed this may not be possible.
 using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, 
ExecResult*);
 
+/// \brief The optional scalar kernel selective execution API for SCALAR 
kernel types.
+/// It's like ArrayKernelExec but with an additional SelectionVectorSpan 
argument. When a
+/// selection vector is specified in the batch, this API will be preferred, if 
provided,
+/// over ArrayKernelExec.
+using ArrayKernelSelectiveExec = Status (*)(KernelContext*, const ExecSpan&,
+                                            const SelectionVectorSpan&, 
ExecResult*);

Review Comment:
   Starting to think that adding another `KernelExec` will probably be best.
   
   Selection vectors are better than bitmask for very selective filters. 
Bitmasks are better when the filter is not very selective. Bitmaps are less 
important than selection vectors because if the filter is not selected 
computing on every value is not as bad.



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -131,26 +131,60 @@ class ARROW_EXPORT ExecContext {
 /// implementations. This is especially relevant for aggregations but also
 /// applies to scalar operations.
 ///
-/// We are not yet using this so this is mostly a placeholder for now.
-///
 /// [1]: http://cidrdb.org/cidr2005/papers/P19.pdf
 class ARROW_EXPORT SelectionVector {
  public:
   explicit SelectionVector(std::shared_ptr<ArrayData> data);
 
   explicit SelectionVector(const Array& arr);
 
-  /// \brief Create SelectionVector from boolean mask
-  static Result<std::shared_ptr<SelectionVector>> FromMask(const BooleanArray& 
arr);
-
+  std::shared_ptr<ArrayData> data() const { return data_; }
   const int32_t* indices() const { return indices_; }
-  int32_t length() const;
+  int64_t length() const;
+
+  Status Validate(int64_t values_length = -1) const;
 
  private:
   std::shared_ptr<ArrayData> data_;
   const int32_t* indices_;
 };
 
+/// \brief A span of a SelectionVector's indices. Can represent a slice of the
+/// underlying indices.
+///
+/// Note that as an indirection of indices to the data in an ExecBatch, when 
sliced
+/// along with the batch, the indices themselves need to be back-shifted to be 
relative to
+/// the batch slice (ExecSpan). For example, consider an ExecBatch of 10 rows 
with a
+/// SelectionVector [0, 1, 9] is to be executed per-8-rows. The first slice of 
the batch
+/// will have row 0 to row 7 of the original batch with selection slice [0, 
1]. The second
+/// slice of the batch will have row 8 and row 9 of the original batch however 
they are
+/// referred to as row 0 and row 1 by the kernel. Therefore the second 
selection slice
+/// should be [9 - 8] = [1]. This is done by setting index_back_shift to 8 for 
the second
+/// selection slice.
+class ARROW_EXPORT SelectionVectorSpan {
+ public:
+  explicit SelectionVectorSpan(const int32_t* indices = NULLPTR, int64_t 
length = 0,
+                               int64_t offset = 0, int32_t index_back_shift = 
0)
+      : indices_(indices),
+        length_(length),
+        offset_(offset),
+        index_back_shift_(index_back_shift) {}
+
+  void SetSlice(int64_t offset, int64_t length, int32_t index_back_shift = 0);
+
+  int32_t operator[](int64_t i) const {
+    return indices_[i + offset_] - index_back_shift_;

Review Comment:
   When you use this class in loops, you will probably get better assembly if 
it's copied into a local variable (in the "stack") before the loop to get SROA 
[1] to kick in and then you can keep all these members in registers.
   
   [1] https://blog.regehr.org/archives/1603



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -983,6 +1088,17 @@ class ScalarExecutor : public 
KernelExecutorImpl<ScalarKernel> {
     return Status::OK();
   }
 
+  // Actually invoke the kernel on the given input span, either selectively if 
there is a
+  // selection or non-selectively otherwise.
+  Status ExecuteKernel(const ExecSpan& input, const SelectionVectorSpan* 
selection,
+                       ExecResult* out) {
+    if (selection) {
+      DCHECK_NE(kernel_->selective_exec, nullptr);
+      return kernel_->selective_exec(kernel_ctx_, input, *selection, out);
+    }
+    return kernel_->exec(kernel_ctx_, input, out);
+  }

Review Comment:
   I know this is called twice, but it would probably be better to have it 
inlined at the callsites. This pre-condition that non-null selection implies 
non-null `selective_exec` is very specific.



##########
cpp/src/arrow/compute/exec_benchmark.cc:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/expression.h"
+#include "arrow/compute/function_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/testing/generator.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute {
+
+namespace {
+
+// A trivial kernel that just keeps the CPU busy for a specified number of 
iterations per
+// input row. Has both regular and selective variants. Used to benchmark the 
overhead of
+// the execution framework.
+
+struct SpinOptions : public FunctionOptions {
+  explicit SpinOptions(int64_t count = 0);
+  static constexpr char kTypeName[] = "SpinOptions";
+  static SpinOptions Defaults() { return SpinOptions(); }
+  int64_t count = 0;
+};
+
+static auto kSpinOptionsType = internal::GetFunctionOptionsType<SpinOptions>(
+    arrow::internal::DataMember("count", &SpinOptions::count));
+
+SpinOptions::SpinOptions(int64_t count)
+    : FunctionOptions(kSpinOptionsType), count(count) {}
+
+const SpinOptions* GetDefaultSpinOptions() {
+  static const auto kDefaultSpinOptions = SpinOptions::Defaults();
+  return &kDefaultSpinOptions;
+}
+
+using SpinState = internal::OptionsWrapper<SpinOptions>;
+
+inline void Spin(volatile int64_t count) {
+  while (count-- > 0) {
+    // Do nothing, just burn CPU cycles.

Review Comment:
   ~compiler probably optimizes this away~
   
   ok, now I see the `volatile`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to