This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b185dfcbf67 [pick](branch-2.1) pick #41676 #41740 #41857 (#41904)
b185dfcbf67 is described below
commit b185dfcbf6782cd4d2b45c795161ad0a92c4841e
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 15 22:41:17 2024 +0800
[pick](branch-2.1) pick #41676 #41740 #41857 (#41904)
pick #41676 #41740 #41857
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 45 +++++++++++++++-------
be/src/pipeline/exec/exchange_sink_operator.h | 1 +
be/src/pipeline/exec/result_file_sink_operator.cpp | 4 +-
.../local_exchange_sink_operator.cpp | 14 +++++--
.../local_exchange/local_exchange_sink_operator.h | 1 +
be/src/vec/sink/vdata_stream_sender.cpp | 14 +++----
be/src/vec/sink/vdata_stream_sender.h | 2 +-
7 files changed, 55 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 7584c0b0e45..ada5d5455b0 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_part_type = p._part_type;
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM ||
+ _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+ std::random_device rd;
+ std::mt19937 g(rd());
+ shuffle(channels.begin(), channels.end(), g);
+ }
int local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->open(state));
if (channels[i]->is_local()) {
local_size++;
+ _last_local_channel_idx = i;
}
}
- if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM ||
- _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
- std::random_device rd;
- std::mt19937 g(rd());
- shuffle(channels.begin(), channels.end(), g);
- }
only_local_exchange = local_size == channels.size();
PUniqueId id;
@@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (local_state.only_local_exchange) {
if (!block->empty()) {
Status status;
+ size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
- status = channel->send_local_block(block);
+ // If this channel is the last, we can move this block
to downstream pipeline.
+ // Otherwise, this block also need to be broadcasted
to other channels so should be copied.
+ DCHECK_GE(local_state._last_local_channel_idx, 0);
+ status = channel->send_local_block(
+ block, idx ==
local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
+ idx++;
}
}
} else {
@@ -471,21 +478,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
+ size_t idx = 0;
+ bool moved = false;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
- status = channel->send_local_block(&cur_block);
+ // If this channel is the last, we can move
this block to downstream pipeline.
+ // Otherwise, this block also need to be
broadcasted to other channels so should be copied.
+ DCHECK_GE(local_state._last_local_channel_idx,
0);
+ status = channel->send_local_block(
+ &cur_block, idx ==
local_state._last_local_channel_idx);
+ moved = idx ==
local_state._last_local_channel_idx;
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
+ idx++;
+ }
+ if (moved) {
+ local_state._serializer.reset_block();
+ } else {
+ cur_block.clear_column_data();
+
local_state._serializer.get_block()->set_mutable_columns(
+ cur_block.mutate_columns());
}
- cur_block.clear_column_data();
- local_state._serializer.get_block()->set_mutable_columns(
- cur_block.mutate_columns());
}
}
}
@@ -496,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block);
+ auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -582,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block);
+ auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a94392b906d..aeb6a1503b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -234,6 +234,7 @@ private:
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
+ int _last_local_channel_idx = -1;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 4ed414a0774..6fe0b7f9e25 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -210,7 +210,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
- status =
channel->send_local_block(_output_block.get());
+ status =
channel->send_local_block(_output_block.get(), false);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
@@ -234,7 +234,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
- status =
channel->send_local_block(&cur_block);
+ status =
channel->send_local_block(&cur_block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_broadcast_block(_block_holder, true);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 78c2761dcc7..c6f675f3c1b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,6 +27,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+ if (_parent->cast<LocalExchangeSinkOperatorX>()._type ==
ExchangeType::HASH_SHUFFLE) {
+ _profile->add_info_string(
+ "UseGlobalShuffle",
+
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
+ }
_channel_id = info.task_idx;
return Status::OK();
}
@@ -61,10 +66,12 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level)
const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
- "{}, _channel_id: {}, _num_partitions: {}, _num_senders:
{}, _num_sources: {}, "
+ "{}, _use_global_shuffle: {}, _channel_id: {},
_num_partitions: {}, "
+ "_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators:
{}, _release_count: {}",
- Base::debug_string(indentation_level), _channel_id,
_exchanger->_num_partitions,
- _exchanger->_num_senders, _exchanger->_num_sources,
+ Base::debug_string(indentation_level),
+
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
+ _exchanger->_num_partitions, _exchanger->_num_senders,
_exchanger->_num_sources,
_exchanger->_running_sink_operators,
_exchanger->_running_source_operators,
_release_count);
return fmt::to_string(debug_string_buffer);
@@ -76,6 +83,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) +
")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
+ _use_global_shuffle = should_disable_bucket_shuffle;
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index f1d60fc03c4..9b72402abce 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -125,6 +125,7 @@ private:
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+ bool _use_global_shuffle = false;
};
} // namespace doris::pipeline
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 44124ea7954..394005f6adf 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -231,7 +231,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
}
template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block) {
+Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
SCOPED_TIMER(_parent->local_send_timer());
if (_recvr_is_valid()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
@@ -239,7 +239,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
- _local_recvr->add_block(block, _parent->sender_id(), false);
+ _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
return Status::OK();
} else {
return _receiver_status;
@@ -646,7 +646,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
- status = channel->send_local_block(block);
+ status = channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
@@ -671,7 +671,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
- status = channel->send_local_block(&cur_block);
+ status = channel->send_local_block(&cur_block,
false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_broadcast_block(block_holder, eos);
@@ -698,7 +698,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
- status = channel->send_local_block(&cur_block);
+ status = channel->send_local_block(&cur_block,
false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_remote_block(_cur_pb_block,
false);
@@ -717,7 +717,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block);
+ auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -829,7 +829,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
- status = channel->send_local_block(&block);
+ status = channel->send_local_block(&block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_remote_block(_cur_pb_block,
false);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index b9462434f07..92344b994e0 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -307,7 +307,7 @@ public:
Status send_local_block(Status exec_status, bool eos = false);
- Status send_local_block(Block* block);
+ Status send_local_block(Block* block, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the
response
// of close operation, client should call close_wait() to finish channel's
close.
// We split one close operation into two phases in order to make multiple
channels
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]