zanmato1984 commented on code in PR #47377:
URL: https://github.com/apache/arrow/pull/47377#discussion_r2626261054
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -465,6 +480,23 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
}
}
+  // Then the selection span
+  if (selection_vector_) {
+    DCHECK_NE(selection_span, nullptr);
+    auto indices_begin = selection_vector_->indices() + selection_position_;
+    auto indices_end = selection_vector_->indices() + selection_vector_->length();
+    DCHECK_LE(indices_begin, indices_end);
+    auto chunk_row_id_end = position_ + iteration_size;
+    int64_t num_indices = 0;
+    while (indices_begin + num_indices < indices_end &&
+           *(indices_begin + num_indices) < chunk_row_id_end) {
+      ++num_indices;
+    }
Review Comment:
Good catch. Replaced with an `O(log N)` `std::lower_bound()`.
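For reference, a minimal sketch of the `std::lower_bound()` form, using the variable names from the hunk above (the exact code in the updated diff may differ):

```cpp
// Binary-search for the first selection index that falls outside the current
// chunk [position_, position_ + iteration_size) instead of scanning linearly.
// Requires <algorithm>; `indices_begin`, `indices_end`, and `chunk_row_id_end`
// are as in the hunk above.
auto first_outside = std::lower_bound(indices_begin, indices_end, chunk_row_id_end);
int64_t num_indices = first_outside - indices_begin;
```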
##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -555,26 +555,51 @@ struct ARROW_EXPORT Kernel {
/// employed this may not be possible.
using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);
+/// \brief The optional scalar kernel selective execution API for SCALAR kernel types.
+/// It's like ArrayKernelExec but with an additional SelectionVectorSpan argument. When a
+/// selection vector is specified in the batch, this API will be preferred, if provided,
+/// over ArrayKernelExec.
+using ArrayKernelSelectiveExec = Status (*)(KernelContext*, const ExecSpan&,
+                                            const SelectionVectorSpan&, ExecResult*);
Review Comment:
Good suggestion. Shall we do that in follow-up PRs?
##########
cpp/src/arrow/compute/function.h:
##########
@@ -304,13 +304,21 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
std::move(doc), default_options),
is_pure_(is_pure) {}
-  /// \brief Add a kernel with given input/output types, no required state
-  /// initialization, preallocation for fixed-width types, and default null
-  /// handling (intersect validity bitmaps of inputs).
+  /// \brief Add a kernel with given input/output types and exec API, no selective exec
+  /// API, no required state initialization, preallocation for fixed-width types, and
+  /// default null handling (intersect validity bitmaps of inputs).
Review Comment:
I think this is as clear as the rest of the style ("no required state", etc.).
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -983,6 +1088,17 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     return Status::OK();
   }
+  // Actually invoke the kernel on the given input span, either selectively if there is a
+  // selection or non-selectively otherwise.
+  Status ExecuteKernel(const ExecSpan& input, const SelectionVectorSpan* selection,
+                       ExecResult* out) {
+    if (selection) {
+      DCHECK_NE(kernel_->selective_exec, nullptr);
+      return kernel_->selective_exec(kernel_ctx_, input, *selection, out);
+    }
+    return kernel_->exec(kernel_ctx_, input, out);
+  }
Review Comment:
> This pre-condition that non-null selection implies non-null `selective_exec` is very specific.

Sorry, I don't get it. Both call sites may pass a non-null `selection`, and in that case we need to make sure that `selective_exec` is also non-null. In other words, if we inlined this helper, the code would be exactly the same in both places.

Or are you suggesting something performance-wise?
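For what it's worth, this is roughly what each of the two call sites would look like if we inlined the helper (just a sketch; `input` and `out` stand for whatever the call site has at hand):

```cpp
// Hypothetical inlined form at one call site: `input` is the ExecSpan and
// `out` is the ExecResult* already available there.
if (selection != nullptr) {
  DCHECK_NE(kernel_->selective_exec, nullptr);
  RETURN_NOT_OK(kernel_->selective_exec(kernel_ctx_, input, *selection, out));
} else {
  RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out));
}
```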
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -694,7 +726,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
       // Skip empty chunks
       continue;
     }
-    arrays.emplace_back(val.make_array());
+    if (val.is_chunked_array()) {
+      for (const auto& chunk : val.chunked_array()->chunks()) {
+        arrays.emplace_back(chunk);
+      }
+    } else {
+      DCHECK(val.is_array());
+      arrays.emplace_back(val.make_array());
+    }
Review Comment:
Plus, as a fairly independent free function, I think there's no harm in extending it a bit to support chunked arrays?
##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -29,6 +29,7 @@
#include "arrow/compute/kernel.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"
+#include "arrow/visit_data_inline.h"
Review Comment:
Indeed. `arrow/util/functional.h` is the one.
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -811,19 +878,46 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
}
}
-  Datum WrapResults(const std::vector<Datum>& inputs,
-                    const std::vector<Datum>& outputs) override {
-    // If execution yielded multiple chunks (because large arrays were split
-    // based on the ExecContext parameters, then the result is a ChunkedArray
-    if (HaveChunkedArray(inputs) || outputs.size() > 1) {
-      return ToChunkedArray(outputs, output_type_);
-    } else {
-      // Outputs have just one element
-      return outputs[0];
+  // Execute a single batch with a selection vector "densely" for a kernel that doesn't
+  // support selective execution. "Densely" here means that we first gather the rows
+  // indicated by the selection vector into a contiguous ExecBatch, execute that, and
+  // then scatter the result back to the original row positions.
+  Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) {
+    DCHECK(batch.selection_vector && !kernel_->selective_exec);
+
+    if (CheckIfAllScalar(batch)) {
+      // For all-scalar batch, we can skip the gather/scatter steps as if there is no
+      // selection vector - the result is a scalar anyway.
+      ExecBatch input = batch;
+      input.selection_vector = nullptr;
+      return ExecuteBatch(input, listener);
     }
+
+    std::vector<Datum> values(batch.num_values());
+    for (int i = 0; i < batch.num_values(); ++i) {
+      if (batch[i].is_scalar()) {
+        // XXX: Skip gather for scalars since it is not currently supported by Take.
+        values[i] = batch[i];
+        continue;
+      }
+      ARROW_ASSIGN_OR_RAISE(values[i],
+                            Take(batch[i], *batch.selection_vector->data(),
+                                 TakeOptions{/*boundcheck=*/false}, exec_context()));
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        ExecBatch input,
+        ExecBatch::Make(std::move(values), batch.selection_vector->length()));
+
+    DatumAccumulator dense_listener;
+    RETURN_NOT_OK(ExecuteBatch(input, &dense_listener));
+    Datum dense_result = WrapResults(input.values, dense_listener.values());
Review Comment:
Here we need to actually obtain the "dense" result datum (as opposed to passing it into the eventual `listener`), so that we can call scatter on it. Thus we need to call `WrapResults` to fully reuse the existing result-obtaining logic. This applies one extra round of "faking a chunked array". Then in the final `WrapResults` called by the caller (function executor or expression evaluation), we may see an already-faked chunked array.
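For illustration only, the step that follows the quoted code is conceptually something like this (`ScatterDenseResult` is a made-up name for this sketch, not the actual helper in the PR):

```cpp
// Scatter the densely computed rows back to the positions given by the
// selection vector, so the emitted result has the original batch length.
// `ScatterDenseResult` is hypothetical; the real implementation may differ.
ARROW_ASSIGN_OR_RAISE(
    Datum sparse_result,
    ScatterDenseResult(dense_result, *batch.selection_vector, batch.length));
return EmitResult(sparse_result.array(), listener);
```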
##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -165,6 +171,25 @@ Status PropagateNulls(KernelContext* ctx, const ExecSpan& batch, ArrayData* out)
ARROW_EXPORT
void PropagateNullsSpans(const ExecSpan& batch, ArraySpan* out);
+template <typename OnSelectionFn>
+typename ::arrow::internal::call_traits::enable_if_return<OnSelectionFn, Status>::type
+VisitSelectionVectorSpanInline(const SelectionVectorSpan& selection,
+                               OnSelectionFn&& on_selection) {
+  for (int64_t i = 0; i < selection.length(); ++i) {
+    RETURN_NOT_OK(on_selection(selection[i]));
Review Comment:
Sorry, I don't get it. Could you elaborate a bit?
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -694,7 +726,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
       // Skip empty chunks
       continue;
     }
-    arrays.emplace_back(val.make_array());
+    if (val.is_chunked_array()) {
+      for (const auto& chunk : val.chunked_array()->chunks()) {
+        arrays.emplace_back(chunk);
+      }
+    } else {
+      DCHECK(val.is_array());
+      arrays.emplace_back(val.make_array());
+    }
Review Comment:
Explained in https://github.com/apache/arrow/pull/47377/changes#r2626184747
##########
cpp/src/arrow/compute/exec.h:
##########
@@ -131,26 +131,60 @@ class ARROW_EXPORT ExecContext {
/// implementations. This is especially relevant for aggregations but also
/// applies to scalar operations.
///
-/// We are not yet using this so this is mostly a placeholder for now.
-///
/// [1]: http://cidrdb.org/cidr2005/papers/P19.pdf
class ARROW_EXPORT SelectionVector {
 public:
  explicit SelectionVector(std::shared_ptr<ArrayData> data);
  explicit SelectionVector(const Array& arr);
-  /// \brief Create SelectionVector from boolean mask
-  static Result<std::shared_ptr<SelectionVector>> FromMask(const BooleanArray& arr);
-
+  std::shared_ptr<ArrayData> data() const { return data_; }
  const int32_t* indices() const { return indices_; }
-  int32_t length() const;
+  int64_t length() const;
+
+  Status Validate(int64_t values_length = -1) const;
 private:
  std::shared_ptr<ArrayData> data_;
  const int32_t* indices_;
};
+/// \brief A span of a SelectionVector's indices. Can represent a slice of the
+/// underlying indices.
+///
+/// Note that as an indirection of indices to the data in an ExecBatch, when sliced
+/// along with the batch, the indices themselves need to be back-shifted to be relative to
+/// the batch slice (ExecSpan). For example, consider an ExecBatch of 10 rows with a
+/// SelectionVector [0, 1, 9] is to be executed per-8-rows. The first slice of the batch
+/// will have row 0 to row 7 of the original batch with selection slice [0, 1]. The second
+/// slice of the batch will have row 8 and row 9 of the original batch however they are
+/// referred to as row 0 and row 1 by the kernel. Therefore the second selection slice
+/// should be [9 - 8] = [1]. This is done by setting index_back_shift to 8 for the second
+/// selection slice.
+class ARROW_EXPORT SelectionVectorSpan {
+ public:
+  explicit SelectionVectorSpan(const int32_t* indices = NULLPTR, int64_t length = 0,
+                               int64_t offset = 0, int32_t index_back_shift = 0)
+      : indices_(indices),
+        length_(length),
+        offset_(offset),
+        index_back_shift_(index_back_shift) {}
+
+  void SetSlice(int64_t offset, int64_t length, int32_t index_back_shift = 0);
+
+  int32_t operator[](int64_t i) const {
+    return indices_[i + offset_] - index_back_shift_;
Review Comment:
My concern is that exposing `index_back_shift` would be too verbose and error-prone. Better to use some encapsulation to hide it. Maybe let the span accept a lambda, within which we can write more compiler-friendly code while keeping `index_back_shift` hidden?
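Something along these lines is the shape I have in mind, just as a sketch (the `VisitIndices` name and exact signature are made up, not what the PR currently has):

```cpp
// The span applies the back-shift itself and hands out span-relative row ids
// through a callback, so callers never touch index_back_shift directly.
class SelectionVectorSpan {
 public:
  template <typename Fn>  // Fn: void(int64_t i, int32_t span_relative_row_id)
  void VisitIndices(Fn&& fn) const {
    for (int64_t i = 0; i < length_; ++i) {
      fn(i, indices_[i + offset_] - index_back_shift_);
    }
  }
  // ... constructor, SetSlice(), etc. as in the hunk above ...

 private:
  const int32_t* indices_ = nullptr;
  int64_t length_ = 0;
  int64_t offset_ = 0;
  int32_t index_back_shift_ = 0;
};
```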
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -811,19 +878,46 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
}
}
-  Datum WrapResults(const std::vector<Datum>& inputs,
-                    const std::vector<Datum>& outputs) override {
-    // If execution yielded multiple chunks (because large arrays were split
-    // based on the ExecContext parameters, then the result is a ChunkedArray
-    if (HaveChunkedArray(inputs) || outputs.size() > 1) {
-      return ToChunkedArray(outputs, output_type_);
-    } else {
-      // Outputs have just one element
-      return outputs[0];
+  // Execute a single batch with a selection vector "densely" for a kernel that doesn't
+  // support selective execution. "Densely" here means that we first gather the rows
+  // indicated by the selection vector into a contiguous ExecBatch, execute that, and
+  // then scatter the result back to the original row positions.
+  Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) {
+    DCHECK(batch.selection_vector && !kernel_->selective_exec);
+
+    if (CheckIfAllScalar(batch)) {
+      // For all-scalar batch, we can skip the gather/scatter steps as if there is no
+      // selection vector - the result is a scalar anyway.
+      ExecBatch input = batch;
+      input.selection_vector = nullptr;
+      return ExecuteBatch(input, listener);
     }
+
+    std::vector<Datum> values(batch.num_values());
+    for (int i = 0; i < batch.num_values(); ++i) {
+      if (batch[i].is_scalar()) {
+        // XXX: Skip gather for scalars since it is not currently supported by Take.
Review Comment:
Technically it's not necessary. But the drawback is that we lose the ability to uniformly call `Take` on any `Datum`: we have to make sure it's not a scalar and go through a special path, like here, for scalars.

I think maybe we can simply have `Take` return the scalar as-is (to allow uniform invocation on an arbitrary `Datum`). Or we insist that taking from a scalar makes no sense and do the special checks everywhere.
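To make the first option concrete, a rough sketch (the `TakeScalarOrArray` wrapper is a made-up name; this is not what the PR does today):

```cpp
// Forward to compute::Take for arrays/chunked arrays, but pass scalars through
// untouched: a scalar logically broadcasts to every row, so "taking" rows from
// it yields the same scalar. Call sites like the loop above could then invoke
// this uniformly on any Datum.
Result<Datum> TakeScalarOrArray(const Datum& value, const Datum& indices,
                                const TakeOptions& options, ExecContext* ctx) {
  if (value.is_scalar()) {
    return value;
  }
  return Take(value, indices, options, ctx);
}
```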
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -781,17 +820,45 @@ class KernelExecutorImpl : public KernelExecutor {
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
 public:
  Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
-
    if (batch.length == 0) {
      // For zero-length batches, we do nothing except return a zero-length
      // array of the correct output type
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> result,
                            MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0,
                                            exec_context()->memory_pool()));
+      RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
      return EmitResult(result->data(), listener);
    }
+    if (batch.selection_vector && !kernel_->selective_exec) {
+      // If the batch contains a selection vector but the kernel does not support
+      // selective execution, we need to execute the batch in a "dense" manner.
+      return ExecuteSelectiveDense(batch, listener);
+    }
+
+    return ExecuteBatch(batch, listener);
+  }
+
+  Datum WrapResults(const std::vector<Datum>& inputs,
Review Comment:
It's an override of a public method of its parent class, `KernelExecutor::WrapResults()`.
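Abridged, the relationship is something like this (signatures as they appear in the hunks in this review; the real declarations live in Arrow's compute internals):

```cpp
class KernelExecutor {
 public:
  virtual Datum WrapResults(const std::vector<Datum>& inputs,
                            const std::vector<Datum>& outputs) = 0;
  // ...
};

// ScalarExecutor derives (via KernelExecutorImpl<ScalarKernel>) from
// KernelExecutor, so the WrapResults in the hunk above overrides this virtual.
```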