This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 82f2878bd4e [fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)
82f2878bd4e is described below
commit 82f2878bd4e632ec4564b5e61f51b84fcce7056c
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Oct 28 15:07:06 2024 +0800
[fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)
## Proposed changes
Issue Number: close #xxx
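`MergeRangeFileReader` now holds its read slice and its boxes as `OwnedSlice` objects instead of raw `new char[]` / `delete[]` buffers, and its destructor becomes `= default`. To support this, `OwnedSlice` gains a sized constructor that allocates through `Allocator::alloc`, plus a `data()` accessor.
This commit also carries exchange-sink changes:
- The templated `Channel<Parent>` / `PipChannel` pair is replaced by a single non-templated `vectorized::Channel`, and `BlockSerializer` is no longer templated.
- Finish tracking moves from `ExchangeSinkBuffer` (`_busy_channels`, `set_should_stop()`) to `ExchangeSinkLocalState::on_channel_finished()`. That function counts down `_working_channels_count` and marks the finish dependency ready when the last channel finishes.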
---------
Co-authored-by: Pxl <[email protected]>
Co-authored-by: Gabriel <[email protected]>
---
be/src/io/fs/buffered_reader.cpp | 19 +-
be/src/io/fs/buffered_reader.h | 11 +-
be/src/pipeline/exec/exchange_sink_buffer.cpp | 49 +---
be/src/pipeline/exec/exchange_sink_buffer.h | 28 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 135 +++++-----
be/src/pipeline/exec/exchange_sink_operator.h | 29 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 12 +-
be/src/pipeline/exec/result_file_sink_operator.h | 9 -
be/src/util/slice.h | 6 +
be/src/vec/runtime/partitioner.cpp | 41 +--
be/src/vec/runtime/partitioner.h | 49 +---
be/src/vec/sink/vdata_stream_sender.cpp | 291 +++++----------------
be/src/vec/sink/vdata_stream_sender.h | 232 +++++-----------
13 files changed, 290 insertions(+), 621 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 43445ed42ef..ce4767dd040 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,6 +23,7 @@
#include <algorithm>
#include <chrono>
+#include <memory>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
@@ -31,6 +32,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/runtime_profile.h"
+#include "util/slice.h"
#include "util/threadpool.h"
namespace doris {
@@ -270,7 +272,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData&
cached_data, size_t off
}
if (copy_out != nullptr) {
memcpy(copy_out + to_handle - remaining,
- _boxes[box_index] + cached_data.box_start_offset[i],
box_to_handle);
+ _boxes[box_index].data() +
cached_data.box_start_offset[i], box_to_handle);
}
remaining -= box_to_handle;
cached_data.box_start_offset[i] += box_to_handle;
@@ -307,14 +309,15 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData&
cached_data, size_t off
Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset,
size_t to_read,
size_t* bytes_read, const IOContext*
io_ctx) {
- if (_read_slice == nullptr) {
- _read_slice = new char[READ_SLICE_SIZE];
+ if (!_read_slice) {
+ _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
}
+
*bytes_read = 0;
{
SCOPED_RAW_TIMER(&_statistics.read_time);
- RETURN_IF_ERROR(
- _reader->read_at(start_offset, Slice(_read_slice, to_read),
bytes_read, io_ctx));
+ RETURN_IF_ERROR(_reader->read_at(start_offset,
Slice(_read_slice->data(), to_read),
+ bytes_read, io_ctx));
_statistics.merged_io++;
_statistics.merged_bytes += *bytes_read;
}
@@ -328,8 +331,8 @@ Status MergeRangeFileReader::_fill_box(int range_index,
size_t start_offset, siz
auto fill_box = [&](int16 fill_box_ref, uint32 box_usage, size_t
box_copy_end) {
size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE -
box_usage);
- memcpy(_boxes[fill_box_ref] + box_usage, _read_slice + copy_start -
start_offset,
- copy_size);
+ memcpy(_boxes[fill_box_ref].data() + box_usage,
+ _read_slice->data() + copy_start - start_offset, copy_size);
filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start,
copy_start + copy_size);
copy_start += copy_size;
_last_box_ref = fill_box_ref;
@@ -367,7 +370,7 @@ Status MergeRangeFileReader::_fill_box(int range_index,
size_t start_offset, siz
}
// apply for new box to copy data
while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
- _boxes.emplace_back(new char[BOX_SIZE]);
+ _boxes.emplace_back(BOX_SIZE);
_box_ref.emplace_back(0);
fill_box(_boxes.size() - 1, 0, range_copy_end);
}
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 70c8445db23..907ea11b216 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -168,12 +168,7 @@ public:
}
}
- ~MergeRangeFileReader() override {
- delete[] _read_slice;
- for (char* box : _boxes) {
- delete[] box;
- }
- }
+ ~MergeRangeFileReader() override = default;
Status close() override {
if (!_closed) {
@@ -244,8 +239,8 @@ private:
bool _closed = false;
size_t _remaining;
- char* _read_slice = nullptr;
- std::vector<char*> _boxes;
+ std::unique_ptr<OwnedSlice> _read_slice;
+ std::vector<OwnedSlice> _boxes;
int16 _last_box_ref = -1;
uint32 _last_box_usage = 0;
std::vector<int16> _box_ref;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f0f66774092..31973997059 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -86,14 +86,13 @@ void BroadcastPBlockHolderMemLimiter::release(const
BroadcastPBlockHolder& holde
} // namespace vectorized
namespace pipeline {
-
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id, int send_id,
PlanNodeId node_id, int be_number,
RuntimeState* state,
ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
- _query_id(query_id),
+ _query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_node_id(node_id),
@@ -111,12 +110,6 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}
-void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
- if (_finish_dependency && _should_stop && all_done) {
- _finish_dependency->set_ready();
- }
-}
-
void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
return;
@@ -136,7 +129,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId
fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
- _instance_to_rpc_ctx[low_id] = {};
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_construct_request(low_id, finst_id);
@@ -161,7 +153,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
_rpc_channel_is_idle[ins_id] = false;
- _busy_channels++;
}
if (request.block) {
RETURN_IF_ERROR(
@@ -202,7 +193,6 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
_rpc_channel_is_idle[ins_id] = false;
- _busy_channels++;
}
if (request.block_holder->get_block()) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
@@ -227,7 +217,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_instance_to_broadcast_package_queue[id];
if (_is_finishing) {
- _turn_off_channel(id);
+ _turn_off_channel(id, lock);
return Status::OK();
}
@@ -245,9 +235,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
- _instance_to_rpc_ctx[id]._send_callback = send_callback;
- _instance_to_rpc_ctx[id].is_cancelled = false;
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
@@ -326,12 +313,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
-
- ExchangeRpcContext rpc_ctx;
- rpc_ctx._send_callback = send_callback;
- rpc_ctx.is_cancelled = false;
- _instance_to_rpc_ctx[id] = rpc_ctx;
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
@@ -395,7 +376,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
broadcast_q.pop();
} else {
- _turn_off_channel(id);
+ _rpc_channel_is_idle[id] = true;
}
return Status::OK();
@@ -425,7 +406,7 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
- _turn_off_channel(id);
+ _turn_off_channel(id, lock);
}
}
@@ -434,14 +415,12 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const
std::string& err) {
<< ",_sender_id: " << _sender_id << ", node id: " << _node_id <<
", err: " << err;
_is_finishing = true;
_context->cancel(Status::Cancelled(err));
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- _turn_off_channel(id, true);
}
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
- _turn_off_channel(id, true);
+ _turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -473,17 +452,17 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId
id) {
return _instance_to_receiver_eof[id];
}
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+// The unused parameter `with_lock` is to ensure that the function is called
when the lock is held.
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+ std::unique_lock<std::mutex>&
/*with_lock*/) {
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
- auto all_done = _busy_channels.fetch_sub(1) == 1;
- _set_ready_to_finish(all_done);
- if (cleanup && all_done) {
- auto weak_task_ctx = weak_task_exec_ctx();
- if (auto pip_ctx = weak_task_ctx.lock()) {
- _parent->set_reach_limit();
- }
- }
+ }
+ _instance_to_receiver_eof[id] = true;
+
+ auto weak_task_ctx = weak_task_exec_ctx();
+ if (auto pip_ctx = weak_task_ctx.lock()) {
+ _parent->on_channel_finished(id);
}
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 60193b1d7e6..972e366c027 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -24,6 +24,7 @@
#include <parallel_hashmap/phmap.h>
#include <atomic>
+#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
@@ -50,7 +51,7 @@ class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
-class PipChannel;
+class Channel;
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast
shuffle, one PBlock
// will be shared between different channel, so we have to use a ref count to
mark if this
@@ -110,14 +111,14 @@ private:
namespace pipeline {
struct TransmitInfo {
- vectorized::PipChannel* channel = nullptr;
+ vectorized::Channel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
Status exec_status;
};
struct BroadcastTransmitInfo {
- vectorized::PipChannel* channel = nullptr;
+ vectorized::Channel* channel = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
@@ -177,11 +178,6 @@ private:
bool _eos;
};
-struct ExchangeRpcContext {
- std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
- bool is_cancelled = false;
-};
-
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
@@ -202,12 +198,10 @@ public:
_finish_dependency = finish_dependency;
}
- void set_should_stop() {
- _should_stop = true;
- _set_ready_to_finish(_busy_channels == 0);
- }
-
void set_low_memory_mode() { _queue_capacity = 8; }
+ void set_broadcast_dependency(std::shared_ptr<Dependency>
broadcast_dependency) {
+ _broadcast_dependency = broadcast_dependency;
+ }
private:
friend class ExchangeSinkLocalState;
@@ -230,11 +224,9 @@ private:
phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>>
_instance_to_request;
// One channel is corresponding to a downstream instance.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
- // Number of busy channels;
- std::atomic<int> _busy_channels = 0;
+
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
- phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext>
_instance_to_rpc_ctx;
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
@@ -254,14 +246,14 @@ private:
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
- inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
+ inline void _turn_off_channel(InstanceLoId id,
std::unique_lock<std::mutex>& with_lock);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
- std::atomic<bool> _should_stop = false;
+ std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
ExchangeSinkLocalState* _parent = nullptr;
};
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 3cfc5e45cfe..14290145664 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -23,6 +23,7 @@
#include <gen_cpp/types.pb.h>
#include <memory>
+#include <mutex>
#include <random>
#include "common/status.h"
@@ -31,6 +32,8 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
@@ -68,8 +71,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
- std::bind<int64_t>(&RuntimeProfile::units_per_second,
_bytes_sent_counter,
- _profile->total_time_counter()),
+ [this]() {
+ return RuntimeProfile::units_per_second(_bytes_sent_counter,
+
_profile->total_time_counter());
+ },
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent",
TUnit::BYTES);
@@ -84,15 +89,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
- channel_shared_ptrs.emplace_back(
- new vectorized::PipChannel(this, p._row_desc,
p._dests[i].brpc_server,
- fragment_instance_id,
p._dest_node_id));
- fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
- channel_shared_ptrs.size() -
1);
- channels.push_back(channel_shared_ptrs.back().get());
+ channels.push_back(std::make_shared<vectorized::Channel>(
+ this, p._dests[i].brpc_server, fragment_instance_id,
p._dest_node_id));
+ fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
channels.size() - 1);
+
+ if (fragment_instance_id.hi != -1 && fragment_instance_id.lo !=
-1) {
+ _working_channels_count++;
+ }
} else {
- channel_shared_ptrs.emplace_back(
-
channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+
channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
}
}
@@ -106,6 +111,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
return Status::OK();
}
+void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
+ std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+
+ if (_finished_channels.contains(channel_id)) {
+ LOG(WARNING) << "query: " << print_id(_state->query_id())
+ << ", on_channel_finished on already finished channel: "
<< channel_id;
+ return;
+ } else {
+ _finished_channels.emplace(channel_id);
+ if (_working_channels_count.fetch_sub(1) == 1) {
+ set_reach_limit();
+ if (_finish_dependency) {
+ _finish_dependency->set_ready();
+ }
+ }
+ }
+}
+
Status ExchangeSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
@@ -140,7 +163,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
- _finish_dependency->block();
}
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1)
&&
@@ -151,7 +173,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
} else if (local_size > 0) {
size_t dep_id = 0;
- for (auto* channel : channels) {
+ for (auto& channel : channels) {
if (channel->is_local()) {
if (auto dep = channel->get_local_channel_dependency()) {
_local_channels_dependency.push_back(dep);
@@ -166,16 +188,18 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- channels.size()));
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})",
_partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- _partition_count = channel_shared_ptrs.size();
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- channel_shared_ptrs.size()));
+ _partition_count = channels.size();
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
@@ -221,12 +245,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() *
config::table_sink_partition_write_max_partition_nums_per_writer;
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- _partition_count));
- _partition_function.reset(new
HashPartitionFunction(_partitioner.get()));
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ _partition_count);
+ _partition_function =
std::make_unique<HashPartitionFunction>(_partitioner.get());
- scale_writer_partitioning_exchanger.reset(new
vectorized::ScaleWriterPartitioningExchanger<
- HashPartitionFunction>(
+ scale_writer_partitioning_exchanger = std::make_unique<
+
vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
channels.size(), *_partition_function, _partition_count,
channels.size(), 1,
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
state->task_num() ==
@@ -239,7 +264,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
0
?
config::table_sink_partition_write_min_data_processed_rebalance_threshold
:
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
- state->task_num()));
+ state->task_num());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
@@ -358,7 +383,7 @@ void
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
Status st) {
channel->set_receiver_eof(st);
// Chanel will not send RPC to the downstream when eof, so close chanel by
OK status.
- static_cast<void>(channel->close(state, Status::OK()));
+ static_cast<void>(channel->close(state));
}
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block, bool eos) {
@@ -367,7 +392,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
bool all_receiver_eof = true;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
all_receiver_eof = false;
break;
@@ -394,13 +419,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
if (!block->empty()) {
Status status;
size_t idx = 0;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
// If this channel is the last, we can move this block
to downstream pipeline.
// Otherwise, this block also need to be broadcasted
to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
- block, idx ==
local_state._last_local_channel_idx);
+ block, eos, idx ==
local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
@@ -430,7 +455,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
size_t idx = 0;
bool moved = false;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
@@ -438,7 +463,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
// Otherwise, this block also need to be
broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx,
0);
status = channel->send_local_block(
- &cur_block, idx ==
local_state._last_local_channel_idx);
+ &cur_block, eos,
+ idx ==
local_state._last_local_channel_idx);
moved = idx ==
local_state._last_local_channel_idx;
} else {
status =
channel->send_broadcast_block(block_holder, eos);
@@ -459,20 +485,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
} else if (_part_type == TPartitionType::RANDOM) {
// 1. select channel
- vectorized::PipChannel* current_channel =
- local_state.channels[local_state.current_channel_idx];
+ auto& current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block, true);
+ auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
- RETURN_IF_ERROR(local_state._serializer.serialize_block(
- block, current_channel->ch_cur_pb_block()));
- auto status =
-
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+ auto pblock = std::make_unique<PBlock>();
+ RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
+ auto status =
current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
- current_channel->ch_roll_pb_block();
}
}
local_state.current_channel_idx =
@@ -494,7 +517,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
} else {
RETURN_IF_ERROR(channel_add_rows(
- state, local_state.channel_shared_ptrs,
local_state._partition_count,
+ state, local_state.channels, local_state._partition_count,
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
}
int64_t new_channel_mem_usage = 0;
@@ -578,20 +601,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
- vectorized::PipChannel* current_channel =
- local_state.channels[local_state.current_channel_idx];
+ auto& current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block, true);
+ auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
- RETURN_IF_ERROR(local_state._serializer.serialize_block(
- block, current_channel->ch_cur_pb_block()));
- auto status =
-
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+ auto pblock = std::make_unique<PBlock>();
+ RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
+ auto status =
current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
- current_channel->ch_roll_pb_block();
}
_data_processed += block->bytes();
}
@@ -613,15 +633,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
Status final_st = Status::OK();
if (eos) {
local_state._serializer.reset_block();
- for (int i = 0; i < local_state.channels.size(); ++i) {
- Status st = local_state.channels[i]->close(state, Status::OK());
+ for (auto& channel : local_state.channels) {
+ Status st = channel->close(state);
if (!st.ok() && final_st.ok()) {
final_st = st;
}
}
- if (local_state._sink_buffer) {
- local_state._sink_buffer->set_should_stop();
- }
}
return final_st;
}
@@ -645,8 +662,8 @@ Status
ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
}
void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer*
buffer) {
- for (auto channel : channels) {
- ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
+ for (auto& channel : channels) {
+ channel->register_exchange_buffer(buffer);
}
}
@@ -693,12 +710,12 @@ std::string ExchangeSinkLocalState::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
if (_sink_buffer) {
- fmt::format_to(
- debug_string_buffer,
- ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
- "_reach_limit: {}",
- _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
- _sink_buffer->_is_finishing.load(), _reach_limit.load());
+ fmt::format_to(debug_string_buffer,
+ ", Sink Buffer: (_is_finishing = {}, blocks in queue:
{}, queue capacity: "
+ "{}, queue dep: {}), _reach_limit: {}, working
channels: {}",
+ _sink_buffer->_is_finishing.load(),
_sink_buffer->_total_queue_size,
+ _sink_buffer->_queue_capacity,
(void*)_sink_buffer->_queue_dependency.get(),
+ _reach_limit.load(), _working_channels_count.load());
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index a34237145a7..8af944728a2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -19,7 +19,9 @@
#include <stdint.h>
+#include <atomic>
#include <memory>
+#include <mutex>
#include "common/status.h"
#include "exchange_sink_buffer.h"
@@ -53,13 +55,10 @@ private:
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : Base(parent, state),
- current_channel_idx(0),
- only_local_exchange(false),
- _serializer(this) {
+ : Base(parent, state), _serializer(this) {
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY", true);
+ parent->get_name() +
"_FINISH_DEPENDENCY", false);
}
std::vector<Dependency*> dependencies() const override {
@@ -112,10 +111,11 @@ public:
return Status::OK();
}
Status _send_new_partition_batch();
- std::vector<vectorized::PipChannel*> channels;
- std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
- int current_channel_idx; // index of current channel to send to if _random
== true
- bool only_local_exchange;
+ std::vector<std::shared_ptr<vectorized::Channel>> channels;
+ int current_channel_idx {0}; // index of current channel to send to if
_random == true
+ bool only_local_exchange {false};
+
+ void on_channel_finished(InstanceLoId channel_id);
// for external table sink hash partition
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
@@ -123,9 +123,8 @@ public:
private:
friend class ExchangeSinkOperatorX;
- friend class vectorized::Channel<ExchangeSinkLocalState>;
- friend class vectorized::PipChannel;
- friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
+ friend class vectorized::Channel;
+ friend class vectorized::BlockSerializer;
std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
@@ -154,7 +153,7 @@ private:
int _sender_id;
std::shared_ptr<vectorized::BroadcastPBlockHolderMemLimiter>
_broadcast_pb_mem_limiter;
- vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
+ vectorized::BlockSerializer _serializer;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
@@ -203,6 +202,10 @@ private:
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
+
+ std::atomic_int _working_channels_count = 0;
+ std::set<InstanceLoId> _finished_channels;
+ std::mutex _finished_channels_mutex;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index a11c4df6625..93026427b86 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -31,9 +31,7 @@ namespace doris::pipeline {
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : AsyncWriterSink<vectorized::VFileResultWriter,
ResultFileSinkOperatorX>(parent, state),
- _serializer(
-
std::make_unique<vectorized::BlockSerializer<ResultFileSinkLocalState>>(this))
{}
+ : AsyncWriterSink<vectorized::VFileResultWriter,
ResultFileSinkOperatorX>(parent, state) {}
ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const
RowDescriptor& row_desc,
const std::vector<TExpr>&
t_output_expr)
@@ -145,14 +143,6 @@ Status ResultFileSinkLocalState::close(RuntimeState*
state, Status exec_status)
return Base::close(state, exec_status);
}
-template <typename ChannelPtrType>
-void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state,
ChannelPtrType channel,
- Status st) {
- channel->set_receiver_eof(st);
- // Chanel will not send RPC to the downstream when eof, so close chanel by
OK status.
- static_cast<void>(channel->close(state, Status::OK()));
-}
-
Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index e99eb709a9f..7268efe4de4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -21,10 +21,6 @@
#include "vec/sink/writer/vfile_result_writer.h"
namespace doris::vectorized {
-template <typename Parent>
-class BlockSerializer;
-template <typename Parent>
-class Channel;
class BroadcastPBlockHolder;
} // namespace doris::vectorized
@@ -55,13 +51,8 @@ public:
private:
friend class ResultFileSinkOperatorX;
- template <typename ChannelPtrType>
- void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st);
-
std::shared_ptr<BufferControlBlock> _sender;
- std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
- std::unique_ptr<vectorized::BlockSerializer<ResultFileSinkLocalState>>
_serializer;
std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
RuntimeProfile::Counter* _local_send_timer = nullptr;
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index b38b1147894..fd6bcf0adfb 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -344,6 +344,10 @@ class OwnedSlice : private Allocator<false, false, false,
DefaultMemoryAllocator
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
+ OwnedSlice(size_t length)
+ : _slice(reinterpret_cast<char*>(Allocator::alloc(length)),
length),
+ _capacity(length) {}
+
OwnedSlice(OwnedSlice&& src) : _slice(src._slice),
_capacity(src._capacity) {
src._slice.data = nullptr;
src._slice.size = 0;
@@ -369,6 +373,8 @@ public:
}
}
+ char* data() const { return _slice.data; }
+
const Slice& slice() const { return _slice; }
private:
diff --git a/be/src/vec/runtime/partitioner.cpp
b/be/src/vec/runtime/partitioner.cpp
index 89656a74508..660ffe51a83 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -24,9 +24,8 @@
namespace doris::vectorized {
-template <typename HashValueType, typename ChannelIds>
-Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState*
state,
- Block* block)
const {
+template <typename ChannelIds>
+Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state,
Block* block) const {
int rows = block->rows();
if (rows > 0) {
@@ -55,47 +54,23 @@ Status Partitioner<HashValueType,
ChannelIds>::do_partitioning(RuntimeState* sta
template <typename ChannelIds>
void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
uint32_t* __restrict result,
int idx) const {
- column->update_crcs_with_value(result,
Base::_partition_expr_ctxs[idx]->root()->type().type,
+ column->update_crcs_with_value(result,
_partition_expr_ctxs[idx]->root()->type().type,
column->size());
}
-template <typename ChannelIds>
-void XXHashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
uint64_t* __restrict result,
- int /*idx*/) const {
- column->update_hashes_with_value(result);
-}
-
-template <typename ChannelIds>
-Status XXHashPartitioner<ChannelIds>::clone(RuntimeState* state,
- std::unique_ptr<PartitionerBase>&
partitioner) {
- auto* new_partitioner = new XXHashPartitioner(Base::_partition_count);
- partitioner.reset(new_partitioner);
-
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
- for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
- state, new_partitioner->_partition_expr_ctxs[i]));
- }
- return Status::OK();
-}
-
template <typename ChannelIds>
Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
- auto* new_partitioner = new Crc32HashPartitioner(Base::_partition_count);
+ auto* new_partitioner = new
Crc32HashPartitioner<ChannelIds>(_partition_count);
partitioner.reset(new_partitioner);
-
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
- for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
- state, new_partitioner->_partition_expr_ctxs[i]));
+ new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
+ for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(
+ _partition_expr_ctxs[i]->clone(state,
new_partitioner->_partition_expr_ctxs[i]));
}
return Status::OK();
}
-template class Partitioner<size_t, pipeline::LocalExchangeChannelIds>;
-template class XXHashPartitioner<pipeline::LocalExchangeChannelIds>;
-template class Partitioner<size_t, ShuffleChannelIds>;
-template class XXHashPartitioner<ShuffleChannelIds>;
-template class Partitioner<uint32_t, ShuffleChannelIds>;
template class Crc32HashPartitioner<ShuffleChannelIds>;
template class Crc32HashPartitioner<SpillPartitionChannelIds>;
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 5607a83327b..e8feb74335a 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -58,11 +58,11 @@ protected:
const size_t _partition_count;
};
-template <typename HashValueType, typename ChannelIds>
-class Partitioner : public PartitionerBase {
+template <typename ChannelIds>
+class Crc32HashPartitioner : public PartitionerBase {
public:
- Partitioner(int partition_count) : PartitionerBase(partition_count) {}
- ~Partitioner() override = default;
+ Crc32HashPartitioner(int partition_count) :
PartitionerBase(partition_count) {}
+ ~Crc32HashPartitioner() override = default;
Status init(const std::vector<TExpr>& texprs) override {
return VExpr::create_expr_trees(texprs, _partition_expr_ctxs);
@@ -76,9 +76,9 @@ public:
Status do_partitioning(RuntimeState* state, Block* block) const override;
- ChannelField get_channel_ids() const override {
- return {_hash_vals.data(), sizeof(HashValueType)};
- }
+ ChannelField get_channel_ids() const override { return {_hash_vals.data(),
sizeof(uint32_t)}; }
+
+ Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override;
protected:
Status _get_partition_column_result(Block* block, std::vector<int>&
result) const {
@@ -89,38 +89,17 @@ protected:
return Status::OK();
}
- virtual void _do_hash(const ColumnPtr& column, HashValueType* __restrict
result,
- int idx) const = 0;
+ void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int
idx) const;
VExprContextSPtrs _partition_expr_ctxs;
- mutable std::vector<HashValueType> _hash_vals;
+ mutable std::vector<uint32_t> _hash_vals;
};
-template <typename ChannelIds>
-class XXHashPartitioner final : public Partitioner<uint64_t, ChannelIds> {
-public:
- using Base = Partitioner<uint64_t, ChannelIds>;
- XXHashPartitioner(int partition_count) : Partitioner<uint64_t,
ChannelIds>(partition_count) {}
- ~XXHashPartitioner() override = default;
-
- Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override;
-
-private:
- void _do_hash(const ColumnPtr& column, uint64_t* __restrict result, int
idx) const override;
-};
-
-template <typename ChannelIds>
-class Crc32HashPartitioner final : public Partitioner<uint32_t, ChannelIds> {
-public:
- using Base = Partitioner<uint32_t, ChannelIds>;
- Crc32HashPartitioner(int partition_count)
- : Partitioner<uint32_t, ChannelIds>(partition_count) {}
- ~Crc32HashPartitioner() override = default;
-
- Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override;
-
-private:
- void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int
idx) const override;
+struct ShuffleChannelIds {
+ template <typename HashValueType>
+ HashValueType operator()(HashValueType l, size_t r) {
+ return l % r;
+ }
};
struct SpillPartitionChannelIds {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 9c9a11d60aa..9e437afa8e7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -54,8 +54,7 @@
namespace doris::vectorized {
-template <typename Parent>
-Status Channel<Parent>::init(RuntimeState* state) {
+Status Channel::init(RuntimeState* state) {
if (_brpc_dest_addr.hostname.empty()) {
LOG(WARNING) << "there is no brpc destination address's hostname"
", maybe version is not compatible.";
@@ -64,9 +63,11 @@ Status Channel<Parent>::init(RuntimeState* state) {
if (state->query_options().__isset.enable_local_exchange) {
_is_local &= state->query_options().enable_local_exchange;
}
+
if (_is_local) {
return Status::OK();
}
+
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
_brpc_stub =
state->exec_env()->brpc_internal_client_cache()->get_client(
"127.0.0.1", _brpc_dest_addr.port);
@@ -83,8 +84,7 @@ Status Channel<Parent>::init(RuntimeState* state) {
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::open(RuntimeState* state) {
+Status Channel::open(RuntimeState* state) {
if (_is_local) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
@@ -94,19 +94,6 @@ Status Channel<Parent>::open(RuntimeState* state) {
}
}
_be_number = state->be_number();
- _brpc_request = std::make_shared<PTransmitDataParams>();
- // initialize brpc request
- _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
- _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
- _finst_id = _brpc_request->finst_id();
-
- _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
- _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
- _query_id = _brpc_request->query_id();
-
- _brpc_request->set_node_id(_dest_node_id);
- _brpc_request->set_sender_id(_parent->sender_id());
- _brpc_request->set_be_number(_be_number);
const auto& query_options = state->query_options();
if (query_options.__isset.query_timeout) {
@@ -121,28 +108,26 @@ Status Channel<Parent>::open(RuntimeState* state) {
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
// so the empty channel not need call function close_internal()
_need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo
!= -1);
+
_state = state;
return Status::OK();
}
-std::shared_ptr<pipeline::Dependency>
PipChannel::get_local_channel_dependency() {
- if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+std::shared_ptr<pipeline::Dependency> Channel::get_local_channel_dependency() {
+ if (!_local_recvr) {
return nullptr;
}
- return
Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
- Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+ return _local_recvr->get_local_channel_dependency(_parent->sender_id());
}
-int64_t PipChannel::mem_usage() const {
- auto* mutable_block =
Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+int64_t Channel::mem_usage() const {
+ auto* mutable_block = _serializer.get_block();
int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
return mem_usage;
}
-Status PipChannel::send_remote_block(PBlock* block, bool eos, Status
exec_status) {
-
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
1);
- std::unique_ptr<PBlock> pblock_ptr;
- pblock_ptr.reset(block);
+Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
+ COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
@@ -152,13 +137,13 @@ Status PipChannel::send_remote_block(PBlock* block, bool
eos, Status exec_status
}
}
if (eos || block->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos,
exec_status}));
+ RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos,
Status::OK()}));
}
return Status::OK();
}
-Status
PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos) {
-
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
1);
+Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
block, bool eos) {
+ COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
return Status::OK();
@@ -171,210 +156,88 @@ Status
PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
return Status::OK();
}
-Status PipChannel::send_current_block(bool eos, Status exec_status) {
- if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
- return
Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+Status Channel::_send_current_block(bool eos) {
+ if (is_local()) {
+ return _send_local_block(eos);
}
- RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
- return Status::OK();
+ return send_remote_block(std::move(_pblock), eos);
}
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
- // FIXME: Now, local exchange will cause the performance problem is in a
multi-threaded scenario
- // so this feature is turned off here by default. We need to re-examine
this logic
- if (is_local()) {
- return send_local_block(exec_status, eos);
+Status Channel::_send_local_block(bool eos) {
+ Block block;
+ if (_serializer.get_block() != nullptr) {
+ block = _serializer.get_block()->to_block();
+
_serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
}
- if (eos) {
- RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
+
+ if (!block.empty() || eos) {
+ RETURN_IF_ERROR(send_local_block(&block, eos, true));
}
- RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
- ch_roll_pb_block();
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
+Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
SCOPED_TIMER(_parent->local_send_timer());
- Block block = _serializer.get_block()->to_block();
- _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
- if (_recvr_is_valid()) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
- COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
- COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
- COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
- _local_recvr->add_block(&block, _parent->sender_id(), true);
- if (eos) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
+ if (eos) {
+ if (_eos_send) {
+ return Status::OK();
+ } else {
+ _eos_send = true;
}
- return Status::OK();
- } else {
- _serializer.reset_block();
- return _receiver_status;
}
-}
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
- SCOPED_TIMER(_parent->local_send_timer());
- if (_recvr_is_valid()) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
- COUNTER_UPDATE(_parent->local_bytes_send_counter(),
block->bytes());
- COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
- COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
- _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
- return Status::OK();
- } else {
+ if (is_receiver_eof()) {
return _receiver_status;
}
-}
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status
exec_status) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>)
{
+ auto receiver_status = _recvr_status();
+ if (receiver_status.ok()) {
+ COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+ COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
- SCOPED_TIMER(_parent->brpc_send_timer());
- if (_send_remote_block_callback == nullptr) {
- _send_remote_block_callback =
DummyBrpcCallback<PTransmitDataResult>::create_shared();
- } else {
- RETURN_IF_ERROR(_wait_last_brpc());
- _send_remote_block_callback->cntl_->Reset();
- }
- VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" <<
print_id(_fragment_instance_id)
- << " dest_node=" << _dest_node_id << " to_host=" <<
_brpc_dest_addr.hostname
- << " _packet_seq=" << _packet_seq << " row_desc=" <<
_row_desc.debug_string();
-
- _brpc_request->set_eos(eos);
- if (!exec_status.ok()) {
- exec_status.to_protobuf(_brpc_request->mutable_exec_status());
- }
- if (block != nullptr && !block->column_metas().empty()) {
- _brpc_request->set_allocated_block(block);
- }
- _brpc_request->set_packet_seq(_packet_seq++);
-
- _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
- if (config::exchange_sink_ignore_eovercrowded) {
- _send_remote_block_callback->cntl_->ignore_eovercrowded();
- }
-
- {
- auto send_remote_block_closure =
- AutoReleaseClosure<PTransmitDataParams,
DummyBrpcCallback<PTransmitDataResult>>::
- create_unique(_brpc_request,
_send_remote_block_callback);
- if (enable_http_send_block(*_brpc_request)) {
- RETURN_IF_ERROR(transmit_block_httpv2(
- _state->exec_env(), std::move(send_remote_block_closure),
_brpc_dest_addr));
- } else {
- transmit_blockv2(*_brpc_stub,
std::move(send_remote_block_closure));
+ const auto sender_id = _parent->sender_id();
+ if (!block->empty()) [[likely]] {
+ _local_recvr->add_block(block, sender_id, can_be_moved);
}
- }
-
- if (block != nullptr) {
- static_cast<void>(_brpc_request->release_block());
- }
- return Status::OK();
-}
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>&
rows, bool eos) {
- if (_fragment_instance_id.lo == -1) {
+ if (eos) [[unlikely]] {
+ _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
+ _parent->on_channel_finished(_fragment_instance_id.lo);
+ }
return Status::OK();
+ } else {
+ _receiver_status = std::move(receiver_status);
+ _parent->on_channel_finished(_fragment_instance_id.lo);
+ return _receiver_status;
}
-
- bool serialized = false;
- RETURN_IF_ERROR(
- _serializer.next_serialized_block(block, _ch_cur_pb_block, 1,
&serialized, eos, &rows));
- if (serialized) {
- RETURN_IF_ERROR(send_current_block(false, Status::OK()));
- }
-
- return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::close_wait(RuntimeState* state) {
- if (_need_close) {
- Status st = _wait_last_brpc();
- if (st.is<ErrorCode::END_OF_FILE>()) {
- st = Status::OK();
- } else if (!st.ok()) {
- state->log_error(st.to_string());
- }
- _need_close = false;
- return st;
+Status Channel::close(RuntimeState* state) {
+ if (_closed) {
+ return Status::OK();
}
- _serializer.reset_block();
- return Status::OK();
-}
+ _closed = true;
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
if (!_need_close) {
return Status::OK();
}
- VLOG_RPC << "Channel::close_internal() instance_id=" <<
print_id(_fragment_instance_id)
- << " dest_node=" << _dest_node_id << " #rows= "
- << ((_serializer.get_block() == nullptr) ? 0 :
_serializer.get_block()->rows())
- << " receiver status: " << _receiver_status << ", exec_status: "
<< exec_status;
+
if (is_receiver_eof()) {
_serializer.reset_block();
return Status::OK();
- }
- Status status;
- if (_serializer.get_block() != nullptr && _serializer.get_block()->rows()
> 0) {
- status = send_current_block(true, exec_status);
- } else {
- if (is_local()) {
- if (_recvr_is_valid()) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
- }
- } else {
- // Non pipeline engine will send an empty eos block
- status = send_remote_block((PBlock*)nullptr, true, exec_status);
- }
- }
- // Don't wait for the last packet to finish, left it to close_wait.
- if (status.is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
} else {
- return status;
+ return _send_current_block(true);
}
}
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
- if (_closed) {
- return Status::OK();
- }
- _closed = true;
-
- Status st = close_internal(exec_status);
- if (!st.ok()) {
- state->log_error(st.to_string());
- }
- return st;
-}
-
-template <typename Parent>
-void Channel<Parent>::ch_roll_pb_block() {
- _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 :
&_ch_pb_block1);
-}
-
-template <typename Parent>
-BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
+BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent,
bool is_local)
: _parent(parent), _is_local(is_local),
_batch_size(parent->state()->batch_size()) {}
-template <typename Parent>
-Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock*
dest, int num_receivers,
- bool* serialized, bool
eos,
- const
std::vector<uint32_t>* rows) {
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int
num_receivers,
+ bool* serialized, bool eos,
+ const std::vector<uint32_t>*
rows) {
if (_mutable_block == nullptr) {
_mutable_block = MutableBlock::create_unique(block->clone_empty());
}
@@ -403,8 +266,7 @@ Status
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
return Status::OK();
}
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int
num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
if (_mutable_block && _mutable_block->rows() > 0) {
auto block = _mutable_block->to_block();
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -415,29 +277,20 @@ Status BlockSerializer<Parent>::serialize_block(PBlock*
dest, int num_receivers)
return Status::OK();
}
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock*
dest, int num_receivers) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>)
{
- SCOPED_TIMER(_parent->_serialize_batch_timer);
- dest->Clear();
- size_t uncompressed_bytes = 0, compressed_bytes = 0;
- RETURN_IF_ERROR(src->serialize(
- _parent->_state->be_exec_version(), dest, &uncompressed_bytes,
&compressed_bytes,
- _parent->compression_type(),
_parent->transfer_large_data_by_brpc()));
- COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes *
num_receivers);
- COUNTER_UPDATE(_parent->_uncompressed_bytes_counter,
uncompressed_bytes * num_receivers);
- COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-
_parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
-
num_receivers);
- _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows()
* num_receivers);
- }
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int
num_receivers) {
+ SCOPED_TIMER(_parent->_serialize_batch_timer);
+ dest->Clear();
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest,
&uncompressed_bytes,
+ &compressed_bytes,
_parent->compression_type(),
+ _parent->transfer_large_data_by_brpc()));
+ COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes *
num_receivers);
+ COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes *
num_receivers);
+ COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+
_parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
num_receivers);
+ _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() *
num_receivers);
return Status::OK();
}
-template class Channel<pipeline::ExchangeSinkLocalState>;
-template class Channel<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
-
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 43d00b0164a..da0ee22ac14 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -72,13 +72,10 @@ class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
-template <typename>
-class Channel;
-template <typename Parent>
class BlockSerializer {
public:
- BlockSerializer(Parent* parent, bool is_local = true);
+ BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local =
true);
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers,
bool* serialized,
bool eos, const std::vector<uint32_t>* rows =
nullptr);
Status serialize_block(PBlock* dest, int num_receivers = 1);
@@ -91,21 +88,13 @@ public:
void set_is_local(bool is_local) { _is_local = is_local; }
private:
- Parent* _parent;
+ pipeline::ExchangeSinkLocalState* _parent;
std::unique_ptr<MutableBlock> _mutable_block;
bool _is_local;
const int _batch_size;
};
-struct ShuffleChannelIds {
- template <typename HashValueType>
- HashValueType operator()(HashValueType l, size_t r) {
- return l % r;
- }
-};
-
-template <typename Parent>
class Channel {
public:
friend class pipeline::ExchangeSinkBuffer;
@@ -113,23 +102,15 @@ public:
// combination. buffer_size is specified in bytes and a soft limit on
// how much tuple data is getting accumulated before being sent; it only
applies
// when data is added via add_row() and not sent directly via send_batch().
- Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress
brpc_dest,
+ Channel(pipeline::ExchangeSinkLocalState* parent, TNetworkAddress
brpc_dest,
TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
: _parent(parent),
- _row_desc(row_desc),
_fragment_instance_id(std::move(fragment_instance_id)),
_dest_node_id(dest_node_id),
- _need_close(false),
- _closed(false),
_brpc_dest_addr(std::move(brpc_dest)),
_is_local((_brpc_dest_addr.hostname ==
BackendOptions::get_localhost()) &&
(_brpc_dest_addr.port == config::brpc_port)),
- _serializer(_parent, _is_local) {
- if (_is_local) {
- VLOG_NOTICE << "will use local Exchange, dest_node_id is : " <<
_dest_node_id;
- }
- _ch_cur_pb_block = &_ch_pb_block1;
- }
+ _serializer(_parent, _is_local) {}
virtual ~Channel() = default;
@@ -138,37 +119,12 @@ public:
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- // Asynchronously sends a row batch.
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- virtual Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK());
-
- virtual Status
send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) {
- return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
- }
-
- virtual Status add_rows(Block* block, const std::vector<uint32_t>& row,
bool eos);
-
- virtual Status send_current_block(bool eos, Status exec_status);
-
- Status send_local_block(Status exec_status, bool eos = false);
-
- Status send_local_block(Block* block, bool can_be_moved);
+ Status send_local_block(Block* block, bool eos, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the
response
// of close operation, client should call close_wait() to finish channel's
close.
// We split one close operation into two phases in order to make multiple
channels
// can run parallel.
- Status close(RuntimeState* state, Status exec_status);
-
- // Get close wait's response, to finish channel close operation.
- Status close_wait(RuntimeState* state);
-
- int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
- PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
+ Status close(RuntimeState* state);
std::string get_fragment_instance_id_str() {
UniqueId uid(_fragment_instance_id);
@@ -177,151 +133,40 @@ public:
bool is_local() const { return _is_local; }
- virtual void ch_roll_pb_block();
-
bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
-protected:
- bool _recvr_is_valid() {
- if (_local_recvr && !_local_recvr->is_closed()) {
- return true;
- }
- _receiver_status = Status::EndOfFile(
- "local data stream receiver closed"); // local data stream
receiver closed
- return false;
- }
-
- Status _wait_last_brpc() {
- SCOPED_TIMER(_parent->brpc_wait_timer());
- if (_send_remote_block_callback == nullptr) {
- return Status::OK();
- }
- _send_remote_block_callback->join();
- if (_send_remote_block_callback->cntl_->Failed()) {
- std::string err = fmt::format(
- "failed to send brpc batch, error={}, error_text={},
client: {}, "
- "latency = {}",
- berror(_send_remote_block_callback->cntl_->ErrorCode()),
- _send_remote_block_callback->cntl_->ErrorText(),
- BackendOptions::get_localhost(),
- _send_remote_block_callback->cntl_->latency_us());
- LOG(WARNING) << err;
- return Status::RpcError(err);
- }
- _receiver_status =
Status::create(_send_remote_block_callback->response_->status());
- return _receiver_status;
- }
-
- Status close_internal(Status exec_status);
-
- Parent* _parent = nullptr;
-
- const RowDescriptor& _row_desc;
- const TUniqueId _fragment_instance_id;
- PlanNodeId _dest_node_id;
-
- // the number of RowBatch.data bytes sent successfully
- int64_t _num_data_bytes_sent {};
- int64_t _packet_seq {};
-
- bool _need_close;
- bool _closed;
- int _be_number;
-
- TNetworkAddress _brpc_dest_addr;
-
- PUniqueId _finst_id;
- PUniqueId _query_id;
- PBlock _pb_block;
- std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
- std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
- std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>>
_send_remote_block_callback;
- Status _receiver_status;
- int32_t _brpc_timeout_ms = 500;
- RuntimeState* _state = nullptr;
-
- bool _is_local;
- std::shared_ptr<VDataStreamRecvr> _local_recvr;
- // serialized blocks for broadcasting; we need two so we can write
- // one while the other one is still being sent.
- // Which is for same reason as `_cur_pb_block`, `_pb_block1` and
`_pb_block2`
- // in VDataStreamSender.
- PBlock* _ch_cur_pb_block = nullptr;
- PBlock _ch_pb_block1;
- PBlock _ch_pb_block2;
-
- BlockSerializer<Parent> _serializer;
-};
-
-#define HANDLE_CHANNEL_STATUS(state, channel, status) \
- do { \
- if (status.is<ErrorCode::END_OF_FILE>()) { \
- _handle_eof_channel(state, channel, status); \
- } else { \
- RETURN_IF_ERROR(status); \
- } \
- } while (0)
-
-class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
-public:
- PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor&
row_desc,
- const TNetworkAddress& brpc_dest, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id)
- : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc,
brpc_dest,
- fragment_instance_id,
dest_node_id) {
- ch_roll_pb_block();
- }
-
- ~PipChannel() override { delete
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
-
int64_t mem_usage() const;
- void ch_roll_pb_block() override {
- // We have two choices here.
- // 1. Use a PBlock pool and fetch an available PBlock if we need one.
In this way, we can
- // reuse the memory, but we have to use a lock to synchronize.
- // 2. Create a new PBlock every time. In this way we don't need a lock
but have to allocate
- // new memory.
- // Now we use the second way.
- Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new
PBlock();
- }
-
// Asynchronously sends a block
// Returns the status of the most recently finished transmit_data
// rpc (or OK if there wasn't one that hasn't been reported yet).
// if batch is nullptr, send the eof packet
- Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK()) override;
+ Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos =
false);
+ Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) override;
-
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
override {
- if
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
+ Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
+ if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
bool serialized = false;
- _pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(
-
Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
- block, _pblock.get(), 1, &serialized, eos, &rows));
+ if (_pblock == nullptr) {
+ _pblock = std::make_unique<PBlock>();
+ }
+ RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
+ &rows));
if (serialized) {
- Status exec_status = Status::OK();
- RETURN_IF_ERROR(send_current_block(eos, exec_status));
+ RETURN_IF_ERROR(_send_current_block(eos));
}
return Status::OK();
}
- // send _mutable_block
- Status send_current_block(bool eos, Status exec_status) override;
-
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer;
-
_buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
+ _buffer->register_sink(_fragment_instance_id);
}
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
@@ -337,12 +182,53 @@ public:
std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-private:
+protected:
+ Status _send_local_block(bool eos);
+ Status _send_current_block(bool eos);
+
+ Status _recvr_status() const {
+ if (_local_recvr && !_local_recvr->is_closed()) {
+ return Status::OK();
+ }
+ return Status::EndOfFile(
+ "local data stream receiver closed"); // local data stream
receiver closed
+ }
+
+ pipeline::ExchangeSinkLocalState* _parent = nullptr;
+
+ const TUniqueId _fragment_instance_id;
+ PlanNodeId _dest_node_id;
+ bool _closed {false};
+ bool _need_close {false};
+ int _be_number;
+
+ TNetworkAddress _brpc_dest_addr;
+
+ PBlock _pb_block;
+ std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+ Status _receiver_status;
+ int32_t _brpc_timeout_ms = 500;
+ RuntimeState* _state = nullptr;
+
+ bool _is_local;
+ std::shared_ptr<VDataStreamRecvr> _local_recvr;
+
+ BlockSerializer _serializer;
+
pipeline::ExchangeSinkBuffer* _buffer = nullptr;
bool _eos_send = false;
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
_send_callback;
std::unique_ptr<PBlock> _pblock;
};
+#define HANDLE_CHANNEL_STATUS(state, channel, status) \
+ do { \
+ if (status.is<ErrorCode::END_OF_FILE>()) { \
+ _handle_eof_channel(state, channel, status); \
+ } else { \
+ RETURN_IF_ERROR(status); \
+ } \
+ } while (0)
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]