github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3378489608


##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -675,6 +731,158 @@ Status BaseTabletsChannel::_write_block_data(
     return Status::OK();
 }
 
+std::shared_ptr<std::mutex> 
BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) {
+    std::lock_guard<std::mutex> l(_partition_route_locks_lock);
+    auto& lock = _partition_route_locks[partition_id];
+    if (lock == nullptr) {
+        lock = std::make_shared<std::mutex>();
+    }
+    return lock;
+}
+
+Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
+        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
+        std::unordered_map<int64_t, DorisVector<uint32_t>>& 
partition_to_rowidxs,
+        PTabletWriterAddBlockResult* response) {
+    Block send_data;
+    [[maybe_unused]] size_t uncompressed_size = 0;
+    [[maybe_unused]] int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, 
&uncompressed_time));
+    if (send_data.rows() != request.partition_ids_size()) {
+        return Status::InternalError(
+                "invalid receiver-side random bucket add block request row 
count, load_id={}, "
+                "index_id={}, packet_seq={}, block_rows={}, 
partition_ids_size={}",
+                print_id(_load_id), _index_id, request.packet_seq(), 
send_data.rows(),
+                request.partition_ids_size());
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        for (const auto& [partition_id, _] : partition_to_rowidxs) {
+            _partition_ids.emplace(partition_id);
+        }
+    }
+
+    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+    Defer defer {
+            [&]() { g_tablets_channel_send_data_allocated_size << 
-send_data.allocated_bytes(); }};
+
+    auto* tablet_errors = response->mutable_tablet_errors();
+    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
+
+    auto write_partition_data = [&](int64_t partition_id,
+                                    const DorisVector<uint32_t>& row_idxs) -> 
Status {
+        auto partition_lock = _get_partition_route_lock(partition_id);
+        std::lock_guard<std::mutex> partition_guard(*partition_lock);
+
+        if (_adaptive_random_bucket_state == nullptr) {
+            return Status::InternalError(
+                    "receiver-side random bucket state is not initialized, 
load_id={}, "
+                    "index_id={}, packet_seq={}, partition_id={}",
+                    print_id(_load_id), _index_id, request.packet_seq(), 
partition_id);
+        }
+        int64_t tablet_id = 
_adaptive_random_bucket_state->current_tablet(partition_id);

Review Comment:
   This receiver-side random bucket path emits an `INFO` log line before and 
after every partition write in every add-block RPC. A large stream/broker load 
can send thousands of batches across many partitions, so these two 
unconditional logs add cost on the hot path and can flood BE logs and disk 
even when the load is healthy. Please demote the begin/done records to 
`VLOG_DEBUG`/`VLOG_PROGRESS`, or rate-limit them (e.g. with glog's 
`LOG_EVERY_N`); keep `INFO` for state transitions and errors such as 
initialization, rotation, or broken tablets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to