zanmato1984 commented on code in PR #45918:
URL: https://github.com/apache/arrow/pull/45918#discussion_r2011932484
##########
cpp/src/arrow/acero/swiss_join.cc:
##########
@@ -2414,12 +2438,20 @@ Status JoinProbeProcessor::OnFinished() {
// Flush all instances of materialize that have non-zero accumulated output
// rows.
//
- for (size_t i = 0; i < materialize_.size(); ++i) {
- JoinResultMaterialize& materialize = *materialize_[i];
- RETURN_NOT_OK(materialize.Flush(
[&](ExecBatch batch) { return output_batch_fn_(i, std::move(batch)); }));
+ if (!is_parallel_) {
Review Comment:
We don't need to keep a separate serial execution code path. The "task group"
abstraction is independent of the underlying threading model, so the same code
also works in a non-parallel setup.
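
A minimal sketch of why a separate serial branch is unnecessary. It does not use Acero's actual scheduler API: `TaskGroup`, `Executor`, `RunSerial`, `RunParallel` and `FlushAll` are hypothetical names made up for illustration. The point is only that the flush logic is written once as a task body and the executor decides how it runs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a task group: a task body indexed by task id,
// plus a task count. It knows nothing about how the tasks get executed.
struct TaskGroup {
  std::function<void(std::size_t task_id)> task;
  std::size_t num_tasks;
};

// An executor is anything that runs all tasks of a group to completion.
using Executor = std::function<void(const TaskGroup&)>;

// Serial executor: runs the tasks one after another on the calling thread.
inline void RunSerial(const TaskGroup& group) {
  for (std::size_t i = 0; i < group.num_tasks; ++i) group.task(i);
}

// Parallel executor: one thread per task (acceptable for a sketch only).
inline void RunParallel(const TaskGroup& group) {
  std::vector<std::thread> threads;
  threads.reserve(group.num_tasks);
  for (std::size_t i = 0; i < group.num_tasks; ++i) {
    threads.emplace_back([&group, i] { group.task(i); });
  }
  for (auto& t : threads) t.join();
}

// Flushes every per-thread accumulator through the same task group,
// whichever executor is supplied. Each task writes only to its own slot,
// so no synchronization is needed. Returns the flushed row counts.
inline std::vector<int> FlushAll(const std::vector<int>& pending_rows,
                                 const Executor& executor) {
  assert(executor && "an executor is required");
  std::vector<int> flushed(pending_rows.size(), 0);
  TaskGroup group{[&](std::size_t i) { flushed[i] = pending_rows[i]; },
                  pending_rows.size()};
  executor(group);
  return flushed;
}
```

Calling `FlushAll(pending, RunSerial)` and `FlushAll(pending, RunParallel)` goes through the same task body, which is why an `if (!is_parallel_)` branch should not be needed.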
##########
cpp/src/arrow/acero/swiss_join.cc:
##########
@@ -2240,6 +2241,29 @@ void JoinProbeProcessor::Init(int num_key_columns, JoinType join_type,
}
cmp_ = cmp;
output_batch_fn_ = output_batch_fn;
+
Review Comment:
Could you move the "flush task" logic into `SwissJoin` instead of keeping it
here? `SwissJoin` is a more centralized place for this kind of control flow and
already has all the context you'll need (e.g. `materialize_`). You can use
`PartitionTask`/`BuildTask`/`MergeTask`/`ScanTask` in `SwissJoin` as references.
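
A rough sketch of the suggested shape, under loud assumptions: `MiniJoin`, `Materializer`, `FlushTask` and `CountFlushedRows` are hypothetical names, not the real `SwissJoin` members, and `bool` stands in for `Status`. It only illustrates the join object owning the materializers and exposing one flush task per materializer, alongside its other tasks.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical per-thread accumulator standing in for JoinResultMaterialize.
class Materializer {
 public:
  void Append(int row) { rows_.push_back(row); }

  // Hands any accumulated rows to `emit` and clears the buffer.
  // Returns false if `emit` reports a failure.
  bool Flush(const std::function<bool(std::vector<int>)>& emit) {
    if (rows_.empty()) return true;  // nothing accumulated, nothing to do
    std::vector<int> batch;
    batch.swap(rows_);
    return emit(std::move(batch));
  }

 private:
  std::vector<int> rows_;
};

// Hypothetical join object that owns both the materializers and the
// control flow for flushing them.
class MiniJoin {
 public:
  using OutputFn = std::function<bool(std::size_t thread_id, std::vector<int>)>;

  MiniJoin(std::size_t num_threads, OutputFn output)
      : materialize_(num_threads), output_(std::move(output)) {}

  void Probe(std::size_t thread_id, int row) {
    assert(thread_id < materialize_.size());
    materialize_[thread_id].Append(row);
  }

  std::size_t num_flush_tasks() const { return materialize_.size(); }

  // One flush task per materializer; a scheduler can run these in any order.
  bool FlushTask(std::size_t task_id) {
    return materialize_[task_id].Flush([&](std::vector<int> batch) {
      return output_(task_id, std::move(batch));
    });
  }

 private:
  std::vector<Materializer> materialize_;
  OutputFn output_;
};

// Demonstration helper: probes a few rows, runs every flush task,
// and returns how many rows reached the output callback.
inline std::size_t CountFlushedRows() {
  std::size_t total = 0;
  MiniJoin join(2, [&](std::size_t, std::vector<int> batch) {
    total += batch.size();
    return true;
  });
  join.Probe(0, 10);
  join.Probe(0, 11);
  join.Probe(1, 20);
  for (std::size_t i = 0; i < join.num_flush_tasks(); ++i) {
    if (!join.FlushTask(i)) return 0;
  }
  return total;
}
```

With this layout the probe processor only appends rows, and the decision of when and how to flush lives next to the other task definitions.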