This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e41ffc7cb51 [Refactor](Exec) Support one RPC sending multiple blocks (#50113)
e41ffc7cb51 is described below
commit e41ffc7cb51e5336c730e6e9e4d6b9c80c353c5b
Author: HappenLee <[email protected]>
AuthorDate: Tue Apr 29 10:35:17 2025 +0800
[Refactor](Exec) Support one RPC sending multiple blocks (#50113)
1. Remove the unnecessary channel pointer from the transmit struct to reduce memory consumption
2. Send multiple blocks at a time in a single RPC
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 256 +++++++++++++++------
be/src/pipeline/exec/exchange_sink_buffer.h | 15 +-
be/src/vec/runtime/vdata_stream_mgr.cpp | 12 +
be/src/vec/sink/vdata_stream_sender.cpp | 4 +-
be/test/vec/exec/exchange_sink_test.h | 5 +-
.../java/org/apache/doris/qe/SessionVariable.java | 13 ++
gensrc/proto/internal_service.proto | 1 +
gensrc/thrift/PaloInternalService.thrift | 1 +
8 files changed, 220 insertions(+), 87 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 95a5a00f68a..fc28f8b115c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -100,7 +100,13 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id,
PlanNodeId dest_node_
_node_id(node_id),
_state(state),
_context(state->get_query_ctx()),
- _exchange_sink_num(sender_ins_ids.size()) {}
+ _exchange_sink_num(sender_ins_ids.size()),
+
_send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size
&&
+
state->query_options().exchange_multi_blocks_byte_size > 0) {
+ if (_send_multi_blocks) {
+ _send_multi_blocks_byte_size =
state->query_options().exchange_multi_blocks_byte_size;
+ }
+}
void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want
to
@@ -124,9 +130,12 @@ void ExchangeSinkBuffer::construct_request(TUniqueId
fragment_instance_id) {
auto instance_data = std::make_unique<RpcInstance>(low_id);
instance_data->mutex = std::make_unique<std::mutex>();
instance_data->seq = 0;
- instance_data->package_queue = std::queue<TransmitInfo,
std::list<TransmitInfo>>();
- instance_data->broadcast_package_queue =
- std::queue<BroadcastTransmitInfo,
std::list<BroadcastTransmitInfo>>();
+ instance_data->package_queue =
+ std::unordered_map<vectorized::Channel*,
+ std::queue<TransmitInfo,
std::list<TransmitInfo>>>();
+ instance_data->broadcast_package_queue = std::unordered_map<
+ vectorized::Channel*,
+ std::queue<BroadcastTransmitInfo,
std::list<BroadcastTransmitInfo>>>();
_queue_capacity = config::exchg_buffer_queue_capacity_factor *
_rpc_instances.size();
PUniqueId finst_id;
@@ -146,14 +155,14 @@ void ExchangeSinkBuffer::construct_request(TUniqueId
fragment_instance_id) {
_rpc_instances[low_id] = std::move(instance_data);
}
-Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
+Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel,
TransmitInfo&& request) {
if (_is_failed) {
return Status::OK();
}
- auto ins_id = request.channel->dest_ins_id();
+ auto ins_id = channel->dest_ins_id();
if (!_rpc_instances.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do
register_sink",
-
print_id(request.channel->_fragment_instance_id));
+ print_id(channel->_fragment_instance_id));
}
auto& instance_data = *_rpc_instances[ins_id];
if (instance_data.rpc_channel_is_turn_off) {
@@ -170,10 +179,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
- COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
- request.block->ByteSizeLong());
+ COUNTER_UPDATE(channel->_parent->memory_used_counter(),
request.block->ByteSizeLong());
}
- instance_data.package_queue.emplace(std::move(request));
+ instance_data.package_queue[channel].emplace(std::move(request));
_total_queue_size++;
if (_total_queue_size > _queue_capacity) {
for (auto& dep : _queue_deps) {
@@ -188,14 +196,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
return Status::OK();
}
-Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
+Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel,
+ BroadcastTransmitInfo&& request) {
if (_is_failed) {
return Status::OK();
}
- auto ins_id = request.channel->dest_ins_id();
+ auto ins_id = channel->dest_ins_id();
if (!_rpc_instances.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do
register_sink",
-
print_id(request.channel->_fragment_instance_id));
+ print_id(channel->_fragment_instance_id));
}
auto& instance_data = *_rpc_instances[ins_id];
if (instance_data.rpc_channel_is_turn_off) {
@@ -213,7 +222,7 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
request.block_holder->get_block()->be_exec_version()));
}
- instance_data.broadcast_package_queue.emplace(request);
+ instance_data.broadcast_package_queue[channel].emplace(request);
}
if (send_now) {
RETURN_IF_ERROR(_send_rpc(instance_data));
@@ -225,9 +234,31 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
std::unique_lock<std::mutex> lock(*(instance_data.mutex));
- std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
instance_data.package_queue;
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
- instance_data.broadcast_package_queue;
+ auto& q_map = instance_data.package_queue;
+ auto& broadcast_q_map = instance_data.broadcast_package_queue;
+
+ auto find_max_size_queue = [](vectorized::Channel*& channel, auto& ptr,
auto& map) {
+ for (auto& [chan, lists] : map) {
+ if (!ptr) {
+ if (!lists.empty()) {
+ channel = chan;
+ ptr = &lists;
+ }
+ } else {
+ if (ptr->size() < lists.size()) {
+ channel = chan;
+ ptr = &lists;
+ }
+ }
+ }
+ };
+
+ vectorized::Channel* channel = nullptr;
+
+ std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
+ find_max_size_queue(channel, q_ptr, q_map);
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>*
broadcast_q_ptr = nullptr;
+ find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);
if (_is_failed) {
_turn_off_channel(instance_data, lock);
@@ -237,20 +268,49 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance&
instance_data) {
return Status::OK();
}
- if (!q.empty()) {
+ auto mem_byte = 0;
+ if (q_ptr && !q_ptr->empty()) {
+ auto& q = *q_ptr;
+
+ std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
+ for (int i = 0; i < requests.size(); i++) {
+ requests[i] = std::move(q.front());
+ q.pop();
+
+ if (requests[i].block) {
+ // make sure rpc byte size under the
_send_multi_blocks_bytes_size
+ mem_byte += requests[i].block->ByteSizeLong();
+ if (_send_multi_blocks && mem_byte >
_send_multi_blocks_byte_size) {
+ requests.resize(i + 1);
+ break;
+ }
+ }
+ }
+
// If we have data to shuffle which is not broadcasted
- auto& request = q.front();
+ auto& request = requests[0];
auto& brpc_request = instance_data.request;
- brpc_request->set_eos(request.eos);
- brpc_request->set_packet_seq(instance_data.seq++);
- brpc_request->set_sender_id(request.channel->_parent->sender_id());
- brpc_request->set_be_number(request.channel->_parent->be_number());
- if (request.block && !request.block->column_metas().empty()) {
- brpc_request->set_allocated_block(request.block.get());
+ brpc_request->set_sender_id(channel->_parent->sender_id());
+ brpc_request->set_be_number(channel->_parent->be_number());
+
+ if (_send_multi_blocks) {
+ for (auto& req : requests) {
+ if (req.block && !req.block->column_metas().empty()) {
+ auto add_block = brpc_request->add_blocks();
+ add_block->Swap(req.block.get());
+ }
+ }
+ } else {
+ if (request.block && !request.block->column_metas().empty()) {
+ brpc_request->set_allocated_block(request.block.get());
+ }
}
- auto send_callback =
request.channel->get_send_callback(&instance_data, request.eos);
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
+ instance_data.seq += requests.size();
+ brpc_request->set_packet_seq(instance_data.seq);
+ brpc_request->set_eos(requests.back().eos);
+ auto send_callback = channel->get_send_callback(&instance_data,
requests.back().eos);
+ send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
@@ -308,38 +368,79 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance&
instance_data) {
if (enable_http_send_block(*brpc_request)) {
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
std::move(send_remote_block_closure),
-
request.channel->_brpc_dest_addr));
+
channel->_brpc_dest_addr));
} else {
- transmit_blockv2(*request.channel->_brpc_stub,
- std::move(send_remote_block_closure));
+ transmit_blockv2(*channel->_brpc_stub,
std::move(send_remote_block_closure));
}
}
- if (request.block) {
- COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
- -request.block->ByteSizeLong());
+
+ if (!_send_multi_blocks && request.block) {
static_cast<void>(brpc_request->release_block());
+ } else {
+ brpc_request->clear_blocks();
}
- q.pop();
- _total_queue_size--;
+ if (mem_byte) {
+ COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
+ }
+ DCHECK_GE(_total_queue_size, requests.size());
+ _total_queue_size -= (int)requests.size();
if (_total_queue_size <= _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->set_ready();
}
}
- } else if (!broadcast_q.empty()) {
+ } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
+ auto& broadcast_q = *broadcast_q_ptr;
// If we have data to shuffle which is broadcasted
- auto& request = broadcast_q.front();
+ std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ?
broadcast_q.size() : 1);
+ for (int i = 0; i < requests.size(); i++) {
+ requests[i] = broadcast_q.front();
+ broadcast_q.pop();
+
+ if (requests[i].block_holder->get_block()) {
+ // make sure rpc byte size under the
_send_multi_blocks_bytes_size
+ mem_byte +=
requests[i].block_holder->get_block()->ByteSizeLong();
+ if (_send_multi_blocks && mem_byte >
_send_multi_blocks_byte_size) {
+ requests.resize(i + 1);
+ break;
+ }
+ }
+ }
+
+ auto& request = requests[0];
auto& brpc_request = instance_data.request;
- brpc_request->set_eos(request.eos);
- brpc_request->set_packet_seq(instance_data.seq++);
- brpc_request->set_sender_id(request.channel->_parent->sender_id());
- brpc_request->set_be_number(request.channel->_parent->be_number());
- if (request.block_holder->get_block() &&
- !request.block_holder->get_block()->column_metas().empty()) {
-
brpc_request->set_allocated_block(request.block_holder->get_block());
+ brpc_request->set_sender_id(channel->_parent->sender_id());
+ brpc_request->set_be_number(channel->_parent->be_number());
+
+ if (_send_multi_blocks) {
+ for (int i = 0; i < requests.size(); i++) {
+ auto& req = requests[i];
+ if (auto block = req.block_holder->get_block();
+ block && !block->column_metas().empty()) {
+ auto add_block = brpc_request->add_blocks();
+ for (int j = 0; j < block->column_metas_size(); ++j) {
+
add_block->add_column_metas()->CopyFrom(block->column_metas(j));
+ }
+ add_block->set_be_exec_version(block->be_exec_version());
+ add_block->set_compressed(block->compressed());
+ add_block->set_compression_type(block->compression_type());
+
add_block->set_uncompressed_size(block->uncompressed_size());
+ add_block->set_allocated_column_values(
+ const_cast<std::string*>(&block->column_values()));
+ }
+ }
+ } else {
+ if (request.block_holder->get_block() &&
+ !request.block_holder->get_block()->column_metas().empty()) {
+
brpc_request->set_allocated_block(request.block_holder->get_block());
+ }
}
- auto send_callback =
request.channel->get_send_callback(&instance_data, request.eos);
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
+ instance_data.seq += requests.size();
+ brpc_request->set_packet_seq(instance_data.seq);
+ brpc_request->set_eos(requests.back().eos);
+ auto send_callback = channel->get_send_callback(&instance_data,
requests.back().eos);
+
+ send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
@@ -397,16 +498,19 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance&
instance_data) {
if (enable_http_send_block(*brpc_request)) {
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
std::move(send_remote_block_closure),
-
request.channel->_brpc_dest_addr));
+
channel->_brpc_dest_addr));
} else {
- transmit_blockv2(*request.channel->_brpc_stub,
- std::move(send_remote_block_closure));
+ transmit_blockv2(*channel->_brpc_stub,
std::move(send_remote_block_closure));
}
}
- if (request.block_holder->get_block()) {
+ if (!_send_multi_blocks && request.block_holder->get_block()) {
static_cast<void>(brpc_request->release_block());
+ } else {
+ for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
+
static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
+ }
+ brpc_request->clear_blocks();
}
- broadcast_q.pop();
} else {
instance_data.rpc_channel_is_idle = true;
}
@@ -436,27 +540,27 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance&
ins) {
// and the rpc_channel should be turned off immediately.
Defer turn_off([&]() { _turn_off_channel(ins, lock); });
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
- ins.broadcast_package_queue;
- for (; !broadcast_q.empty(); broadcast_q.pop()) {
- if (broadcast_q.front().block_holder->get_block()) {
-
COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
-
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+ auto& broadcast_q_map = ins.broadcast_package_queue;
+ for (auto& [channel, broadcast_q] : broadcast_q_map) {
+ for (; !broadcast_q.empty(); broadcast_q.pop()) {
+ if (broadcast_q.front().block_holder->get_block()) {
+ COUNTER_UPDATE(channel->_parent->memory_used_counter(),
+
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+ }
}
}
- {
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>
empty;
- swap(empty, broadcast_q);
- }
-
- std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue;
- for (; !q.empty(); q.pop()) {
- // Must update _total_queue_size here, otherwise if _total_queue_size
> _queue_capacity at EOF,
- // ExchangeSinkQueueDependency will be blocked and pipeline will be
deadlocked
- _total_queue_size--;
- if (q.front().block) {
- COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
- -q.front().block->ByteSizeLong());
+ broadcast_q_map.clear();
+
+ auto& q_map = ins.package_queue;
+ for (auto& [channel, q] : q_map) {
+ for (; !q.empty(); q.pop()) {
+ // Must update _total_queue_size here, otherwise if
_total_queue_size > _queue_capacity at EOF,
+ // ExchangeSinkQueueDependency will be blocked and pipeline will
be deadlocked
+ _total_queue_size--;
+ if (q.front().block) {
+ COUNTER_UPDATE(channel->_parent->memory_used_counter(),
+ -q.front().block->ByteSizeLong());
+ }
}
}
@@ -467,10 +571,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance&
ins) {
}
}
- {
- std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
- swap(empty, q);
- }
+ q_map.clear();
}
// The unused parameter `with_lock` is to ensure that the function is called
when the lock is held.
@@ -582,8 +683,11 @@ std::string
ExchangeSinkBuffer::debug_each_instance_queue_size() {
fmt::memory_buffer debug_string_buffer;
for (auto& [id, instance_data] : _rpc_instances) {
std::unique_lock<std::mutex> lock(*instance_data->mutex);
- fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n",
id,
- instance_data->package_queue.size());
+ auto queue_size = 0;
+ for (auto& [_, list] : instance_data->package_queue) {
+ queue_size += list.size();
+ }
+ fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n",
id, queue_size);
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 9a00ef072d5..44416ef68e1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -111,13 +111,11 @@ private:
namespace pipeline {
struct TransmitInfo {
- vectorized::Channel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
};
struct BroadcastTransmitInfo {
- vectorized::Channel* channel = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
@@ -144,10 +142,13 @@ struct RpcInstance {
int64_t seq = 0;
// Queue for regular data transmission requests
- std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue;
+ std::unordered_map<vectorized::Channel*, std::queue<TransmitInfo,
std::list<TransmitInfo>>>
+ package_queue;
// Queue for broadcast data transmission requests
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>
broadcast_package_queue;
+ std::unordered_map<vectorized::Channel*,
+ std::queue<BroadcastTransmitInfo,
std::list<BroadcastTransmitInfo>>>
+ broadcast_package_queue;
// RPC request parameters for data transmission
std::shared_ptr<PTransmitDataParams> request;
@@ -275,8 +276,8 @@ public:
void construct_request(TUniqueId);
- Status add_block(TransmitInfo&& request);
- Status add_block(BroadcastTransmitInfo&& request);
+ Status add_block(vectorized::Channel* channel, TransmitInfo&& request);
+ Status add_block(vectorized::Channel* channel, BroadcastTransmitInfo&&
request);
void close();
void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
@@ -341,6 +342,8 @@ private:
// The ExchangeSinkLocalState in _parents is only used in
_turn_off_channel.
std::vector<ExchangeSinkLocalState*> _parents;
const int64_t _exchange_sink_num;
+ bool _send_multi_blocks = false;
+ int _send_multi_blocks_byte_size = 256 * 1024;
};
} // namespace pipeline
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 535f59c49e8..2a4f4e22861 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -146,6 +146,18 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
}
bool eos = request->eos();
+ if (!request->blocks().empty()) {
+ for (int i = 0; i < request->blocks_size(); i++) {
+ std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
+ pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i)));
+ RETURN_IF_ERROR(recvr->add_block(
+ std::move(pblock_ptr), request->sender_id(),
request->be_number(),
+ request->packet_seq() - request->blocks_size() + i, eos ?
nullptr : done,
+ wait_for_worker, cpu_time_stop_watch.elapsed_time()));
+ }
+ }
+
+ // old logic, for compatibility
if (request->has_block()) {
std::unique_ptr<PBlock> pblock_ptr {
const_cast<PTransmitDataParams*>(request)->release_block()};
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 3f09d02f20f..0e9c5371a2b 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&&
block, bool eos) {
}
}
if (eos || block->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos}));
+ RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos}));
}
return Status::OK();
}
@@ -188,7 +188,7 @@ Status
Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& blo
_eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
+ RETURN_IF_ERROR(_buffer->add_block(this, {block, eos}));
}
return Status::OK();
}
diff --git a/be/test/vec/exec/exchange_sink_test.h
b/be/test/vec/exec/exchange_sink_test.h
index 59004a53a9e..5cf0fdbed6a 100644
--- a/be/test/vec/exec/exchange_sink_test.h
+++ b/be/test/vec/exec/exchange_sink_test.h
@@ -138,9 +138,8 @@ struct SinkWithChannel {
std::map<int64_t, std::shared_ptr<Channel>> channels;
Status add_block(int64_t id, bool eos) {
auto channel = channels[id];
- TransmitInfo transmitInfo {
- .channel = channel.get(), .block = std::make_unique<PBlock>(),
.eos = eos};
- return buffer->add_block(std::move(transmitInfo));
+ TransmitInfo transmitInfo {.block = std::make_unique<PBlock>(), .eos =
eos};
+ return buffer->add_block(channel.get(), std::move(transmitInfo));
}
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c7d26af924f..67e52b66133 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -674,6 +674,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_ES_PARALLEL_SCROLL =
"enable_es_parallel_scroll";
+ public static final String EXCHANGE_MULTI_BLOCKS_BYTE_SIZE =
"exchange_multi_blocks_byte_size";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -2243,6 +2245,10 @@ public class SessionVariable implements Serializable,
Writable {
"When processing both \\n and \\r\\n as CSV line
separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;
+ @VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE,
+ description = {"Enable exchange to send multiple blocks in one
RPC. Default is 256KB. A negative"
+ + " value disables multi-block exchange."})
+ public int exchangeMultiBlocksByteSize = 256 * 1024;
@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read
external table"})
@@ -2591,6 +2597,12 @@ public class SessionVariable implements Serializable,
Writable {
this.disableStreamPreaggregations = random.nextBoolean();
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
this.enableParallelResultSink = random.nextBoolean();
+
+ // 4KB = 4 * 1024 bytes
+ int minBytes = 4 * 1024;
+ // 10MB = 10 * 1024 * 1024 bytes
+ int maxBytes = 10 * 1024 * 1024;
+ this.exchangeMultiBlocksByteSize = minBytes + (int)
(random.nextDouble() * (maxBytes - minBytes));
int randomInt = random.nextInt(4);
if (randomInt % 2 == 0) {
this.rewriteOrToInPredicateThreshold = 100000;
@@ -4165,6 +4177,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);
tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB);
+ tResult.setExchangeMultiBlocksByteSize(exchangeMultiBlocksByteSize);
return tResult;
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 402f1044568..e36907e9ea1 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -51,6 +51,7 @@ message PTransmitDataParams {
optional bool transfer_by_attachment = 10 [default = false];
optional PUniqueId query_id = 11;
optional PStatus exec_status = 12;
+ repeated PBlock blocks = 13;
};
message PTransmitDataResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 60ca569949f..81e4d1f877c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -393,6 +393,7 @@ struct TQueryOptions {
162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
163: optional bool inverted_index_compatible_read = false
164: optional bool check_orc_init_sargs_success = false
+ 165: optional i32 exchange_multi_blocks_byte_size = 262144
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]