westonpace commented on a change in pull request #11923:
URL: https://github.com/apache/arrow/pull/11923#discussion_r771065253



##########
File path: docs/source/cpp/streaming_execution.rst
##########
@@ -95,8 +95,14 @@ through unchanged::
     class PassthruNode : public ExecNode {
      public:
       // InputReceived is the main entry point for ExecNodes. It is invoked
-      // by an input of this node to push a batch here for processing.
-      void InputReceived(ExecNode* input, ExecBatch batch) override {
+      // by an input of this node to push a task here for processing.
+      // For non-terminating nodes (e.g. filter/project/etc.): the node can 
wrap
+      // its own work with the task (using function composition/fusing) and 
then
+      // call InputReceived on the downstream node.
+      // A "terminating node" (e.g. sink node / pipeline breaker) could then 
submit
+      // the task to a scheduler.
+      void InputReceived(ExecNode* input,
+                         std::function<Result<ExecBatch>()> task) override {

Review comment:
       ```suggestion
         // by an input of this node to push a task here for processing.
         // Non-terminating nodes (e.g. filter/project/etc.): should wrap
         // their own work with the task (using function composition/fusing) and then
         // call InputReceived on the downstream node.
         // Terminating nodes (e.g. sink node / pipeline breaker) should submit
         // the task to an executor or task group.
         void InputReceived(ExecNode* input,
                            std::function<Result<ExecBatch>()> task) override {
   ```
   Some minor wording changes, plus removing the term "scheduler" since one doesn't exist yet.

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -92,10 +92,13 @@ class FilterNode : public MapNode {
     return ExecBatch::Make(std::move(values));
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     DCHECK_EQ(input, inputs_[0]);
-    auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); };
-    this->SubmitTask(std::move(func), std::move(batch));
+    auto func = [this, task]() -> Result<ExecBatch> {
+      ARROW_ASSIGN_OR_RAISE(auto batch, task());
+      return DoFilter(std::move(batch));
+    };
+    this->SubmitTask(std::move(func));

Review comment:
       // This node should be forwarding a fused task but that will be handled in ARROW-XYZ

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -107,19 +109,19 @@ struct SourceNode : ExecNode {
                 ExecBatch batch = std::move(*maybe_batch);
 
                 if (executor) {
-                  auto status =
-                      task_group_.AddTask([this, executor, batch]() -> 
Result<Future<>> {
-                        return executor->Submit([=]() {
-                          outputs_[0]->InputReceived(this, std::move(batch));
-                          return Status::OK();
-                        });
-                      });
+                  auto status = task_group_.AddTask([this, executor,
+                                                     batch]() -> 
Result<Future<>> {
+                    return executor->Submit([=]() {
+                      outputs_[0]->InputReceived(this, 
IdentityTask(std::move(batch)));
+                      return Status::OK();
+                    });
+                  });
                   if (!status.ok()) {
                     outputs_[0]->ErrorReceived(this, std::move(status));
                     return Break(total_batches);
                   }
                 } else {
-                  outputs_[0]->InputReceived(this, std::move(batch));
+                  outputs_[0]->InputReceived(this, 
IdentityTask(std::move(batch)));

Review comment:
       So what will this eventually look like?  If we assume we don't know how 
many batches a scanner will emit then how many "scan tasks" do we submit 
individually?  I suppose we can always "over-submit" and then the final tasks 
will just abandon themselves if the scanner is finished.  Could this be another 
spot for backpressure?  I don't think we have to solve all of these problems 
right now.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -188,7 +193,12 @@ class ConsumingSinkNode : public ExecNode {
       return;
     }
 
-    Status consumption_status = consumer_->Consume(std::move(batch));
+    auto batch = task();
+    if (!batch.ok()) {
+      ErrorIfNotOk(batch.status());
+      return;
+    }
+    Status consumption_status = consumer_->Consume(batch.MoveValueUnsafe());

Review comment:
       // This node should be submitting...

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -274,11 +284,15 @@ struct OrderBySinkNode final : public SinkNode {
                                               sink_options.backpressure);
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     DCHECK_EQ(input, inputs_[0]);
-
-    auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-                                           
plan()->exec_context()->memory_pool());
+    auto batch = task();
+    if (!batch.ok()) {
+      ErrorIfNotOk(batch.status());
+      return;
+    }
+    auto maybe_batch = batch.ValueUnsafe().ToRecordBatch(
+        inputs_[0]->output_schema(), plan()->exec_context()->memory_pool());

Review comment:
       // This node should be submitting a task...

##########
File path: cpp/src/arrow/compute/exec/project_node.cc
##########
@@ -88,10 +88,13 @@ class ProjectNode : public MapNode {
     return ExecBatch{std::move(values), target.length};
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     DCHECK_EQ(input, inputs_[0]);
-    auto func = [this](ExecBatch batch) { return DoProject(std::move(batch)); 
};
-    this->SubmitTask(std::move(func), std::move(batch));
+    auto func = [this, task]() -> Result<ExecBatch> {
+      ARROW_ASSIGN_OR_RAISE(auto batch, task());
+      return DoProject(std::move(batch));
+    };
+    this->SubmitTask(std::move(func));

Review comment:
       // This node should be forwarding a task and not submitting a task and that will be handled in ARROW-XYZ

##########
File path: cpp/src/arrow/compute/exec/hash_join_node.cc
##########
@@ -494,7 +494,13 @@ class HashJoinNode : public ExecNode {
     size_t thread_index = thread_indexer_();
     int side = (input == inputs_[0]) ? 0 : 1;
     {
-      Status status = impl_->InputReceived(thread_index, side, 
std::move(batch));
+      auto batch = task();
+      if (!batch.ok()) {
+        StopProducing();
+        ErrorIfNotOk(batch.status());
+        return;
+      }
+      Status status = impl_->InputReceived(thread_index, side, 
batch.MoveValueUnsafe());

Review comment:
       // This node should be submitting a task to an executor / scheduler but that will be handled in ARROW-XYZ

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -95,10 +95,15 @@ class SinkNode : public ExecNode {
 
   Future<> finished() override { return finished_; }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     DCHECK_EQ(input, inputs_[0]);
 
-    bool did_push = producer_.Push(std::move(batch));
+    auto batch = task();
+    if (!batch.ok()) {
+      ErrorIfNotOk(batch.status());
+      return;
+    }
+    bool did_push = producer_.Push(batch.MoveValueUnsafe());

Review comment:
       // This node should be submitting...

##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -175,18 +175,21 @@ class ScalarAggregateNode : public ExecNode {
     return Status::OK();
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     DCHECK_EQ(input, inputs_[0]);
 
     auto thread_index = get_thread_index_();
-
-    if (ErrorIfNotOk(DoConsume(std::move(batch), thread_index))) return;
+    auto prev = task();
+    if (!prev.ok()) {
+      ErrorIfNotOk(prev.status());
+      return;
+    }
+    if (ErrorIfNotOk(DoConsume(prev.MoveValueUnsafe(), thread_index))) return;

Review comment:
       If we aren't going to address this now, let's create another JIRA
   (taskify 3?) with a title something like "Fix logic in existing nodes so
   that pipeline breakers submit and non-breakers forward", and then add a
   comment in all of these spots along the lines of...
   
   // This node should be forwarding the task downstream but that will be addressed in ARROW-XYZ

##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -483,13 +491,18 @@ class GroupByNode : public ExecNode {
     return Status::OK();
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
+  void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task) 
override {
     // bail if StopProducing was called
     if (finished_.is_finished()) return;
 
     DCHECK_EQ(input, inputs_[0]);
 
-    if (ErrorIfNotOk(Consume(std::move(batch)))) return;
+    auto prev = task();
+    if (!prev.ok()) {
+      ErrorIfNotOk(prev.status());
+      return;
+    }
+    if (ErrorIfNotOk(Consume(prev.MoveValueUnsafe()))) return;

Review comment:
       // This node should be submitting the task to an executor but that will be addressed in ARROW-XYZ

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -350,37 +350,38 @@ void MapNode::StopProducing() {
 
 Future<> MapNode::finished() { return finished_; }
 
-void MapNode::SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
-                         ExecBatch batch) {
+void MapNode::SubmitTask(std::function<Result<ExecBatch>()> map_fn) {

Review comment:
       Actually, thinking on this more today I think we might still want it (we 
can do this in a future PR if desired).  It would basically look like...
   
   ```
   virtual Result<ExecBatch> MapNode::Map(ExecBatch batch) = 0;
   
   void MapNode::InputReceived(ExecNode* input,
                               std::function<Result<ExecBatch>()> task) override {
       auto fused = [this, task]() -> Result<ExecBatch> {
           ARROW_ASSIGN_OR_RAISE(auto batch, task());
           return Map(std::move(batch));
       };
       outputs_[0]->InputReceived(this, std::move(fused));
   }
   ```
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to