Gabriel39 commented on code in PR #44850:
URL: https://github.com/apache/doris/pull/44850#discussion_r1868661486
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution
ExchangeSinkOperatorX::required_data_distribution() const {
return
DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
}
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+ PUniqueId id;
+ id.set_hi(_state->query_id().hi);
+ id.set_lo(_state->query_id().lo);
+ auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
state());
+ for (const auto& _dest : _dests) {
+ const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+ // There is no need to check for duplicate dest_fragment_instance_id
here.
+ // The construct_request function already handles this check
internally.
+ sink_buffer->construct_request(dest_fragment_instance_id);
+ }
+ return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+ if (_child) {
Review Comment:
Is `_child` always true?
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution
ExchangeSinkOperatorX::required_data_distribution() const {
return
DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
}
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+ PUniqueId id;
+ id.set_hi(_state->query_id().hi);
+ id.set_lo(_state->query_id().lo);
+ auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
state());
+ for (const auto& _dest : _dests) {
+ const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+ // There is no need to check for duplicate dest_fragment_instance_id
here.
+ // The construct_request function already handles this check
internally.
+ sink_buffer->construct_request(dest_fragment_instance_id);
+ }
+ return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+ if (_child) {
+ if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
+ return _create_buffer();
+ }
+ if (std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
Review Comment:
Please introduce an abstraction here instead of checking concrete child operator types one by one.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1133,6 +1135,9 @@ public enum IgnoreSplitType {
@VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
private boolean enableLocalMergeSort = true;
+ @VariableMgr.VarAttr(name = ENABLE_SHARED_EXCHANGE_SINK_BUFFER)
+ private boolean enableSharedExchangeSinkBuffer = true;
Review Comment:
Add it to fuzzy variables.
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
return Status::OK();
}
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
- _buffer = buffer;
- _buffer->register_sink(_fragment_instance_id);
- }
+ void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer; }
+
+ InstanceLoId ins_id() const { return _fragment_instance_id.lo; }
Review Comment:
```suggestion
InstanceLoId ins_lo_id() const { return _fragment_instance_id.lo; }
```
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -158,12 +156,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
- COUNTER_UPDATE(_parent->memory_used_counter(),
request.block->ByteSizeLong());
+ auto* parent = request.channel->_parent;
+ COUNTER_UPDATE(parent->memory_used_counter(),
request.block->ByteSizeLong());
Review Comment:
```suggestion
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
request.block->ByteSizeLong());
```
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -209,23 +207,27 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- DCHECK(_rpc_channel_is_idle[id] == false);
-
std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
_instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
_instance_to_broadcast_package_queue[id];
- if (_is_finishing) {
+ if (_is_failed) {
_turn_off_channel(id, lock);
return Status::OK();
}
+ if (_instance_to_receiver_eof[id]) {
+ DCHECK(_rpc_channel_is_turn_off[id]);
+ return Status::OK();
+ }
if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ brpc_request->set_sender_id(request.channel->_parent->sender_id());
+ brpc_request->set_be_number(request.channel->_parent->be_number());
Review Comment:
Add some comments to explain what `be_number` is used for.
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -411,48 +413,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
- _turn_off_channel(id, lock);
+ _running_sink_count[id]--;
+ if (_running_sink_count[id] == 0) {
+ _turn_off_channel(id, lock);
+ }
}
}
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
- _is_finishing = true;
+ _is_failed = true;
_context->cancel(Status::Cancelled(err));
}
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
+ // When the receiving side reaches eof, it means the receiver has finished
early.
+ // The remaining data in the current rpc_channel does not need to be sent,
+ // and the rpc_channel should be turned off immediately.
_turn_off_channel(id, lock);
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
Review Comment:
Keep this code block
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
return Status::OK();
}
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
- _buffer = buffer;
- _buffer->register_sink(_fragment_instance_id);
- }
+ void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer; }
Review Comment:
change `register_exchange_buffer` to `set_exchange_buffer`
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId
fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
+ _rpc_channel_is_turn_off[low_id] = false;
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
- _construct_request(low_id, finst_id);
+ _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
+ _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
+ _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
+
+ _instance_to_request[low_id]->set_node_id(_dest_node_id);
}
Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
- if (_is_finishing) {
+ if (_is_failed) {
return Status::OK();
}
- auto ins_id = request.channel->_fragment_instance_id.lo;
+ auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do
register_sink",
print_id(request.channel->_fragment_instance_id));
}
- if (_is_receiver_eof(ins_id)) {
Review Comment:
This check should not be deleted.
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -184,13 +223,10 @@ class ExchangeSinkBuffer final : public
HasTaskExecutionCtx {
void update_profile(RuntimeProfile* profile);
void set_dependency(std::shared_ptr<Dependency> queue_dependency,
- std::shared_ptr<Dependency> finish_dependency) {
- _queue_dependency = queue_dependency;
- _finish_dependency = finish_dependency;
- }
-
- void set_broadcast_dependency(std::shared_ptr<Dependency>
broadcast_dependency) {
- _broadcast_dependency = broadcast_dependency;
+ ExchangeSinkLocalState* local_state) {
+ std::lock_guard lc(_init_lock);
Review Comment:
It would be better to reduce the locking scope.
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -169,13 +169,52 @@ class ExchangeSendCallback : public
::doris::DummyBrpcCallback<Response> {
bool _eos;
};
-// Each ExchangeSinkOperator have one ExchangeSinkBuffer
+// ExchangeSinkBuffer can either be shared among multiple
ExchangeSinkLocalState instances
+// or be individually owned by each ExchangeSinkLocalState.
+// The following describes the scenario where ExchangeSinkBuffer is shared
among multiple ExchangeSinkLocalState instances.
+// Of course, individual ownership can be seen as a special case where only
one ExchangeSinkLocalState shares the buffer.
+
+// A sink buffer contains multiple rpc_channels.
+// Each rpc_channel corresponds to a target instance on the receiving side.
+// Data is sent using a ping-pong mode within each rpc_channel,
+// meaning that at most one RPC can exist in a single rpc_channel at a time.
+// The next RPC can only be sent after the previous one has completed.
+//
+// Each exchange sink sends data to all target instances on the receiving side.
+// If the concurrency is 3, a single rpc_channel will be used simultaneously
by three exchange sinks.
+
+/*
+ +-----------+ +-----------+
+-----------+
+ |dest ins id| |dest ins id| |dest
ins id|
+ | | | | |
|
+ +----+------+ +-----+-----+
+------+----+
+ | | |
+ | | |
+ +----------------+ +----------------+
+----------------+
+ | | | | |
|
+ sink buffer -------- | rpc_channel | | rpc_channel | |
rpc_channel |
+ | | | | |
|
+ +-------+--------+ +----------------+
+----------------+
+ | |
|
+
|------------------------+----------------------+
+ | |
|
+ | |
|
+ +-----------------+ +-------+---------+
+-------+---------+
+ | | | | |
|
+ | exchange sink | | exchange sink | |
exchange sink |
+ | | | | |
|
+ +-----------------+ +-----------------+
+-----------------+
+*/
+
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
- ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int
send_id, int be_number,
- RuntimeState* state, ExchangeSinkLocalState* parent);
+ ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state);
~ExchangeSinkBuffer() override = default;
- void register_sink(TUniqueId);
+ void register_sink(InstanceLoId id) {
+ std::lock_guard lc(_init_lock);
Review Comment:
Delete this lock.
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -174,17 +175,14 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
}
Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
- if (_is_finishing) {
+ if (_is_failed) {
return Status::OK();
}
- auto ins_id = request.channel->_fragment_instance_id.lo;
+ auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do
register_sink",
print_id(request.channel->_fragment_instance_id));
}
- if (_is_receiver_eof(ins_id)) {
Review Comment:
This check should not be deleted.
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -214,6 +250,10 @@ class ExchangeSinkBuffer final : public
HasTaskExecutionCtx {
// One channel is corresponding to a downstream instance.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
+ // There could be multiple situations that cause an rpc_channel to be
turned off,
+ // such as receiving the eof, manual cancellation by the user, or all
sinks reaching eos.
+ // Therefore, it is necessary to prevent an rpc_channel from being turned
off multiple times.
+ phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;
Review Comment:
Could we unify these 2 maps?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]