westonpace commented on a change in pull request #10705:
URL: https://github.com/apache/arrow/pull/10705#discussion_r668255841
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
+ std::string label) {
+ struct Impl : RecordBatchReader {
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+ Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
+ if (batch) {
+ ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
+ } else {
+ *record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
+ }
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ std::shared_ptr<Schema> schema_;
+ Iterator<util::optional<ExecBatch>> iterator_;
+ };
+
+ auto out = std::make_shared<Impl>();
+ out->pool_ = input->plan()->exec_context()->memory_pool();
+ out->schema_ = input->output_schema();
+ out->iterator_ = MakeGeneratorIterator(MakeSinkNode(input, std::move(label)));
+ return out;
+}
+
+struct ScalarAggregateNode : ExecNode {
+ ScalarAggregateNode(ExecNode* input, std::string label,
+ std::shared_ptr<Schema> output_schema,
+ std::vector<const ScalarAggregateKernel*> kernels,
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_schema=*/std::move(output_schema),
+ /*num_outputs=*/1),
+ kernels_(std::move(kernels)),
+ states_(std::move(states)) {}
+
+ const char* kind_name() override { return "ScalarAggregateNode"; }
+
+ Status DoConsume(const ExecBatch& batch,
+ const std::vector<std::unique_ptr<KernelState>>& states) {
+ for (size_t i = 0; i < states.size(); ++i) {
+ KernelContext batch_ctx{plan()->exec_context()};
+ batch_ctx.SetState(states[i].get());
+ ExecBatch single_column_batch{{batch.values[i]}, batch.length};
+ RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
+ }
+ return Status::OK();
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto it =
+ thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
+ ++num_received_;
+ auto thread_index = it->second;
+
+ lock.unlock();
+
+ const auto& thread_local_state = states_[thread_index];
+ Status st = DoConsume(std::move(batch), thread_local_state);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ return;
+ }
+
+ lock.lock();
+ st = MaybeFinish(&lock);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int seq) override {
+ DCHECK_EQ(input, inputs_[0]);
+ std::unique_lock<std::mutex> lock(mutex_);
+ num_total_ = seq;
+ Status st = MaybeFinish(&lock);
+
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ Status StartProducing() override {
+ finished_ = Future<>::Make();
+ // Scalar aggregates will only output a single batch
+ outputs_[0]->InputFinished(this, 1);
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ inputs_[0]->StopProducing(this);
+ finished_.MarkFinished();
+ }
+
+ Future<> finished() override { return finished_; }
+
+ private:
+ Status MaybeFinish(std::unique_lock<std::mutex>* lock) {
+ if (num_received_ != num_total_) return Status::OK();
+
+ if (finished_.is_finished()) return Status::OK();
+
+ ExecBatch batch{{}, 1};
+ batch.values.resize(kernels_.size());
+
+ for (size_t i = 0; i < kernels_.size(); ++i) {
+ KernelContext ctx{plan()->exec_context()};
+ ctx.SetState(states_[0][i].get());
+
+ for (size_t thread_index = 1; thread_index < thread_indices_.size();
+ ++thread_index) {
+ RETURN_NOT_OK(
+ kernels_[i]->merge(&ctx, std::move(*states_[thread_index][i]), ctx.state()));
+ }
+ RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
+ }
+ lock->unlock();
+
+ outputs_[0]->InputReceived(this, 0, batch);
+
+ finished_.MarkFinished();
+ return Status::OK();
+ }
+
+ Future<> finished_ = Future<>::MakeFinished();
+ std::vector<const ScalarAggregateKernel*> kernels_;
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
+ std::unordered_map<std::thread::id, size_t> thread_indices_;
+ std::mutex mutex_;
+ int num_received_ = 0, num_total_;
+};
+
+Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
+ std::vector<internal::Aggregate> aggregates) {
+ if (input->output_schema()->num_fields() != static_cast<int>(aggregates.size())) {
+ return Status::Invalid("Provided ", aggregates.size(),
+ " aggregates, expected one for each field of ",
+ input->output_schema()->ToString());
+ }
+
+ auto exec_ctx = input->plan()->exec_context();
+
+ std::vector<std::shared_ptr<ScalarAggregateFunction>> functions(aggregates.size());
+ std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states(
+ exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1);
+ FieldVector fields(aggregates.size());
+
+ for (size_t i = 0; i < aggregates.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(auto function,
+ exec_ctx->func_registry()->GetFunction(aggregates[i].function));
+ if (function->kind() != Function::SCALAR_AGGREGATE) {
+ return Status::Invalid("Provided non ScalarAggregateFunction ",
+ aggregates[i].function);
+ }
+
+ functions[i] = checked_pointer_cast<ScalarAggregateFunction>(std::move(function));
Review comment:
Nit: isn't the local `function` sufficient? I don't see `functions` referenced
outside the loop scope. Do you need to gather the vector of functions for some
other reason?
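
For example, something like the following (just a sketch; it assumes nothing else ends up reading `functions` later):

```cpp
// Sketch of the suggestion: keep the cast local to the loop body and drop the
// `functions` vector entirely.
auto scalar_agg_function =
    checked_pointer_cast<ScalarAggregateFunction>(std::move(function));
auto in_type = ValueDescr::Array(input->output_schema()->fields()[i]->type());
ARROW_ASSIGN_OR_RAISE(const Kernel* kernel,
                      scalar_agg_function->DispatchExact({in_type}));
kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
if (aggregates[i].options == nullptr) {
  aggregates[i].options = scalar_agg_function->default_options();
}
```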
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
+ std::string label) {
+ struct Impl : RecordBatchReader {
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+ Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
+ if (batch) {
+ ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
+ } else {
+ *record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
+ }
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ std::shared_ptr<Schema> schema_;
+ Iterator<util::optional<ExecBatch>> iterator_;
+ };
+
+ auto out = std::make_shared<Impl>();
+ out->pool_ = input->plan()->exec_context()->memory_pool();
+ out->schema_ = input->output_schema();
+ out->iterator_ = MakeGeneratorIterator(MakeSinkNode(input, std::move(label)));
+ return out;
+}
+
+struct ScalarAggregateNode : ExecNode {
+ ScalarAggregateNode(ExecNode* input, std::string label,
+ std::shared_ptr<Schema> output_schema,
+ std::vector<const ScalarAggregateKernel*> kernels,
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_schema=*/std::move(output_schema),
+ /*num_outputs=*/1),
+ kernels_(std::move(kernels)),
+ states_(std::move(states)) {}
+
+ const char* kind_name() override { return "ScalarAggregateNode"; }
+
+ Status DoConsume(const ExecBatch& batch,
+ const std::vector<std::unique_ptr<KernelState>>& states) {
+ for (size_t i = 0; i < states.size(); ++i) {
+ KernelContext batch_ctx{plan()->exec_context()};
+ batch_ctx.SetState(states[i].get());
+ ExecBatch single_column_batch{{batch.values[i]}, batch.length};
+ RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
+ }
+ return Status::OK();
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto it =
+ thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
+ ++num_received_;
+ auto thread_index = it->second;
+
+ lock.unlock();
+
+ const auto& thread_local_state = states_[thread_index];
+ Status st = DoConsume(std::move(batch), thread_local_state);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ return;
+ }
+
+ lock.lock();
Review comment:
This lock could probably be removed. We might want to make a note to measure
this with microbenchmarks someday. Only one thread should be finishing anyway,
and the "what state blocks have we used" map could probably be a lock-free
structure.
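
As a standalone illustration of the lock-free idea (none of this is from the PR; `ClaimSlot` and the fixed thread count are made up): each worker claims a state slot once through an atomic counter instead of looking itself up in a mutex-guarded map keyed by `std::thread::id`.

```cpp
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<size_t> next_slot{0};

size_t ClaimSlot() {
  // fetch_add alone hands out unique indices; a real node would also cache the
  // claimed index per thread (e.g. in a thread_local) so it is only taken once.
  return next_slot.fetch_add(1, std::memory_order_relaxed);
}

int main() {
  constexpr size_t kThreads = 4;
  std::vector<size_t> slots(kThreads);
  std::vector<std::thread> workers;
  for (size_t i = 0; i < kThreads; ++i) {
    workers.emplace_back([&slots, i] { slots[i] = ClaimSlot(); });
  }
  for (auto& worker : workers) worker.join();
  for (size_t slot : slots) std::cout << slot << "\n";  // four distinct indices
  return 0;
}
```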
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
+ std::string label) {
+ struct Impl : RecordBatchReader {
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+ Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
+ if (batch) {
+ ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
+ } else {
+ *record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
+ }
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ std::shared_ptr<Schema> schema_;
+ Iterator<util::optional<ExecBatch>> iterator_;
+ };
+
+ auto out = std::make_shared<Impl>();
+ out->pool_ = input->plan()->exec_context()->memory_pool();
+ out->schema_ = input->output_schema();
+ out->iterator_ = MakeGeneratorIterator(MakeSinkNode(input, std::move(label)));
+ return out;
+}
+
+struct ScalarAggregateNode : ExecNode {
+ ScalarAggregateNode(ExecNode* input, std::string label,
+ std::shared_ptr<Schema> output_schema,
+ std::vector<const ScalarAggregateKernel*> kernels,
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_schema=*/std::move(output_schema),
+ /*num_outputs=*/1),
+ kernels_(std::move(kernels)),
+ states_(std::move(states)) {}
+
+ const char* kind_name() override { return "ScalarAggregateNode"; }
+
+ Status DoConsume(const ExecBatch& batch,
+ const std::vector<std::unique_ptr<KernelState>>& states) {
+ for (size_t i = 0; i < states.size(); ++i) {
+ KernelContext batch_ctx{plan()->exec_context()};
+ batch_ctx.SetState(states[i].get());
+ ExecBatch single_column_batch{{batch.values[i]}, batch.length};
+ RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
+ }
+ return Status::OK();
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto it =
+ thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
+ ++num_received_;
+ auto thread_index = it->second;
+
+ lock.unlock();
+
+ const auto& thread_local_state = states_[thread_index];
+ Status st = DoConsume(std::move(batch), thread_local_state);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ return;
+ }
+
+ lock.lock();
+ st = MaybeFinish(&lock);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int seq) override {
+ DCHECK_EQ(input, inputs_[0]);
+ std::unique_lock<std::mutex> lock(mutex_);
+ num_total_ = seq;
+ Status st = MaybeFinish(&lock);
+
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ Status StartProducing() override {
+ finished_ = Future<>::Make();
+ // Scalar aggregates will only output a single batch
+ outputs_[0]->InputFinished(this, 1);
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ inputs_[0]->StopProducing(this);
+ finished_.MarkFinished();
+ }
+
+ Future<> finished() override { return finished_; }
+
+ private:
+ Status MaybeFinish(std::unique_lock<std::mutex>* lock) {
+ if (num_received_ != num_total_) return Status::OK();
+
+ if (finished_.is_finished()) return Status::OK();
+
+ ExecBatch batch{{}, 1};
+ batch.values.resize(kernels_.size());
+
+ for (size_t i = 0; i < kernels_.size(); ++i) {
+ KernelContext ctx{plan()->exec_context()};
+ ctx.SetState(states_[0][i].get());
+
+ for (size_t thread_index = 1; thread_index < thread_indices_.size();
+ ++thread_index) {
+ RETURN_NOT_OK(
+ kernels_[i]->merge(&ctx, std::move(*states_[thread_index][i]), ctx.state()));
+ }
+ RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
+ }
+ lock->unlock();
+
+ outputs_[0]->InputReceived(this, 0, batch);
+
+ finished_.MarkFinished();
+ return Status::OK();
+ }
+
+ Future<> finished_ = Future<>::MakeFinished();
+ std::vector<const ScalarAggregateKernel*> kernels_;
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
+ std::unordered_map<std::thread::id, size_t> thread_indices_;
+ std::mutex mutex_;
+ int num_received_ = 0, num_total_;
+};
+
+Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
+ std::vector<internal::Aggregate> aggregates) {
+ if (input->output_schema()->num_fields() != static_cast<int>(aggregates.size())) {
+ return Status::Invalid("Provided ", aggregates.size(),
+ " aggregates, expected one for each field of ",
+ input->output_schema()->ToString());
+ }
+
+ auto exec_ctx = input->plan()->exec_context();
+
+ std::vector<std::shared_ptr<ScalarAggregateFunction>> functions(aggregates.size());
+ std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states(
+ exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1);
+ FieldVector fields(aggregates.size());
+
+ for (size_t i = 0; i < aggregates.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(auto function,
+ exec_ctx->func_registry()->GetFunction(aggregates[i].function));
+ if (function->kind() != Function::SCALAR_AGGREGATE) {
+ return Status::Invalid("Provided non ScalarAggregateFunction ",
+ aggregates[i].function);
+ }
+
+ functions[i] = checked_pointer_cast<ScalarAggregateFunction>(std::move(function));
+
+ auto in_type = ValueDescr::Array(input->output_schema()->fields()[i]->type());
+
+ ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, functions[i]->DispatchExact({in_type}));
+ kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
+
+ if (aggregates[i].options == nullptr) {
+ aggregates[i].options = functions[i]->default_options();
+ }
+
+ KernelContext kernel_ctx{exec_ctx};
+ for (auto& thread_local_states : states) {
+ thread_local_states.resize(kernels.size());
+ ARROW_ASSIGN_OR_RAISE(
+ thread_local_states[i],
+ kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i],
Review comment:
Nit: minor thing, but could this maybe be a helper function on the kernel,
e.g. `init_all`, which takes a vector of states and runs init on each one?
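
Roughly what I have in mind (sketch only; `InitAll` is a made-up name, not an existing kernel API, and it assumes each per-thread vector is already sized to the number of kernels):

```cpp
// Hypothetical helper: run init once per thread-local state vector for the
// i-th kernel, so the loop in MakeScalarAggregateNode stays flat.
Status InitAll(const ScalarAggregateKernel* kernel, KernelContext* ctx,
               const KernelInitArgs& args, size_t i,
               std::vector<std::vector<std::unique_ptr<KernelState>>>* states) {
  for (auto& thread_local_states : *states) {
    // One independent state per thread; they are merged later in MaybeFinish().
    ARROW_ASSIGN_OR_RAISE(thread_local_states[i], kernel->init(ctx, args));
  }
  return Status::OK();
}
```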
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
+ std::string label) {
+ struct Impl : RecordBatchReader {
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+ Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
+ if (batch) {
+ ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
+ } else {
+ *record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
+ }
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ std::shared_ptr<Schema> schema_;
+ Iterator<util::optional<ExecBatch>> iterator_;
+ };
+
+ auto out = std::make_shared<Impl>();
+ out->pool_ = input->plan()->exec_context()->memory_pool();
+ out->schema_ = input->output_schema();
+ out->iterator_ = MakeGeneratorIterator(MakeSinkNode(input, std::move(label)));
+ return out;
+}
+
+struct ScalarAggregateNode : ExecNode {
+ ScalarAggregateNode(ExecNode* input, std::string label,
+ std::shared_ptr<Schema> output_schema,
+ std::vector<const ScalarAggregateKernel*> kernels,
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_schema=*/std::move(output_schema),
+ /*num_outputs=*/1),
+ kernels_(std::move(kernels)),
+ states_(std::move(states)) {}
+
+ const char* kind_name() override { return "ScalarAggregateNode"; }
+
+ Status DoConsume(const ExecBatch& batch,
+ const std::vector<std::unique_ptr<KernelState>>& states) {
+ for (size_t i = 0; i < states.size(); ++i) {
+ KernelContext batch_ctx{plan()->exec_context()};
+ batch_ctx.SetState(states[i].get());
+ ExecBatch single_column_batch{{batch.values[i]}, batch.length};
+ RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
+ }
+ return Status::OK();
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto it =
+ thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
+ ++num_received_;
+ auto thread_index = it->second;
+
+ lock.unlock();
+
+ const auto& thread_local_state = states_[thread_index];
+ Status st = DoConsume(std::move(batch), thread_local_state);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ return;
+ }
+
+ lock.lock();
+ st = MaybeFinish(&lock);
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int seq) override {
+ DCHECK_EQ(input, inputs_[0]);
+ std::unique_lock<std::mutex> lock(mutex_);
+ num_total_ = seq;
+ Status st = MaybeFinish(&lock);
+
+ if (!st.ok()) {
+ outputs_[0]->ErrorReceived(this, std::move(st));
+ }
+ }
+
+ Status StartProducing() override {
+ finished_ = Future<>::Make();
+ // Scalar aggregates will only output a single batch
+ outputs_[0]->InputFinished(this, 1);
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ inputs_[0]->StopProducing(this);
+ finished_.MarkFinished();
+ }
+
+ Future<> finished() override { return finished_; }
+
+ private:
+ Status MaybeFinish(std::unique_lock<std::mutex>* lock) {
+ if (num_received_ != num_total_) return Status::OK();
+
+ if (finished_.is_finished()) return Status::OK();
+
+ ExecBatch batch{{}, 1};
+ batch.values.resize(kernels_.size());
+
+ for (size_t i = 0; i < kernels_.size(); ++i) {
Review comment:
Maybe someday we could merge each kernel's states on its own thread, but that
can wait for a future PR.
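
Sketching the idea just for the record (illustrative only; `MergeAllParallel` is made up, and a real version would go through the plan's executor rather than `std::async`):

```cpp
// Merge each kernel's per-thread states on its own thread.  Requires <future>;
// the loop body mirrors the merge loop in MaybeFinish above, with finalize
// still running afterwards on the caller's thread.
Status MergeAllParallel() {
  std::vector<std::future<Status>> merges;
  merges.reserve(kernels_.size());
  for (size_t i = 0; i < kernels_.size(); ++i) {
    merges.push_back(std::async(std::launch::async, [this, i]() -> Status {
      KernelContext ctx{plan()->exec_context()};
      ctx.SetState(states_[0][i].get());
      for (size_t t = 1; t < thread_indices_.size(); ++t) {
        RETURN_NOT_OK(
            kernels_[i]->merge(&ctx, std::move(*states_[t][i]), ctx.state()));
      }
      return Status::OK();
    }));
  }
  for (auto& merge : merges) {
    RETURN_NOT_OK(merge.get());
  }
  return Status::OK();
}
```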
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
Review comment:
Very minor thought, but having utilities for Iterator<RB> + schema => RBR and
Generator<RB> + schema => RBR could be useful, and then it would be clearer
that the main goal here is the ExecBatch -> RecordBatch conversion.
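
Something along these lines for the Iterator flavor (sketch; `MakeIteratorReader` is a made-up name, not an existing helper):

```cpp
// Wrap an Iterator of RecordBatches plus a schema into a RecordBatchReader.
std::shared_ptr<RecordBatchReader> MakeIteratorReader(
    std::shared_ptr<Schema> schema,
    Iterator<std::shared_ptr<RecordBatch>> batches) {
  struct Impl : RecordBatchReader {
    std::shared_ptr<Schema> schema() const override { return schema_; }
    Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
      // Iterator end yields a null RecordBatch, which is also how a
      // RecordBatchReader signals end-of-stream.
      ARROW_ASSIGN_OR_RAISE(*out, batches_.Next());
      return Status::OK();
    }
    std::shared_ptr<Schema> schema_;
    Iterator<std::shared_ptr<RecordBatch>> batches_;
  };
  auto reader = std::make_shared<Impl>();
  reader->schema_ = std::move(schema);
  reader->batches_ = std::move(batches);
  return reader;
}
```

MakeSinkNodeReader would then only be responsible for mapping ExecBatch to RecordBatch before handing the iterator off.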
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]