This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a3279a26044 [refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592)
a3279a26044 is described below
commit a3279a260441692cd0494c0eb8a93e5f97aa2945
Author: Mryange <[email protected]>
AuthorDate: Thu Dec 19 17:04:03 2024 +0800
[refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592)
### What problem does this PR solve?
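Previously, the BE decided whether the receiver was a merge exchange by checking the
type of the exchange sink's child operator (SortSourceOperatorX or
LocalExchangeSourceOperatorX).
However, this approach is incorrect because there are many types of sort
operators: regular sort, partitioned sort, and spill sort. A check against one
specific operator class therefore misses the others.
This change uses the `is_merge` flag provided by the FE in the sink description instead.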
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 10 ++--------
be/src/pipeline/exec/exchange_sink_operator.h | 4 +++-
2 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e7fed76be8f..cc789f6e25b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -32,7 +32,6 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
-#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
@@ -279,6 +278,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
+ _dest_is_merge(sink.__isset.is_merge && sink.is_merge),
_fragment_instance_ids(fragment_instance_ids) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
@@ -571,19 +571,13 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
// Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
-/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
InstanceLoId sender_ins_id) {
- if (!_child) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
- "ExchangeSinkOperatorX did not correctly set
the child.");
- }
// When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
// it is an order-by scenario.
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
- if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
- std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
+ if (_dest_is_merge) {
return _create_buffer({sender_ins_id});
}
if (_state->enable_shared_exchange_sink_buffer()) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 85575beb9f7..3d6eeb4b39e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -205,7 +205,6 @@ public:
// Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
- /// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId
sender_ins_id);
vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return
_tablet_sink_expr_ctxs; }
@@ -260,6 +259,9 @@ private:
size_t _data_processed = 0;
int _writer_count = 1;
const bool _enable_local_merge_sort;
+ // If dest_is_merge is true, it indicates that the corresponding receiver is a VMERGING-EXCHANGE.
+ // The receiver will sort the collected data, so the sender must ensure that the data sent is ordered.
+ const bool _dest_is_merge;
const std::vector<TUniqueId>& _fragment_instance_ids;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]