lidavidm commented on a change in pull request #10397: URL: https://github.com/apache/arrow/pull/10397#discussion_r640912315
########## File path: cpp/src/arrow/compute/exec/exec_plan.cc ########## @@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } -ExecNode::ExecNode(ExecPlan* plan, std::string label, - std::vector<BatchDescr> input_descrs, +ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs, std::vector<std::string> input_labels, BatchDescr output_descr, int num_outputs) : plan_(plan), label_(std::move(label)), - input_descrs_(std::move(input_descrs)), + inputs_(std::move(inputs)), input_labels_(std::move(input_labels)), output_descr_(std::move(output_descr)), - num_outputs_(num_outputs) {} + num_outputs_(num_outputs) { + for (auto input : inputs_) { + input->outputs_.push_back(this); + } +} Status ExecNode::Validate() const { - if (inputs_.size() != input_descrs_.size()) { + if (inputs_.size() != input_labels_.size()) { return Status::Invalid("Invalid number of inputs for '", label(), "' (expected ", - num_inputs(), ", actual ", inputs_.size(), ")"); + num_inputs(), ", actual ", input_labels_.size(), ")"); } if (static_cast<int>(outputs_.size()) != num_outputs_) { return Status::Invalid("Invalid number of outputs for '", label(), "' (expected ", num_outputs(), ", actual ", outputs_.size(), ")"); } - DCHECK_EQ(input_descrs_.size(), input_labels_.size()); - for (auto out : outputs_) { auto input_index = GetNodeIndex(out->inputs(), this); if (!input_index) { return Status::Invalid("Node '", label(), "' outputs to node '", out->label(), "' but is not listed as an input."); } + } - const auto& in_descr = out->input_descrs_[*input_index]; - if (in_descr != output_descr_) { - return Status::Invalid( - "Node '", label(), "' (bound to input ", input_labels_[*input_index], - ") produces batches with type '", ValueDescr::ToString(output_descr_), - "' inconsistent with consumer '", out->label(), "' which accepts '", - ValueDescr::ToString(in_descr), "'"); + return Status::OK(); +} + +struct SourceNode : ExecNode { + SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr output_descr, + AsyncGenerator<util::optional<ExecBatch>> generator) + : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr), + /*num_outputs=*/1), + generator_(std::move(generator)) {} + + const char* kind_name() override { return "SourceNode"; } + + static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; } + void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); } + void ErrorReceived(ExecNode*, Status) override { NoInputs(); } + void InputFinished(ExecNode*, int) override { NoInputs(); } + + Status StartProducing() override { + if (finished_) { + return Status::Invalid("Restarted SourceNode '", label(), "'"); } + + auto gen = std::move(generator_); + + /// XXX should we wait on this future anywhere? In StopProducing() maybe? + auto done_fut = + Loop([gen, this] { + std::unique_lock<std::mutex> lock(mutex_); + int seq = next_batch_index_++; + if (finished_) { + return Future<ControlFlow<int>>::MakeFinished(Break(seq)); + } + lock.unlock(); + + return gen().Then( + [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> { + std::unique_lock<std::mutex> lock(mutex_); + if (!batch || finished_) { + finished_ = true; + return Break(seq); + } + lock.unlock(); + + outputs_[0]->InputReceived(this, seq, *batch); + return Continue(); + }, + [=](const Status& error) -> ControlFlow<int> { + std::unique_lock<std::mutex> lock(mutex_); + if (!finished_) { + finished_ = true; + lock.unlock(); + // unless we were already finished, push the error to our output + // XXX is this correct? Is it reasonable for a consumer to ignore errors + // from a finished producer? Review comment: Ah, I see. I think I have the same inclination then; except for maybe a sink node that's already gotten all its results, in which case subsequent errors are probably irrelevant, propagating errors even when otherwise 'finished' makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org