github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3414747878
##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -675,6 +719,160 @@ Status BaseTabletsChannel::_write_block_data(
return Status::OK();
}
+std::shared_ptr<std::mutex> BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) {
+ std::lock_guard<std::mutex> l(_partition_route_locks_lock);
+ auto& lock = _partition_route_locks[partition_id];
+ if (lock == nullptr) {
+ lock = std::make_shared<std::mutex>();
+ }
+ return lock;
+}
+
+Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
+ const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
+ std::unordered_map<int64_t, DorisVector<uint32_t>>& partition_to_rowidxs,
+ PTabletWriterAddBlockResult* response) {
+ Block send_data;
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
+ if (send_data.rows() != request.partition_ids_size()) {
+ return Status::InternalError(
+ "invalid receiver-side random bucket add block request row count, load_id={}, "
+ "index_id={}, packet_seq={}, block_rows={}, partition_ids_size={}",
+ print_id(_load_id), _index_id, request.packet_seq(), send_data.rows(),
+ request.partition_ids_size());
+ }
+
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ for (const auto& [partition_id, _] : partition_to_rowidxs) {
+ _partition_ids.emplace(partition_id);
+ }
+ }
+
+ g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+ Defer defer {
+ [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};
+
+ auto* tablet_errors = response->mutable_tablet_errors();
Review Comment:
This partition route lock is held while `tablet_writer->write()` runs, and
that write can block on table-level memtable backpressure or submit a flush.
The adaptive state is keyed by `(sender_id, partition_id)`, so different
senders that FE assigns to different buckets on the same receiver do not need a
partition-wide critical section. A concrete case is two sink BEs whose selected
buckets are both owned by the same third BE: they should be able to write
bucket 0 and bucket 1 concurrently, but this lock serializes all rows for the
partition. Please narrow the lock to the state lookup/rotation, or key it by
sender as well, and keep the potentially blocking write outside the
partition-wide lock.
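
A minimal sketch of the narrower locking, to illustrate the suggestion rather than prescribe the fix. Everything in it is hypothetical: `RouteState`, `RouteTable`, `pick_bucket`, `route_and_write`, and the row-count rotation rule are stand-ins, not names or policies from this PR. The point is only that the mutex covers the `(sender_id, partition_id)` state lookup and rotation, and the potentially blocking write runs after it is released.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical adaptive routing state, keyed by (sender_id, partition_id).
struct RouteState {
    int current_bucket = 0;
    int64_t rows_in_bucket = 0;
};

class RouteTable {
public:
    RouteTable(int num_buckets, int64_t rotate_threshold)
            : _num_buckets(num_buckets), _rotate_threshold(rotate_threshold) {
        assert(num_buckets > 0);
    }

    // Short critical section: look up the state, rotate if needed, pick a bucket.
    // The rotation rule (rotate after N rows) is an assumption for illustration.
    int pick_bucket(int64_t sender_id, int64_t partition_id, int64_t rows) {
        std::lock_guard<std::mutex> l(_lock);
        RouteState& st = _states[{sender_id, partition_id}];
        if (st.rows_in_bucket >= _rotate_threshold) {
            st.current_bucket = (st.current_bucket + 1) % _num_buckets;
            st.rows_in_bucket = 0;
        }
        st.rows_in_bucket += rows;
        return st.current_bucket;
    }

private:
    std::mutex _lock;
    std::map<std::pair<int64_t, int64_t>, RouteState> _states;
    int _num_buckets;
    int64_t _rotate_threshold;
};

// The write callback stands in for tablet_writer->write(); it is invoked only
// after pick_bucket() has returned, i.e. with the routing lock released.
template <typename WriteFn>
bool route_and_write(RouteTable& table, int64_t sender_id, int64_t partition_id,
                     int64_t rows, WriteFn&& write) {
    const int bucket = table.pick_bucket(sender_id, partition_id, rows);
    return write(bucket, rows);
}
```

With this shape, two senders routed to the same partition contend only for the brief lookup, so a write blocked on memtable backpressure for one sender does not stall the other.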
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]