westonpace commented on a change in pull request #9742: URL: https://github.com/apache/arrow/pull/9742#discussion_r598487916
########## File path: cpp/src/arrow/engine/exec_plan.cc ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/engine/exec_plan.h" + +#include "arrow/compute/exec.h" +#include "arrow/engine/query_plan.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { + +using internal::checked_cast; + +namespace engine { + +ExecNode::InputBatch::InputBatch(std::vector<util::optional<compute::ExecBatch>> batches) + : batches(std::move(batches)), ready_fut(Future<>::Make()) {} + +ExecNode::ExecNode(ExecPlan* plan, const QueryNode* query_node) + : plan_(plan), + query_node_(query_node), + num_inputs_(query_node->num_inputs()), + finished_fut_(Future<>::Make()) {} + +ExecNode::~ExecNode() = default; + +void ExecNode::ReserveBatches(int32_t num_batches) { + // Should be called with mutex locked + if (static_cast<size_t>(num_batches) > input_batches_.size()) { + input_batches_.resize(num_batches); + } +} + +ExecNode::InputBatch* ExecNode::EnsureBatch(int32_t batch_index) { + // Should be called with mutex locked + if 
(input_batches_[batch_index] == nullptr) { + input_batches_[batch_index].reset( + new InputBatch{std::vector<util::optional<compute::ExecBatch>>(num_inputs_)}); + } + return input_batches_[batch_index].get(); +} + +Status ExecNode::InputReceived(int32_t input_index, int32_t batch_index, + compute::ExecBatch batch) { + auto lock = mutex_.Lock(); + + if (input_index >= num_inputs_) { + return Status::Invalid("Invalid input index"); + } + if (finish_at_ >= 0 && batch_index >= finish_at_) { + return Status::Invalid("Input batch index out of bounds"); + } + + ReserveBatches(batch_index + 1); + auto* input_batch = EnsureBatch(batch_index); + + if (input_batch->batches[input_index].has_value()) { + return Status::Invalid("Batch #", batch_index, " for input #", input_index, + " already received"); + } + input_batch->batches[input_index] = std::move(batch); + if (++input_batch->num_ready == num_inputs_) { + input_batch->ready_fut.MarkFinished(); + } + return Status::OK(); +} + +Status ExecNode::InputFinished(int32_t num_batches) { + auto lock = mutex_.Lock(); + + if (finish_at_ >= 0) { + return Status::Invalid("InputFinished already called"); + } + finish_at_ = num_batches; + ReserveBatches(num_batches); + + std::vector<Future<>> batch_futures; + for (int32_t i = 0; i < num_batches; ++i) { + auto* input_batch = EnsureBatch(i); + batch_futures[i] = input_batch->ready_fut; + } + + // TODO lifetime + AllComplete(std::move(batch_futures)) + .AddCallback([this](const Result<detail::Empty>& res) { + finished_fut_.MarkFinished(res.status()); + }); + return Status::OK(); +} + +Future<> ExecNode::RunAsync(int32_t batch_index, internal::Executor* executor) { + auto lock = mutex_.Lock(); + + ReserveBatches(batch_index + 1); + auto* input_batch = EnsureBatch(batch_index); + + // TODO lifetime (take strong ref to ExecPlan?) + return executor->Transfer(input_batch->ready_fut) Review comment: Would different nodes run on different executors? 
If not it might be simpler to avoid the transfer. Also, do I have to call RunAsync on every node for every batch index? Couldn't calling RunAsync on the root node(s?) be enough? ########## File path: cpp/src/arrow/engine/query_plan.h ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/compute/type_fwd.h" +#include "arrow/type_fwd.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace engine { + +class ExecPlan; +class QueryContext; +class QueryNode; + +class ARROW_EXPORT QueryPlan { Review comment: What is the relationship between a query plan and an execution plan? My current understanding is that a query plan is an AST of the query and the execution plan is a possibly optimized tree of workers. Is this correct? ########## File path: cpp/src/arrow/engine/exec_plan.h ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/compute/type_fwd.h" +#include "arrow/type_fwd.h" +#include "arrow/util/future.h" +#include "arrow/util/macros.h" +#include "arrow/util/mutex.h" +#include "arrow/util/optional.h" +#include "arrow/util/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace engine { + +class ExecNode; +class QueryNode; +class QueryPlan; + +class ARROW_EXPORT ExecPlan { + public: Review comment: How do I actually run this? There doesn't appear to be any public interface. ########## File path: cpp/src/arrow/engine/exec_plan.h ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/compute/type_fwd.h" +#include "arrow/type_fwd.h" +#include "arrow/util/future.h" +#include "arrow/util/macros.h" +#include "arrow/util/mutex.h" +#include "arrow/util/optional.h" +#include "arrow/util/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace engine { + +class ExecNode; +class QueryNode; +class QueryPlan; + +class ARROW_EXPORT ExecPlan { + public: + using NodeVector = std::vector<ExecNode*>; + + virtual ~ExecPlan() = default; + + /// The query plan this ExecPlan is an instance of + const QueryPlan& query_plan() const { return *query_plan_; } + + compute::ExecContext* context() { return context_; } + + protected: + friend class QueryPlan; Review comment: Shouldn't this be the other way around? The QueryPlan doesn't have any reference to the ExecPlan (and if my understanding of the relationship is correct it shouldn't). What value is there in this declaration? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
