github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3378489608
##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -675,6 +731,158 @@ Status BaseTabletsChannel::_write_block_data(
return Status::OK();
}
+std::shared_ptr<std::mutex>
BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) {
+ std::lock_guard<std::mutex> l(_partition_route_locks_lock);
+ auto& lock = _partition_route_locks[partition_id];
+ if (lock == nullptr) {
+ lock = std::make_shared<std::mutex>();
+ }
+ return lock;
+}
+
+Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
+ const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
+ std::unordered_map<int64_t, DorisVector<uint32_t>>&
partition_to_rowidxs,
+ PTabletWriterAddBlockResult* response) {
+ Block send_data;
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size,
&uncompressed_time));
+ if (send_data.rows() != request.partition_ids_size()) {
+ return Status::InternalError(
+ "invalid receiver-side random bucket add block request row
count, load_id={}, "
+ "index_id={}, packet_seq={}, block_rows={},
partition_ids_size={}",
+ print_id(_load_id), _index_id, request.packet_seq(),
send_data.rows(),
+ request.partition_ids_size());
+ }
+
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ for (const auto& [partition_id, _] : partition_to_rowidxs) {
+ _partition_ids.emplace(partition_id);
+ }
+ }
+
+ g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+ Defer defer {
+ [&]() { g_tablets_channel_send_data_allocated_size <<
-send_data.allocated_bytes(); }};
+
+ auto* tablet_errors = response->mutable_tablet_errors();
+ auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
+
+ auto write_partition_data = [&](int64_t partition_id,
+ const DorisVector<uint32_t>& row_idxs) ->
Status {
+ auto partition_lock = _get_partition_route_lock(partition_id);
+ std::lock_guard<std::mutex> partition_guard(*partition_lock);
+
+ if (_adaptive_random_bucket_state == nullptr) {
+ return Status::InternalError(
+ "receiver-side random bucket state is not initialized,
load_id={}, "
+ "index_id={}, packet_seq={}, partition_id={}",
+ print_id(_load_id), _index_id, request.packet_seq(),
partition_id);
+ }
+ int64_t tablet_id =
_adaptive_random_bucket_state->current_tablet(partition_id);
Review Comment:
This receiver-side random bucket path logs an `INFO` record before and after
every partition write in every add-block RPC (`PTabletWriterAddBlockRequest`).
A large stream/broker load can send thousands of blocks across many partitions,
so these two unconditional logs become a hot-path cost and can flood BE
logs/disk even when the load is healthy. Please demote the begin/done records
to `VLOG_DEBUG`/`VLOG_PROGRESS` or rate-limit them; keep `INFO` for state
transitions/errors such as initialization, rotation, or broken tablets.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]