This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d1e8bebb3c6 [refactor](shuffle) (PART I) Unify all hash-partition
based shuffling (#45256)
d1e8bebb3c6 is described below
commit d1e8bebb3c610bc729b0a088fd693f013aeb5336
Author: Gabriel <[email protected]>
AuthorDate: Wed Dec 11 14:20:41 2024 +0800
[refactor](shuffle) (PART I) Unify all hash-partition based shuffling
(#45256)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 308 +++++----------------
be/src/pipeline/exec/exchange_sink_operator.h | 58 +---
be/src/pipeline/shuffle/writer.cpp | 114 ++++++++
be/src/pipeline/shuffle/writer.h | 53 ++++
be/src/vec/runtime/partitioner.h | 16 +-
.../sink/scale_writer_partitioning_exchanger.hpp | 92 ++++--
be/src/vec/sink/tablet_sink_hash_partitioner.cpp | 152 ++++++++++
be/src/vec/sink/tablet_sink_hash_partitioner.h | 83 ++++++
be/src/vec/sink/vdata_stream_sender.cpp | 12 +-
be/src/vec/sink/vdata_stream_sender.h | 8 +-
10 files changed, 562 insertions(+), 334 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 04b9653e9c8..aa893fc0a26 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -38,6 +38,7 @@
#include "util/uid_util.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
+#include "vec/sink/tablet_sink_hash_partitioner.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
@@ -120,6 +121,57 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_sink_buffer->set_dependency(state->fragment_instance_id().lo,
_queue_dependency, this);
}
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ _partition_count = channels.size();
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
+ RETURN_IF_ERROR(_partitioner->init(p._texprs));
+ RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
+ } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ _partition_count = channels.size();
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
+ RETURN_IF_ERROR(_partitioner->init(p._texprs));
+ RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
+ } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+ _partition_count = channels.size();
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
+ _partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>(
+ _partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema,
+ p._tablet_sink_partition, p._tablet_sink_location,
p._tablet_sink_tuple_id, this);
+ RETURN_IF_ERROR(_partitioner->init({}));
+ RETURN_IF_ERROR(_partitioner->prepare(state, {}));
+ } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+ _partition_count =
+ channels.size() *
config::table_sink_partition_write_max_partition_nums_per_writer;
+ _partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
+ channels.size(), _partition_count, channels.size(), 1,
+
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
+ state->task_num() ==
+ 0
+ ?
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
+ :
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
+ state->task_num(),
+
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+ state->task_num() ==
+ 0
+ ?
config::table_sink_partition_write_min_data_processed_rebalance_threshold
+ :
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+ state->task_num());
+
+ RETURN_IF_ERROR(_partitioner->init(p._texprs));
+ RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
+ }
+
return Status::OK();
}
@@ -145,6 +197,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
+ _writer.reset(new Writer());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM ||
@@ -190,113 +243,16 @@ Status ExchangeSinkLocalState::open(RuntimeState* state)
{
}
}
}
- if (_part_type == TPartitionType::HASH_PARTITIONED) {
- _partition_count = channels.size();
- _partitioner =
-
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
- channels.size());
- RETURN_IF_ERROR(_partitioner->init(p._texprs));
- RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
- _profile->add_info_string("Partitioner",
- fmt::format("Crc32HashPartitioner({})",
_partition_count));
- } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- _partition_count = channels.size();
- _partitioner =
-
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
- channels.size());
- RETURN_IF_ERROR(_partitioner->init(p._texprs));
- RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
- _profile->add_info_string("Partitioner",
- fmt::format("Crc32HashPartitioner({})",
_partition_count));
- } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
- _partition_count = channels.size();
- _profile->add_info_string("Partitioner",
- fmt::format("Crc32HashPartitioner({})",
_partition_count));
- _txn_id = p._tablet_sink_txn_id;
- _schema = std::make_shared<OlapTableSchemaParam>();
- RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
- _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema,
p._tablet_sink_partition);
- RETURN_IF_ERROR(_vpartition->init());
- auto find_tablet_mode =
vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
- _tablet_finder =
-
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(),
find_tablet_mode);
- _tablet_sink_tuple_desc =
_state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
- _tablet_sink_row_desc = p._pool->add(new
RowDescriptor(_tablet_sink_tuple_desc, false));
- _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
- for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state,
_tablet_sink_expr_ctxs[i]));
- }
- // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED,
we handle the processing of auto_increment column
- // on exchange node rather than on TabletWriter
- _block_convertor =
-
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
- _block_convertor->init_autoinc_info(_schema->db_id(),
_schema->table_id(),
- _state->batch_size());
- _location = p._pool->add(new
OlapTableLocationParam(p._tablet_sink_location));
- _row_distribution.init(
- {.state = _state,
- .block_convertor = _block_convertor.get(),
- .tablet_finder = _tablet_finder.get(),
- .vpartition = _vpartition.get(),
- .add_partition_request_timer = _add_partition_request_timer,
- .txn_id = _txn_id,
- .pool = p._pool.get(),
- .location = _location,
- .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
- .schema = _schema,
- .caller = (void*)this,
- .create_partition_callback =
&ExchangeSinkLocalState::empty_callback_function});
- } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
- _partition_count =
- channels.size() *
config::table_sink_partition_write_max_partition_nums_per_writer;
- _partitioner =
-
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
- _partition_count);
- _partition_function =
std::make_unique<HashPartitionFunction>(_partitioner.get());
-
- scale_writer_partitioning_exchanger = std::make_unique<
-
vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
- channels.size(), *_partition_function, _partition_count,
channels.size(), 1,
-
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
- state->task_num() ==
- 0
- ?
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
- :
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
- state->task_num(),
-
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
- state->task_num() ==
- 0
- ?
config::table_sink_partition_write_min_data_processed_rebalance_threshold
- :
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
- state->task_num());
-
- RETURN_IF_ERROR(_partitioner->init(p._texprs));
- RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
- _profile->add_info_string("Partitioner",
- fmt::format("Crc32HashPartitioner({})",
_partition_count));
- }
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
- _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+ _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
+ _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
RETURN_IF_ERROR(_partitioner->open(state));
- } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
- RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
}
return Status::OK();
}
-Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block*
input_block) {
- RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
- auto& p = _parent->cast<ExchangeSinkOperatorX>();
- // Recovery back
- _row_distribution.clear_batching_stats();
- _row_distribution._batching_block->clear_column_data();
- _row_distribution._deal_batched = false;
- RETURN_IF_ERROR(p.sink(_state, input_block, false));
- return Status::OK();
-}
-
std::string ExchangeSinkLocalState::name_suffix() {
std::string name = " (id=" + std::to_string(_parent->node_id());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
@@ -491,105 +447,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
local_state.current_channel_idx =
(local_state.current_channel_idx + 1) %
local_state.channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
- _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- auto rows = block->rows();
- {
- SCOPED_TIMER(local_state._split_block_hash_compute_timer);
- RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
- }
- int64_t old_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- old_channel_mem_usage += channel->mem_usage();
- }
- if (_part_type == TPartitionType::HASH_PARTITIONED) {
- SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
- RETURN_IF_ERROR(channel_add_rows(
- state, local_state.channels, local_state._partition_count,
-
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
- } else {
- SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
- RETURN_IF_ERROR(channel_add_rows(
- state, local_state.channels, local_state._partition_count,
-
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
- }
- int64_t new_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- new_channel_mem_usage += channel->mem_usage();
- }
- COUNTER_UPDATE(local_state.memory_used_counter(),
- new_channel_mem_usage - old_channel_mem_usage);
- } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
- int64_t old_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- old_channel_mem_usage += channel->mem_usage();
- }
- // check out of limit
- std::shared_ptr<vectorized::Block> convert_block =
std::make_shared<vectorized::Block>();
- const auto& num_channels = local_state._partition_count;
- std::vector<std::vector<uint32>> channel2rows;
- channel2rows.resize(num_channels);
- auto input_rows = block->rows();
-
- if (input_rows > 0) {
- bool has_filtered_rows = false;
- int64_t filtered_rows = 0;
- local_state._number_input_rows += input_rows;
-
-
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
- *block, convert_block, filtered_rows, has_filtered_rows,
- local_state._row_part_tablet_ids,
local_state._number_input_rows));
- if (local_state._row_distribution.batching_rows() > 0) {
- SCOPED_TIMER(local_state._send_new_partition_timer);
- RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
- } else {
- const auto& row_ids =
local_state._row_part_tablet_ids[0].row_ids;
- const auto& tablet_ids =
local_state._row_part_tablet_ids[0].tablet_ids;
- for (int idx = 0; idx < row_ids.size(); ++idx) {
- const auto& row = row_ids[idx];
- const auto& tablet_id_hash =
- HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(int64), 0);
- channel2rows[tablet_id_hash %
num_channels].emplace_back(row);
- }
- }
- }
-
- {
- SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
- // the convert_block maybe different with block after execute exprs
- // when send data we still use block
- RETURN_IF_ERROR(channel_add_rows_with_idx(state,
local_state.channels, num_channels,
- channel2rows, block,
eos));
- }
- int64_t new_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- new_channel_mem_usage += channel->mem_usage();
- }
- COUNTER_UPDATE(local_state.memory_used_counter(),
- new_channel_mem_usage - old_channel_mem_usage);
- } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
- int64_t old_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- old_channel_mem_usage += channel->mem_usage();
- }
- {
- SCOPED_TIMER(local_state._split_block_hash_compute_timer);
- RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
- }
- std::vector<std::vector<uint32>> assignments =
- local_state.scale_writer_partitioning_exchanger->accept(block);
- {
- SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
- RETURN_IF_ERROR(channel_add_rows_with_idx(state,
local_state.channels,
-
local_state.channels.size(), assignments,
- block, eos));
- }
-
- int64_t new_channel_mem_usage = 0;
- for (const auto& channel : local_state.channels) {
- new_channel_mem_usage += channel->mem_usage();
- }
- COUNTER_UPDATE(local_state.memory_used_counter(),
- new_channel_mem_usage - old_channel_mem_usage);
+ _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+ _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
+ _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+ RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block,
eos));
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
@@ -641,44 +502,6 @@ void
ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf
}
}
-Status ExchangeSinkOperatorX::channel_add_rows(
- RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- size_t num_channels, const uint32_t* __restrict channel_ids, size_t
rows,
- vectorized::Block* block, bool eos) {
- std::vector<std::vector<uint32_t>> channel2rows;
- channel2rows.resize(num_channels);
- for (uint32_t i = 0; i < rows; i++) {
- channel2rows[channel_ids[i]].emplace_back(i);
- }
-
- RETURN_IF_ERROR(
- channel_add_rows_with_idx(state, channels, num_channels,
channel2rows, block, eos));
- return Status::OK();
-}
-
-Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
- RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows,
- vectorized::Block* block, bool eos) {
- Status status = Status::OK();
- for (int i = 0; i < num_channels; ++i) {
- if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
- status = channels[i]->add_rows(block, channel2rows[i], false);
- HANDLE_CHANNEL_STATUS(state, channels[i], status);
- channel2rows[i].clear();
- }
- }
- if (eos) {
- for (int i = 0; i < num_channels; ++i) {
- if (!channels[i]->is_receiver_eof()) {
- status = channels[i]->add_rows(block, channel2rows[i], true);
- HANDLE_CHANNEL_STATUS(state, channels[i], status);
- }
- }
- }
- return Status::OK();
-}
-
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
@@ -697,17 +520,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
if (_closed) {
return Status::OK();
}
- if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED &&
- _block_convertor != nullptr && _tablet_finder != nullptr) {
-
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-
_tablet_finder->num_filtered_rows());
- _state->update_num_rows_load_unselected(
- _tablet_finder->num_immutable_partition_filtered_rows());
- // sink won't see those filtered rows, we should compensate here
- _state->set_num_rows_load_total(_state->num_rows_load_filtered() +
- _state->num_rows_load_unselected());
- }
SCOPED_TIMER(exec_time_counter());
+ if (_partitioner) {
+ RETURN_IF_ERROR(_partitioner->close(state));
+ }
SCOPED_TIMER(_close_timer);
if (_queue_dependency) {
COUNTER_UPDATE(_wait_queue_timer,
_queue_dependency->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d094b43f61..e88389b1d7b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "operator.h"
+#include "pipeline/shuffle/writer.h"
#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
#include "vec/sink/vdata_stream_sender.h"
@@ -39,20 +40,6 @@ class ExchangeSinkLocalState final : public
PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<>;
-private:
- class HashPartitionFunction {
- public:
- HashPartitionFunction(vectorized::PartitionerBase* partitioner)
- : _partitioner(partitioner) {}
-
- int get_partition(vectorized::Block* block, int position) {
- return _partitioner->get_channel_ids().get<uint32_t>()[position];
- }
-
- private:
- vectorized::PartitionerBase* _partitioner;
- };
-
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _serializer(this) {
@@ -106,19 +93,20 @@ public:
std::string name_suffix() override;
segment_v2::CompressionTypePB compression_type() const;
std::string debug_string(int indentation_level) const override;
- static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
- return Status::OK();
+ RuntimeProfile::Counter* send_new_partition_timer() { return
_send_new_partition_timer; }
+ RuntimeProfile::Counter* add_partition_request_timer() { return
_add_partition_request_timer; }
+ RuntimeProfile::Counter* split_block_hash_compute_timer() {
+ return _split_block_hash_compute_timer;
+ }
+ RuntimeProfile::Counter* distribute_rows_into_channels_timer() {
+ return _distribute_rows_into_channels_timer;
}
- Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if
_random == true
bool only_local_exchange {false};
void on_channel_finished(InstanceLoId channel_id);
-
- // for external table sink hash partition
-
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
- scale_writer_partitioning_exchanger;
+ vectorized::PartitionerBase* partitioner() const { return
_partitioner.get(); }
private:
friend class ExchangeSinkOperatorX;
@@ -176,28 +164,16 @@ private:
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
+ std::unique_ptr<Writer> _writer;
size_t _partition_count;
std::shared_ptr<Dependency> _finish_dependency;
// for shuffle data by partition and tablet
- int64_t _txn_id = -1;
- vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
- std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
- std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
- std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
- std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor =
nullptr;
- TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
- RowDescriptor* _tablet_sink_row_desc = nullptr;
- OlapTableLocationParam* _location = nullptr;
- vectorized::VRowDistribution _row_distribution;
+
RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
- std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
- int64_t _number_input_rows = 0;
TPartitionType::type _part_type;
- // for external table sink hash partition
- std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
@@ -230,6 +206,7 @@ public:
// In a merge sort scenario, there are only n RPCs, so a shared sink
buffer is not needed.
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId
sender_ins_id);
+ vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return
_tablet_sink_expr_ctxs; }
private:
friend class ExchangeSinkLocalState;
@@ -237,17 +214,6 @@ private:
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st);
- Status channel_add_rows(RuntimeState* state,
- std::vector<std::shared_ptr<vectorized::Channel>>&
channels,
- size_t num_channels, const uint32_t* __restrict
channel_ids,
- size_t rows, vectorized::Block* block, bool eos);
-
- Status channel_add_rows_with_idx(RuntimeState* state,
-
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- size_t num_channels,
- std::vector<std::vector<uint32_t>>&
channel2rows,
- vectorized::Block* block, bool eos);
-
// Use ExchangeSinkOperatorX to create a sink buffer.
// The sink buffer can be shared among multiple ExchangeSinkLocalState
instances,
// or each ExchangeSinkLocalState can have its own sink buffer.
diff --git a/be/src/pipeline/shuffle/writer.cpp b/be/src/pipeline/shuffle/writer.cpp
new file mode 100644
index 00000000000..c27fd9a7aeb
--- /dev/null
+++ b/be/src/pipeline/shuffle/writer.cpp
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "writer.h"
+
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "vec/core/block.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+// Handles a channel whose receiver reported end-of-file: stores `st` on the channel
+// through set_receiver_eof() and then closes the channel.
+template <typename ChannelPtrType>
+void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const {
+    channel->set_receiver_eof(st);
+    // A channel at EOF sends no further RPC to the downstream, so the outcome of
+    // close() is intentionally not inspected.
+    [[maybe_unused]] const auto close_result = channel->close(state);
+}
+
+// Partitions `block` and pushes its rows into the exchange channels of `local_state`.
+//
+// Runs the local state's partitioner over the block, hands the resulting per-row
+// channel ids to _channel_add_rows(), and finally adds the change in total channel
+// memory usage to the local state's memory counter.
+//   local_state - supplies the partitioner, the channels, the timers and the counter
+//   state       - runtime state forwarded to the partitioner and to the channels
+//   block       - rows to shuffle
+//   eos         - forwarded to _channel_add_rows()
+// Returns the first error from partitioning or from distributing the rows, otherwise
+// OK. When an error is returned the memory counter is not updated.
+Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
+                     vectorized::Block* block, bool eos) const {
+    auto& channels = local_state->channels;
+    // Sums the memory currently reported by every channel.
+    const auto total_channel_mem_usage = [&channels]() {
+        int64_t total = 0;
+        for (const auto& channel : channels) {
+            total += channel->mem_usage();
+        }
+        return total;
+    };
+
+    const auto num_rows = block->rows();
+    {
+        SCOPED_TIMER(local_state->split_block_hash_compute_timer());
+        RETURN_IF_ERROR(local_state->partitioner()->do_partitioning(state, block));
+    }
+
+    const int64_t mem_usage_before = total_channel_mem_usage();
+    {
+        SCOPED_TIMER(local_state->distribute_rows_into_channels_timer());
+        const auto channel_field = local_state->partitioner()->get_channel_ids();
+        // `len` selects how the ids are read: as uint32_t when it equals
+        // sizeof(uint32_t), as int64_t otherwise.
+        const Status add_status =
+                channel_field.len == sizeof(uint32_t)
+                        ? _channel_add_rows(state, channels, channels.size(),
+                                            channel_field.get<uint32_t>(), num_rows, block, eos)
+                        : _channel_add_rows(state, channels, channels.size(),
+                                            channel_field.get<int64_t>(), num_rows, block, eos);
+        RETURN_IF_ERROR(add_status);
+    }
+
+    COUNTER_UPDATE(local_state->memory_used_counter(),
+                   total_channel_mem_usage() - mem_usage_before);
+    return Status::OK();
+}
+
+// Distributes the rows of `block` to `channels` according to `channel_ids`.
+//
+// `channel_ids[i]` is the index of the channel that receives row `i`. Row indices are
+// grouped per channel with a counting sort, so channel `c` receives one contiguous
+// slice [start, start + size) of `row_idx`.
+//   state           - runtime state forwarded to the EOF handler
+//   channels        - destination channels; indices [0, partition_count) are used
+//   partition_count - number of channels to distribute to
+//   channel_ids     - one channel index per row (`rows` entries)
+//   rows            - number of rows described by `channel_ids`
+//   block           - block whose rows are being sent
+//   eos             - when true, every channel that is not at receiver-EOF gets one
+//                     extra, empty add_rows() call with the eos flag set
+// Returns the first non-EOF error reported by a channel, otherwise OK. An END_OF_FILE
+// status is routed to _handle_eof_channel() and does not fail the call.
+// NOTE(review): ids are presumably < partition_count; this is not checked here.
+template <typename ChannelIdType>
+Status Writer::_channel_add_rows(RuntimeState* state,
+                                 std::vector<std::shared_ptr<vectorized::Channel>>& channels,
+                                 size_t partition_count,
+                                 const ChannelIdType* __restrict channel_ids, size_t rows,
+                                 vectorized::Block* block, bool eos) const {
+    // Slot c + 1 first counts the rows of channel c, is then turned into the end
+    // offset of channel c by the prefix sum, and is decremented back to the start
+    // offset while row_idx is filled. The trailing slot keeps the end of the last channel.
+    std::vector<uint32_t> partition_rows_histogram(partition_count + 2, 0);
+    auto row_idx = vectorized::PODArray<uint32_t>(rows);
+    for (size_t i = 0; i < rows; ++i) {
+        partition_rows_histogram[channel_ids[i] + 1]++;
+    }
+    for (size_t i = 1; i <= partition_count + 1; ++i) {
+        partition_rows_histogram[i] += partition_rows_histogram[i - 1];
+    }
+    // Walk the rows backwards so that, within one channel, row order is preserved.
+    for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) {
+        row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i;
+        partition_rows_histogram[channel_ids[i] + 1]--;
+    }
+// NOTE(review): this macro is not #undef'd and stays defined for the rest of the
+// translation unit.
+#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
+    do {                                                 \
+        if (status.is<ErrorCode::END_OF_FILE>()) {       \
+            _handle_eof_channel(state, channel, status); \
+        } else {                                         \
+            RETURN_IF_ERROR(status);                     \
+        }                                                \
+    } while (0)
+    Status status = Status::OK();
+    for (size_t i = 0; i < partition_count; ++i) {
+        uint32_t start = partition_rows_histogram[i + 1];
+        uint32_t size = partition_rows_histogram[i + 2] - start;
+        if (!channels[i]->is_receiver_eof() && size > 0) {
+            status = channels[i]->add_rows(block, row_idx.data(), start, size, false);
+            HANDLE_CHANNEL_STATUS(state, channels[i], status);
+        }
+    }
+    if (eos) {
+        // size_t index: matches partition_count and the loop above (was `int`).
+        for (size_t i = 0; i < partition_count; ++i) {
+            if (!channels[i]->is_receiver_eof()) {
+                status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true);
+                HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h
new file mode 100644
index 00000000000..0eb77212029
--- /dev/null
+++ b/be/src/pipeline/shuffle/writer.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+class RuntimeState;
+class Status;
+namespace vectorized {
+class Block;
+class Channel;
+} // namespace vectorized
+namespace pipeline {
+
+#include "common/compile_check_begin.h"
+class ExchangeSinkLocalState;
+
+// Sends the rows of a block to the channels of an ExchangeSinkLocalState, using the
+// channel ids produced by that local state's partitioner. The class declares no data
+// members and all of its member functions are const.
+class Writer {
+public:
+    Writer() = default;
+
+    // Partitions `block` with the partitioner of `local_state` and adds the rows to
+    // the channels of `local_state`; `eos` is passed through to the channels.
+    Status write(ExchangeSinkLocalState* local_state,
+                 RuntimeState* state,
+                 vectorized::Block* block,
+                 bool eos) const;
+
+private:
+    // Closes a channel after it reported end-of-file; `st` is recorded on the channel.
+    template <typename ChannelPtrType>
+    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const;
+
+    // Adds each of the `rows` rows of `block` to the channel named by the matching
+    // entry of `channel_ids`, for the first `partition_count` entries of `channels`.
+    template <typename ChannelIdType>
+    Status _channel_add_rows(RuntimeState* state,
+                             std::vector<std::shared_ptr<vectorized::Channel>>& channels,
+                             size_t partition_count,
+                             const ChannelIdType* __restrict channel_ids,
+                             size_t rows,
+                             vectorized::Block* block,
+                             bool eos) const;
+};
+#include "common/compile_check_end.h"
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index d5492c3be87..53d8b84d09c 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -21,11 +21,8 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-namespace doris {
+namespace doris::vectorized {
#include "common/compile_check_begin.h"
-
-namespace vectorized {
-
struct ChannelField {
const void* channel_id;
const uint32_t len;
@@ -48,12 +45,16 @@ public:
virtual Status open(RuntimeState* state) = 0;
+ virtual Status close(RuntimeState* state) = 0;
+
virtual Status do_partitioning(RuntimeState* state, Block* block) const =
0;
virtual ChannelField get_channel_ids() const = 0;
virtual Status clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) = 0;
+ size_t partition_count() const { return _partition_count; }
+
protected:
const size_t _partition_count;
};
@@ -74,6 +75,8 @@ public:
Status open(RuntimeState* state) override { return
VExpr::open(_partition_expr_ctxs, state); }
+ Status close(RuntimeState* state) override { return Status::OK(); }
+
Status do_partitioning(RuntimeState* state, Block* block) const override;
ChannelField get_channel_ids() const override { return {_hash_vals.data(),
sizeof(uint32_t)}; }
@@ -108,8 +111,5 @@ struct SpillPartitionChannelIds {
return ((l >> 16) | (l << 16)) % r;
}
};
-
-} // namespace vectorized
-} // namespace doris
-
#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
index f7435249c20..92e52af4c67 100644
--- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
+++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
@@ -24,28 +24,51 @@
#include "vec/core/block.h"
#include "vec/exec/skewed_partition_rebalancer.h"
+#include "vec/runtime/partitioner.h"
namespace doris::vectorized {
-template <typename PartitionFunction>
-class ScaleWriterPartitioningExchanger {
+class ScaleWriterPartitioner final : public PartitionerBase {
public:
- ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction&
partition_function,
- int partition_count, int task_count, int
task_bucket_count,
- long
min_partition_data_processed_rebalance_threshold,
- long
min_data_processed_rebalance_threshold)
- : _channel_size(channel_size),
- _partition_function(partition_function),
+ using HashValType = uint32_t;
+ ScaleWriterPartitioner(int channel_size, int partition_count, int
task_count,
+ int task_bucket_count,
+ long
min_partition_data_processed_rebalance_threshold,
+ long min_data_processed_rebalance_threshold)
+ : PartitionerBase(partition_count),
+ _channel_size(channel_size),
_partition_rebalancer(partition_count, task_count,
task_bucket_count,
min_partition_data_processed_rebalance_threshold,
min_data_processed_rebalance_threshold),
_partition_row_counts(partition_count, 0),
_partition_writer_ids(partition_count, -1),
- _partition_writer_indexes(partition_count, 0) {}
+ _partition_writer_indexes(partition_count, 0),
+ _task_count(task_count),
+ _task_bucket_count(task_bucket_count),
+ _min_partition_data_processed_rebalance_threshold(
+ min_partition_data_processed_rebalance_threshold),
+ _min_data_processed_rebalance_threshold(min_data_processed_rebalance_threshold) {
+ _crc_partitioner =
+ std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ _partition_count);
+ }
+
+ ~ScaleWriterPartitioner() override = default;
+
+ Status init(const std::vector<TExpr>& texprs) override {
+ return _crc_partitioner->init(texprs);
+ }
+
+ Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override {
+ return _crc_partitioner->prepare(state, row_desc);
+ }
+
+ Status open(RuntimeState* state) override { return _crc_partitioner->open(state); }
+
+ Status close(RuntimeState* state) override { return _crc_partitioner->close(state); }
- std::vector<std::vector<uint32_t>> accept(Block* block) {
- std::vector<std::vector<uint32_t>> writerAssignments(_channel_size,
-
std::vector<uint32_t>());
+ Status do_partitioning(RuntimeState* state, Block* block) const override {
+ _hash_vals.resize(block->rows());
for (int partition_id = 0; partition_id <
_partition_row_counts.size(); partition_id++) {
_partition_row_counts[partition_id] = 0;
_partition_writer_ids[partition_id] = -1;
@@ -53,40 +76,59 @@ public:
_partition_rebalancer.rebalance();
- for (int position = 0; position < block->rows(); position++) {
- int partition_id = _partition_function.get_partition(block,
position);
+ RETURN_IF_ERROR(_crc_partitioner->do_partitioning(state, block));
+ const auto* crc_values =
_crc_partitioner->get_channel_ids().get<uint32_t>();
+ for (size_t position = 0; position < block->rows(); position++) {
+ int partition_id = crc_values[position];
_partition_row_counts[partition_id] += 1;
// Get writer id for this partition by looking at the scaling state
int writer_id = _partition_writer_ids[partition_id];
if (writer_id == -1) {
- writer_id = get_next_writer_id(partition_id);
+ writer_id = _get_next_writer_id(partition_id);
_partition_writer_ids[partition_id] = writer_id;
}
- writerAssignments[writer_id].push_back(position);
+ _hash_vals[position] = writer_id;
}
- for (int partition_id = 0; partition_id <
_partition_row_counts.size(); partition_id++) {
+ for (size_t partition_id = 0; partition_id <
_partition_row_counts.size(); partition_id++) {
_partition_rebalancer.add_partition_row_count(partition_id,
_partition_row_counts[partition_id]);
}
_partition_rebalancer.add_data_processed(block->bytes());
- return writerAssignments;
+ return Status::OK();
+ }
+
+ ChannelField get_channel_ids() const override {
+ return {_hash_vals.data(), sizeof(HashValType)};
}
- int get_next_writer_id(int partition_id) {
+ Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override {
+ partitioner.reset(new ScaleWriterPartitioner(
+ _channel_size, _partition_count, _task_count,
_task_bucket_count,
+ _min_partition_data_processed_rebalance_threshold,
+ _min_data_processed_rebalance_threshold));
+ return Status::OK();
+ }
+
+private:
+ int _get_next_writer_id(int partition_id) const {
return _partition_rebalancer.get_task_id(partition_id,
_partition_writer_indexes[partition_id]++);
}
-private:
int _channel_size;
- PartitionFunction& _partition_function;
- SkewedPartitionRebalancer _partition_rebalancer;
- std::vector<int> _partition_row_counts;
- std::vector<int> _partition_writer_ids;
- std::vector<int> _partition_writer_indexes;
+ std::unique_ptr<PartitionerBase> _crc_partitioner;
+ mutable SkewedPartitionRebalancer _partition_rebalancer;
+ mutable std::vector<int> _partition_row_counts;
+ mutable std::vector<int> _partition_writer_ids;
+ mutable std::vector<int> _partition_writer_indexes;
+ mutable std::vector<HashValType> _hash_vals;
+ const int _task_count;
+ const int _task_bucket_count;
+ const long _min_partition_data_processed_rebalance_threshold;
+ const long _min_data_processed_rebalance_threshold;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.cpp b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp
new file mode 100644
index 00000000000..7e854d19add
--- /dev/null
+++ b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/tablet_sink_hash_partitioner.h"
+
+#include "pipeline/exec/operator.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+TabletSinkHashPartitioner::TabletSinkHashPartitioner(
+ size_t partition_count, int64_t txn_id, const TOlapTableSchemaParam&
tablet_sink_schema,
+ const TOlapTablePartitionParam& tablet_sink_partition,
+ const TOlapTableLocationParam& tablet_sink_location, const TTupleId&
tablet_sink_tuple_id,
+ pipeline::ExchangeSinkLocalState* local_state)
+ : PartitionerBase(partition_count),
+ _txn_id(txn_id),
+ _tablet_sink_schema(tablet_sink_schema),
+ _tablet_sink_partition(tablet_sink_partition),
+ _tablet_sink_location(tablet_sink_location),
+ _tablet_sink_tuple_id(tablet_sink_tuple_id),
+ _local_state(local_state) {}
+
+Status TabletSinkHashPartitioner::init(const std::vector<TExpr>& texprs) {
+ return Status::OK();
+}
+
+Status TabletSinkHashPartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
+ return Status::OK();
+}
+
+Status TabletSinkHashPartitioner::open(RuntimeState* state) {
+ _schema = std::make_shared<OlapTableSchemaParam>();
+ RETURN_IF_ERROR(_schema->init(_tablet_sink_schema));
+ _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema,
_tablet_sink_partition);
+ RETURN_IF_ERROR(_vpartition->init());
+ auto find_tablet_mode =
vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
+ _tablet_finder =
+ std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(),
find_tablet_mode);
+ _tablet_sink_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id);
+ _tablet_sink_row_desc =
+ state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc,
false));
+ auto& ctxs =
+
_local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>().tablet_sink_expr_ctxs();
+ _tablet_sink_expr_ctxs.resize(ctxs.size());
+ for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
+ }
+ // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
+ // on exchange node rather than on TabletWriter
+ _block_convertor =
+
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
+ _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
state->batch_size());
+ _location = state->obj_pool()->add(new
OlapTableLocationParam(_tablet_sink_location));
+ _row_distribution.init(
+ {.state = state,
+ .block_convertor = _block_convertor.get(),
+ .tablet_finder = _tablet_finder.get(),
+ .vpartition = _vpartition.get(),
+ .add_partition_request_timer =
_local_state->add_partition_request_timer(),
+ .txn_id = _txn_id,
+ .pool = state->obj_pool(),
+ .location = _location,
+ .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
+ .schema = _schema,
+ .caller = (void*)this,
+ .create_partition_callback =
&TabletSinkHashPartitioner::empty_callback_function});
+ RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
+ return Status::OK();
+}
+
+Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const {
+ _hash_vals.resize(block->rows());
+ if (block->empty()) {
+ return Status::OK();
+ }
+ std::fill(_hash_vals.begin(), _hash_vals.end(), -1);
+ bool has_filtered_rows = false;
+ int64_t filtered_rows = 0;
+ int64_t number_input_rows = _local_state->rows_input_counter()->value();
+ std::shared_ptr<vectorized::Block> convert_block =
std::make_shared<vectorized::Block>();
+ RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
+ *block, convert_block, filtered_rows, has_filtered_rows,
_row_part_tablet_ids,
+ number_input_rows));
+ if (_row_distribution.batching_rows() > 0) {
+ SCOPED_TIMER(_local_state->send_new_partition_timer());
+ RETURN_IF_ERROR(_send_new_partition_batch(state, block));
+ } else {
+ const auto& row_ids = _row_part_tablet_ids[0].row_ids;
+ const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
+ for (int idx = 0; idx < row_ids.size(); ++idx) {
+ const auto& row = row_ids[idx];
+ const auto& tablet_id_hash =
+ HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(HashValType), 0);
+ _hash_vals[row] = tablet_id_hash % _partition_count;
+ }
+ }
+
+ return Status::OK();
+}
+
+ChannelField TabletSinkHashPartitioner::get_channel_ids() const {
+ return {_hash_vals.data(), sizeof(HashValType)};
+}
+
+Status TabletSinkHashPartitioner::clone(RuntimeState* state,
+ std::unique_ptr<PartitionerBase>&
partitioner) {
+ partitioner.reset(new TabletSinkHashPartitioner(_partition_count, _txn_id,
_tablet_sink_schema,
+ _tablet_sink_partition,
_tablet_sink_location,
+ _tablet_sink_tuple_id,
_local_state));
+ return Status::OK();
+}
+
+Status TabletSinkHashPartitioner::close(RuntimeState* state) {
+ if (_block_convertor != nullptr && _tablet_finder != nullptr) {
+ state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+ _tablet_finder->num_filtered_rows());
+ state->update_num_rows_load_unselected(
+ _tablet_finder->num_immutable_partition_filtered_rows());
+ // sink won't see those filtered rows, we should compensate here
+ state->set_num_rows_load_total(state->num_rows_load_filtered() +
+ state->num_rows_load_unselected());
+ }
+ return Status::OK();
+}
+
+Status TabletSinkHashPartitioner::_send_new_partition_batch(RuntimeState* state,
+ vectorized::Block* input_block) const {
+ RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+ auto& p = _local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>();
+ // Recovery back
+ _row_distribution.clear_batching_stats();
+ _row_distribution._batching_block->clear_column_data();
+ _row_distribution._deal_batched = false;
+ RETURN_IF_ERROR(p.sink(state, input_block, false));
+ return Status::OK();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.h b/be/src/vec/sink/tablet_sink_hash_partitioner.h
new file mode 100644
index 00000000000..dbbc7c88926
--- /dev/null
+++ b/be/src/vec/sink/tablet_sink_hash_partitioner.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/tablet_info.h"
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/runtime/partitioner.h"
+#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+class TabletSinkHashPartitioner final : public PartitionerBase {
+public:
+ using HashValType = int64_t;
+ TabletSinkHashPartitioner(size_t partition_count, int64_t txn_id,
+ const TOlapTableSchemaParam& tablet_sink_schema,
+ const TOlapTablePartitionParam&
tablet_sink_partition,
+ const TOlapTableLocationParam&
tablet_sink_location,
+ const TTupleId& tablet_sink_tuple_id,
+ pipeline::ExchangeSinkLocalState* local_state);
+
+ ~TabletSinkHashPartitioner() override = default;
+
+ Status init(const std::vector<TExpr>& texprs) override;
+
+ Status prepare(RuntimeState* state, const RowDescriptor& row_desc)
override;
+
+ Status open(RuntimeState* state) override;
+
+ Status do_partitioning(RuntimeState* state, Block* block) const override;
+
+ ChannelField get_channel_ids() const override;
+ Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override;
+
+ Status close(RuntimeState* state) override;
+
+private:
+ static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
+ return Status::OK();
+ }
+
+ Status _send_new_partition_batch(RuntimeState* state, vectorized::Block*
input_block) const;
+
+ const int64_t _txn_id = -1;
+ const TOlapTableSchemaParam _tablet_sink_schema;
+ const TOlapTablePartitionParam _tablet_sink_partition;
+ const TOlapTableLocationParam _tablet_sink_location;
+ const TTupleId _tablet_sink_tuple_id;
+ mutable pipeline::ExchangeSinkLocalState* _local_state;
+ mutable OlapTableLocationParam* _location = nullptr;
+ mutable vectorized::VRowDistribution _row_distribution;
+ mutable vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
+ mutable std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
+ mutable std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder =
nullptr;
+ mutable std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
+ mutable std::unique_ptr<vectorized::OlapTableBlockConvertor>
_block_convertor = nullptr;
+ mutable TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
+ mutable RowDescriptor* _tablet_sink_row_desc = nullptr;
+ mutable std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
+ mutable std::vector<HashValType> _hash_vals;
+};
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 66aacc59f6c..abed6133458 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -239,18 +239,18 @@ BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool
: _parent(parent), _is_local(is_local),
_batch_size(parent->state()->batch_size()) {}
Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest,
size_t num_receivers,
- bool* serialized, bool eos,
- const std::vector<uint32_t>*
rows) {
+ bool* serialized, bool eos, const uint32_t* data,
+ const uint32_t offset, const uint32_t size) {
if (_mutable_block == nullptr) {
_mutable_block = MutableBlock::create_unique(block->clone_empty());
}
{
SCOPED_TIMER(_parent->merge_block_timer());
- if (rows) {
- if (!rows->empty()) {
- const auto* begin = rows->data();
- RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin +
rows->size()));
+ if (data) {
+ if (size > 0) {
+ RETURN_IF_ERROR(
+ _mutable_block->add_rows(block, data + offset, data +
offset + size));
}
} else if (!block->empty()) {
RETURN_IF_ERROR(_mutable_block->merge(*block));
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 16ea49e443c..0ff1f252d54 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -80,7 +80,8 @@ public:
BlockSerializer() : _batch_size(0) {};
#endif
Status next_serialized_block(Block* src, PBlock* dest, size_t
num_receivers, bool* serialized,
- bool eos, const std::vector<uint32_t>* rows =
nullptr);
+ bool eos, const uint32_t* data = nullptr,
+ const uint32_t offset = 0, const uint32_t
size = 0);
Status serialize_block(PBlock* dest, size_t num_receivers = 1);
Status serialize_block(const Block* src, PBlock* dest, size_t
num_receivers = 1);
@@ -150,7 +151,8 @@ public:
Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos =
false);
Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
+ Status add_rows(Block* block, const uint32_t* data, const uint32_t offset,
const uint32_t size,
+ bool eos) {
if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
@@ -160,7 +162,7 @@ public:
_pblock = std::make_unique<PBlock>();
}
RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
- &rows));
+ data, offset, size));
if (serialized) {
RETURN_IF_ERROR(_send_current_block(eos));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]