This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6c338e60cd0 [cherry-pick](branch-30) fix exchange of tablet shuffle
send block error (#44102) (#44166)
6c338e60cd0 is described below
commit 6c338e60cd0e5fea31cac2e9efae71840a1942e2
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Nov 18 22:12:37 2024 +0800
[cherry-pick](branch-30) fix exchange of tablet shuffle send block error
(#44102) (#44166)
cherry-pick from master (#44102)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
be/src/pipeline/exec/exchange_sink_operator.h | 3 +-
be/src/vec/sink/vrow_distribution.h | 1 +
3 files changed, 24 insertions(+), 31 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2d79b0d8b2b..85c87df8f4d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -58,6 +58,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
_split_block_hash_compute_timer = ADD_TIMER(_profile,
"SplitBlockHashComputeTime");
_distribute_rows_into_channels_timer = ADD_TIMER(_profile,
"DistributeRowsIntoChannelsTime");
+ _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -276,23 +277,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
- if (_row_distribution.need_deal_batching()) { // maybe try_close more than
1 time
- RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
- vectorized::Block tmp_block =
- _row_distribution._batching_block->to_block(); // Borrow out,
for lval ref
- auto& p = _parent->cast<ExchangeSinkOperatorX>();
- // these order is unique.
- // 1. clear batching stats(and flag goes true) so that we won't make
a new batching process in dealing batched block.
- // 2. deal batched block
- // 3. now reuse the column of lval block. cuz write doesn't real
adjust it. it generate a new block from that.
- _row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
- // Recovery back
- _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
- _row_distribution._batching_block->clear_column_data();
- _row_distribution._deal_batched = false;
- }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block*
input_block) {
+ RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+ auto& p = _parent->cast<ExchangeSinkOperatorX>();
+ // Recovery back
+ _row_distribution.clear_batching_stats();
+ _row_distribution._batching_block->clear_column_data();
+ _row_distribution._deal_batched = false;
+ RETURN_IF_ERROR(p.sink(_state, input_block, false));
return Status::OK();
}
@@ -512,7 +504,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
old_channel_mem_usage += channel->mem_usage();
}
// check out of limit
- RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block =
std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
@@ -527,21 +518,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids,
local_state._number_input_rows));
-
- const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
- const auto& tablet_ids =
local_state._row_part_tablet_ids[0].tablet_ids;
- for (int idx = 0; idx < row_ids.size(); ++idx) {
- const auto& row = row_ids[idx];
- const auto& tablet_id_hash =
- HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(int64), 0);
- channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+ if (local_state._row_distribution.batching_rows() > 0) {
+ SCOPED_TIMER(local_state._send_new_partition_timer);
+ RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+ } else {
+ const auto& row_ids =
local_state._row_part_tablet_ids[0].row_ids;
+ const auto& tablet_ids =
local_state._row_part_tablet_ids[0].tablet_ids;
+ for (int idx = 0; idx < row_ids.size(); ++idx) {
+ const auto& row = row_ids[idx];
+ const auto& tablet_id_hash =
+ HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(int64), 0);
+ channel2rows[tablet_id_hash %
num_channels].emplace_back(row);
+ }
}
}
- if (eos) {
- local_state._row_distribution._deal_batched = true;
- RETURN_IF_ERROR(local_state._send_new_partition_batch());
- }
{
SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
// the convert_block maybe different with block after execute exprs
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index f0cabb1ffde..bee34ad1a85 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,7 +96,7 @@ public:
static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
return Status::OK();
}
- Status _send_new_partition_batch();
+ Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if
_random == true
bool only_local_exchange {false};
@@ -127,6 +127,7 @@ private:
// Used to counter send bytes under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
+ RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
std::vector<RowPartTabletIds>&
row_part_tablet_ids,
int64_t& rows_stat_val);
bool need_deal_batching() const { return _deal_batched && _batching_rows >
0; }
+ size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using
#_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]