lidavidm commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r640912315



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return 
ToDerived(this)->Validate(); }
 
 Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
 
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
-                   std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
                    std::vector<std::string> input_labels, BatchDescr 
output_descr,
                    int num_outputs)
     : plan_(plan),
       label_(std::move(label)),
-      input_descrs_(std::move(input_descrs)),
+      inputs_(std::move(inputs)),
       input_labels_(std::move(input_labels)),
       output_descr_(std::move(output_descr)),
-      num_outputs_(num_outputs) {}
+      num_outputs_(num_outputs) {
+  for (auto input : inputs_) {
+    input->outputs_.push_back(this);
+  }
+}
 
 Status ExecNode::Validate() const {
-  if (inputs_.size() != input_descrs_.size()) {
+  if (inputs_.size() != input_labels_.size()) {
     return Status::Invalid("Invalid number of inputs for '", label(), "' 
(expected ",
-                           num_inputs(), ", actual ", inputs_.size(), ")");
+                           num_inputs(), ", actual ", input_labels_.size(), 
")");
   }
 
   if (static_cast<int>(outputs_.size()) != num_outputs_) {
     return Status::Invalid("Invalid number of outputs for '", label(), "' 
(expected ",
                            num_outputs(), ", actual ", outputs_.size(), ")");
   }
 
-  DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
   for (auto out : outputs_) {
     auto input_index = GetNodeIndex(out->inputs(), this);
     if (!input_index) {
       return Status::Invalid("Node '", label(), "' outputs to node '", 
out->label(),
                              "' but is not listed as an input.");
     }
+  }
 
-    const auto& in_descr = out->input_descrs_[*input_index];
-    if (in_descr != output_descr_) {
-      return Status::Invalid(
-          "Node '", label(), "' (bound to input ", input_labels_[*input_index],
-          ") produces batches with type '", 
ValueDescr::ToString(output_descr_),
-          "' inconsistent with consumer '", out->label(), "' which accepts '",
-          ValueDescr::ToString(in_descr), "'");
+  return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+  SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr 
output_descr,
+             AsyncGenerator<util::optional<ExecBatch>> generator)
+      : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+                 /*num_outputs=*/1),
+        generator_(std::move(generator)) {}
+
+  const char* kind_name() override { return "SourceNode"; }
+
+  static void NoInputs() { DCHECK(false) << "no inputs; this should never be 
called"; }
+  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+  Status StartProducing() override {
+    if (finished_) {
+      return Status::Invalid("Restarted SourceNode '", label(), "'");
     }
+
+    auto gen = std::move(generator_);
+
+    /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+    auto done_fut =
+        Loop([gen, this] {
+          std::unique_lock<std::mutex> lock(mutex_);
+          int seq = next_batch_index_++;
+          if (finished_) {
+            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+          }
+          lock.unlock();
+
+          return gen().Then(
+              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+                std::unique_lock<std::mutex> lock(mutex_);
+                if (!batch || finished_) {
+                  finished_ = true;
+                  return Break(seq);
+                }
+                lock.unlock();
+
+                outputs_[0]->InputReceived(this, seq, *batch);
+                return Continue();
+              },
+              [=](const Status& error) -> ControlFlow<int> {
+                std::unique_lock<std::mutex> lock(mutex_);
+                if (!finished_) {
+                  finished_ = true;
+                  lock.unlock();
+                  // unless we were already finished, push the error to our 
output
+                  // XXX is this correct? Is it reasonable for a consumer to 
ignore errors
+                  // from a finished producer?

Review comment:
       Ah, I see. I think I have the same inclination, then: propagating 
errors even when the producer is otherwise 'finished' makes sense. The one 
possible exception is a sink node that has already received all of its 
results, in which case subsequent errors are probably irrelevant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to