felipeblazing commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r713942122



##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -99,11 +105,35 @@ class FilterNode : public ExecNode {
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    auto maybe_filtered = DoFilter(std::move(batch));
-    if (ErrorIfNotOk(maybe_filtered.status())) return;
+    ARROW_LOG(DEBUG) << "FilterNode: >> input";
+
+    auto executor = plan()->exec_context()->executor();
+    if (executor) {
+      auto maybe_future = executor->Submit([this, batch] {
+        ARROW_LOG(DEBUG) << "FilterNode: >> filter ready: " + batch.ToString();
+
+        auto maybe_filtered = DoFilter(std::move(batch));
+        if (ErrorIfNotOk(maybe_filtered.status())) Status::Invalid("not ok");
+
+        maybe_filtered->guarantee = batch.guarantee;
+        this->outputs_[0]->InputReceived(this, 
maybe_filtered.MoveValueUnsafe());

Review comment:
       This feels like you would struggle to assure thread affinity as you went 
from task to task here.

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -99,11 +105,35 @@ class FilterNode : public ExecNode {
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    auto maybe_filtered = DoFilter(std::move(batch));
-    if (ErrorIfNotOk(maybe_filtered.status())) return;
+    ARROW_LOG(DEBUG) << "FilterNode: >> input";
+
+    auto executor = plan()->exec_context()->executor();
+    if (executor) {
+      auto maybe_future = executor->Submit([this, batch] {
+        ARROW_LOG(DEBUG) << "FilterNode: >> filter ready: " + batch.ToString();
+
+        auto maybe_filtered = DoFilter(std::move(batch));
+        if (ErrorIfNotOk(maybe_filtered.status())) Status::Invalid("not ok");
+
+        maybe_filtered->guarantee = batch.guarantee;
+        this->outputs_[0]->InputReceived(this, 
maybe_filtered.MoveValueUnsafe());

Review comment:
       The other proposal was to have the task build up and input_received 
receive a task with the first node in some plan receiving a task that is 
something akin to, my task is giving you this data. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to