bkietz commented on a change in pull request #10705:
URL: https://github.com/apache/arrow/pull/10705#discussion_r668873010



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -601,5 +618,215 @@ AsyncGenerator<util::optional<ExecBatch>> 
MakeSinkNode(ExecNode* input,
   return out;
 }
 
+std::shared_ptr<RecordBatchReader> MakeSinkNodeReader(ExecNode* input,
+                                                      std::string label) {
+  struct Impl : RecordBatchReader {
+    std::shared_ptr<Schema> schema() const override { return schema_; }
+    Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+      ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
+      if (batch) {
+        ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, 
pool_));
+      } else {
+        *record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
+      }
+      return Status::OK();
+    }
+
+    MemoryPool* pool_;
+    std::shared_ptr<Schema> schema_;
+    Iterator<util::optional<ExecBatch>> iterator_;
+  };
+
+  auto out = std::make_shared<Impl>();
+  out->pool_ = input->plan()->exec_context()->memory_pool();
+  out->schema_ = input->output_schema();
+  out->iterator_ = MakeGeneratorIterator(MakeSinkNode(input, 
std::move(label)));
+  return out;
+}
+
+struct ScalarAggregateNode : ExecNode {
+  ScalarAggregateNode(ExecNode* input, std::string label,
+                      std::shared_ptr<Schema> output_schema,
+                      std::vector<const ScalarAggregateKernel*> kernels,
+                      std::vector<std::vector<std::unique_ptr<KernelState>>> 
states)
+      : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+                 /*output_schema=*/std::move(output_schema),
+                 /*num_outputs=*/1),
+        kernels_(std::move(kernels)),
+        states_(std::move(states)) {}
+
+  const char* kind_name() override { return "ScalarAggregateNode"; }
+
+  Status DoConsume(const ExecBatch& batch,
+                   const std::vector<std::unique_ptr<KernelState>>& states) {
+    for (size_t i = 0; i < states.size(); ++i) {
+      KernelContext batch_ctx{plan()->exec_context()};
+      batch_ctx.SetState(states[i].get());
+      ExecBatch single_column_batch{{batch.values[i]}, batch.length};
+      RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
+    }
+    return Status::OK();
+  }
+
+  void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {

Review comment:
       `seq` is not an indication of order, it's only a tag in the range `[0, 
seq_stop)` (where `seq_stop` is set by `InputFinished`) so we could not use it 
to order results.
   
   As specified in ARROW-12710, the `KernelState` of the string concat agg 
kernel will need to include ordering criteria so that `merge(move(state1), 
&state0)` can be guaranteed equivalent to `merge(move(state0), &state1)`. 
Furthermore, `merge` cannot actually concatenate anything because if we 
happened to first `merge(move(state0), &state3)` we'd have no way to insert 
`state1, state2` in the middle later. Actual concatenation would have to wait 
for `finalize`. 
   
   Those ordering criteria could be synthesized from (for example) 
fragment/batch index information, but the presence of `O(N)` state in a scalar 
agg kernel's State is suspect to me and I'm not sure it's a great match for 
ScalarAggregateKernel.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to