This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 610054c77b9 [cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)
610054c77b9 is described below
commit 610054c77b9d4a74fcb522d4451ab52139cce6e0
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Nov 19 17:31:06 2024 +0800
[cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)
cherry-pick from master (#44102)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
be/src/pipeline/exec/exchange_sink_operator.h | 3 +-
be/src/vec/sink/vrow_distribution.h | 1 +
3 files changed, 24 insertions(+), 31 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b26c69ad560..ff8bcdd9236 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_split_block_hash_compute_timer = ADD_TIMER(_profile,
"SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
+ _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
@@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
- if (_row_distribution.need_deal_batching()) { // maybe try_close more than
1 time
- RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
- vectorized::Block tmp_block =
- _row_distribution._batching_block->to_block(); // Borrow out,
for lval ref
- auto& p = _parent->cast<ExchangeSinkOperatorX>();
- // these order is unique.
- // 1. clear batching stats(and flag goes true) so that we won't make
a new batching process in dealing batched block.
- // 2. deal batched block
- // 3. now reuse the column of lval block. cuz write doesn't real
adjust it. it generate a new block from that.
- _row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
- // Recovery back
-
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
- _row_distribution._batching_block->clear_column_data();
- _row_distribution._deal_batched = false;
- }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block*
input_block) {
+ RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+ auto& p = _parent->cast<ExchangeSinkOperatorX>();
+ // Recovery back
+ _row_distribution.clear_batching_stats();
+ _row_distribution._batching_block->clear_column_data();
+ _row_distribution._deal_batched = false;
+ RETURN_IF_ERROR(p.sink(_state, input_block, false));
return Status::OK();
}
@@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// check out of limit
- RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block =
std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
@@ -566,21 +557,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids,
local_state._number_input_rows));
-
- const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
- const auto& tablet_ids =
local_state._row_part_tablet_ids[0].tablet_ids;
- for (int idx = 0; idx < row_ids.size(); ++idx) {
- const auto& row = row_ids[idx];
- const auto& tablet_id_hash =
- HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(int64), 0);
- channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+ if (local_state._row_distribution.batching_rows() > 0) {
+ SCOPED_TIMER(local_state._send_new_partition_timer);
+ RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+ } else {
+ const auto& row_ids =
local_state._row_part_tablet_ids[0].row_ids;
+ const auto& tablet_ids =
local_state._row_part_tablet_ids[0].tablet_ids;
+ for (int idx = 0; idx < row_ids.size(); ++idx) {
+ const auto& row = row_ids[idx];
+ const auto& tablet_id_hash =
+ HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(int64), 0);
+ channel2rows[tablet_id_hash %
num_channels].emplace_back(row);
+ }
}
}
- if (eos) {
- local_state._row_distribution._deal_batched = true;
- RETURN_IF_ERROR(local_state._send_new_partition_batch());
- }
// the convert_block maybe different with block after execute exprs
// when send data we still use block
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels,
num_channels,
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d9382dadd0..c055b131d8a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,7 +144,7 @@ public:
static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
return Status::OK();
}
- Status _send_new_partition_batch();
+ Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
channel_shared_ptrs;
@@ -179,6 +179,7 @@ private:
// Used to counter send bytes under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
+ RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h
b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
std::vector<RowPartTabletIds>&
row_part_tablet_ids,
int64_t& rows_stat_val);
bool need_deal_batching() const { return _deal_batched && _batching_rows >
0; }
+ size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using
#_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]