felipecrv commented on code in PR #42127:
URL: https://github.com/apache/arrow/pull/42127#discussion_r1638503612
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -536,142 +537,147 @@ Status ExtensionTake(KernelContext* ctx, const
ExecSpan& batch, ExecResult* out)
// R -> RecordBatch
// T -> Table
-Result<std::shared_ptr<ArrayData>> TakeAAA(const std::shared_ptr<ArrayData>&
values,
- const std::shared_ptr<ArrayData>&
indices,
- const TakeOptions& options,
ExecContext* ctx) {
- ARROW_ASSIGN_OR_RAISE(Datum result,
- CallFunction("array_take", {values, indices},
&options, ctx));
- return result.array();
-}
+const FunctionDoc take_doc(
+ "Select values from an input based on indices from another array",
+ ("The output is populated with values from the input at positions\n"
+ "given by `indices`. Nulls in `indices` emit null in the output."),
+ {"input", "indices"}, "TakeOptions");
-Result<std::shared_ptr<ChunkedArray>> TakeCAC(const ChunkedArray& values,
- const Array& indices,
- const TakeOptions& options,
- ExecContext* ctx) {
- std::shared_ptr<Array> values_array;
- if (values.num_chunks() == 1) {
- // Case 1: `values` has a single chunk, so just use it
- values_array = values.chunk(0);
- } else {
- // TODO Case 2: See if all `indices` fall in the same chunk and call Array
Take on it
- // See
- //
https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151
- // TODO Case 3: If indices are sorted, can slice them and call Array Take
- // (these are relevant to TakeCCC as well)
-
- // Case 4: Else, concatenate chunks and call Array Take
- if (values.chunks().empty()) {
- ARROW_ASSIGN_OR_RAISE(
- values_array, MakeArrayOfNull(values.type(), /*length=*/0,
ctx->memory_pool()));
- } else {
- ARROW_ASSIGN_OR_RAISE(values_array,
- Concatenate(values.chunks(), ctx->memory_pool()));
- }
+// Metafunction for dispatching to different Take implementations other than
+// Array-Array.
+//
+// TODO: Revamp approach to executing Take operations. In addition to being
+// overly complex dispatching, there is no parallelization.
+class TakeMetaFunction : public MetaFunction {
+ public:
+ TakeMetaFunction()
+ : MetaFunction("take", Arity::Binary(), take_doc,
GetDefaultTakeOptions()) {}
+
+ static Result<Datum> CallArrayTake(const std::vector<Datum>& args,
+ const TakeOptions& options, ExecContext*
ctx) {
+ ARROW_ASSIGN_OR_RAISE(auto array_take_func,
+ ctx->func_registry()->GetFunction("array_take"));
+ return array_take_func->Execute(args, &options, ctx);
}
- // Call Array Take on our single chunk
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> new_chunk,
- TakeAAA(values_array->data(), indices.data(), options,
ctx));
- std::vector<std::shared_ptr<Array>> chunks = {MakeArray(new_chunk)};
- return std::make_shared<ChunkedArray>(std::move(chunks));
-}
-Result<std::shared_ptr<ChunkedArray>> TakeCCC(const ChunkedArray& values,
- const ChunkedArray& indices,
- const TakeOptions& options,
- ExecContext* ctx) {
- // XXX: for every chunk in indices, values are gathered from all chunks in
values to
- // form a new chunk in the result. Performing this concatenation is not
ideal, but
- // greatly simplifies the implementation before something more efficient is
- // implemented.
- std::shared_ptr<Array> values_array;
- if (values.num_chunks() == 1) {
- values_array = values.chunk(0);
- } else {
- if (values.chunks().empty()) {
- ARROW_ASSIGN_OR_RAISE(
- values_array, MakeArrayOfNull(values.type(), /*length=*/0,
ctx->memory_pool()));
- } else {
- ARROW_ASSIGN_OR_RAISE(values_array,
- Concatenate(values.chunks(), ctx->memory_pool()));
+ static Result<std::shared_ptr<Array>> ChunkedArrayAsArray(
+ const std::shared_ptr<ChunkedArray>& values, MemoryPool* pool) {
+ switch (values->num_chunks()) {
+ case 0:
+ return MakeArrayOfNull(values->type(), /*length=*/0, pool);
+ case 1:
+ return values->chunk(0);
+ default:
+ return Concatenate(values->chunks(), pool);
}
}
- std::vector<std::shared_ptr<Array>> new_chunks;
- new_chunks.resize(indices.num_chunks());
- for (int i = 0; i < indices.num_chunks(); i++) {
- ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(),
- indices.chunk(i)->data(),
options, ctx));
- new_chunks[i] = MakeArray(chunk);
+
+ private:
+ static Result<std::shared_ptr<ArrayData>> TakeAAA(const std::vector<Datum>&
args,
+ const TakeOptions& options,
+ ExecContext* ctx) {
+ DCHECK_EQ(args[0].kind(), Datum::ARRAY);
+ DCHECK_EQ(args[1].kind(), Datum::ARRAY);
+ ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx));
+ return result.array();
}
- return std::make_shared<ChunkedArray>(std::move(new_chunks), values.type());
-}
-Result<std::shared_ptr<ChunkedArray>> TakeACC(const Array& values,
- const ChunkedArray& indices,
- const TakeOptions& options,
- ExecContext* ctx) {
- auto num_chunks = indices.num_chunks();
- std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
- for (int i = 0; i < num_chunks; i++) {
- // Take with that indices chunk
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> chunk,
- TakeAAA(values.data(), indices.chunk(i)->data(),
options, ctx));
- new_chunks[i] = MakeArray(chunk);
+ static Result<std::shared_ptr<ArrayData>> TakeCAA(
+ const std::shared_ptr<ChunkedArray>& values, const Array& indices,
+ const TakeOptions& options, ExecContext* ctx) {
+ ARROW_ASSIGN_OR_RAISE(auto values_array,
+ ChunkedArrayAsArray(values, ctx->memory_pool()));
+ std::vector<Datum> args = {std::move(values_array), indices};
+ return TakeAAA(args, options, ctx);
}
- return std::make_shared<ChunkedArray>(std::move(new_chunks), values.type());
-}
-Result<std::shared_ptr<RecordBatch>> TakeRAR(const RecordBatch& batch,
- const Array& indices,
- const TakeOptions& options,
- ExecContext* ctx) {
- auto ncols = batch.num_columns();
- auto nrows = indices.length();
- std::vector<std::shared_ptr<Array>> columns(ncols);
- for (int j = 0; j < ncols; j++) {
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> col_data,
- TakeAAA(batch.column(j)->data(), indices.data(),
options, ctx));
- columns[j] = MakeArray(col_data);
+ static Result<std::shared_ptr<ChunkedArray>> TakeCAC(
+ const std::shared_ptr<ChunkedArray>& values, const Array& indices,
Review Comment:
I want to simplify these signatures further in the next PR, since less will
actually have to be handled by the `"take"` MetaFunction once `"array_take"`
learns to handle chunked arrays by itself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]