github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3371964168


##########
be/src/exec/sink/vtablet_finder.cpp:
##########
@@ -22,16 +22,89 @@
 #include <gen_cpp/FrontendService_types.h>
 #include <glog/logging.h>
 
+#include <algorithm>
 #include <string>
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "common/status.h"
 #include "core/block/block.h"
 #include "runtime/runtime_state.h"
 #include "storage/tablet_info.h"
 
 namespace doris {
+
+void AdaptiveRandomBucketState::init_partition(int64_t partition_id,
+                                               const std::vector<int64_t>& 
tablets,
+                                               const std::vector<int32_t>& 
bucket_seqs,
+                                               int32_t start_tablet_idx) {
+    if (partition_id < 0 || tablets.empty()) {
+        return;
+    }
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (_partition_states.contains(partition_id)) {
+        return;

Review Comment:
   `TabletsChannel` is keyed by load/index and shared by all senders, but each 
sender's open request can carry a different FE-selected ordered bucket list for 
the same partition on the same receiver BE. The first sender to initialize the 
partition wins: because of the `_partition_states.contains(partition_id)` early 
return, all later opens for that partition return without merging or validating 
their `random_bucket_partitions`. For example, if two 
sink backends are assigned different local start buckets that both route to the 
same bucket-owner BE, the second sender's rows will be routed using the first 
sender's tablet order/start, defeating the FE assignment and potentially 
concentrating all rows on the wrong bucket until rotation. The receiver state 
needs to either include sender-specific state or merge/validate subsequent 
ordered lists instead of silently ignoring them.



##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -639,6 +681,133 @@ Status BaseTabletsChannel::_write_block_data(
     return Status::OK();
 }
 
+std::shared_ptr<std::mutex> 
BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) {
+    std::lock_guard<std::mutex> l(_partition_route_locks_lock);
+    auto& lock = _partition_route_locks[partition_id];
+    if (lock == nullptr) {
+        lock = std::make_shared<std::mutex>();
+    }
+    return lock;
+}
+
+Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
+        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
+        std::unordered_map<int64_t, DorisVector<uint32_t>>& 
partition_to_rowidxs,
+        PTabletWriterAddBlockResult* response) {
+    Block send_data;
+    [[maybe_unused]] size_t uncompressed_size = 0;
+    [[maybe_unused]] int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, 
&uncompressed_time));
+    CHECK(send_data.rows() == request.partition_ids_size())
+            << "block rows: " << send_data.rows()
+            << ", partition_ids_size: " << request.partition_ids_size();
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        for (const auto& [partition_id, _] : partition_to_rowidxs) {
+            _partition_ids.emplace(partition_id);
+        }
+    }
+
+    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+    Defer defer {
+            [&]() { g_tablets_channel_send_data_allocated_size << 
-send_data.allocated_bytes(); }};
+
+    auto* tablet_errors = response->mutable_tablet_errors();
+    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
+
+    auto write_partition_data = [&](int64_t partition_id,
+                                    const DorisVector<uint32_t>& row_idxs) -> 
Status {
+        auto partition_lock = _get_partition_route_lock(partition_id);
+        std::lock_guard<std::mutex> partition_guard(*partition_lock);
+
+        CHECK(_adaptive_random_bucket_state != nullptr);
+        int64_t tablet_id = 
_adaptive_random_bucket_state->current_tablet(partition_id);
+        CHECK(tablet_id >= 0) << "invalid current tablet, load_id=" << _load_id
+                              << ", partition_id=" << partition_id;
+        LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: route+write begin"
+                  << ", load_id=" << _load_id << ", index_id=" << _index_id
+                  << ", sender_id=" << request.sender_id()
+                  << ", packet_seq=" << request.packet_seq() << ", 
partition_id=" << partition_id
+                  << ", tablet_id=" << tablet_id << ", row_count=" << 
row_idxs.size();
+
+        {
+            std::shared_lock<std::shared_mutex> 
broken_rlock(_broken_tablets_lock);
+            if (_is_broken_tablet(tablet_id)) {
+                LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: skip broken tablet"
+                          << ", load_id=" << _load_id << ", index_id=" << 
_index_id
+                          << ", sender_id=" << request.sender_id()
+                          << ", packet_seq=" << request.packet_seq()
+                          << ", partition_id=" << partition_id << ", 
tablet_id=" << tablet_id;
+                return Status::OK();
+            }
+        }
+
+        decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
+        {
+            std::lock_guard<std::mutex> l(_tablet_writers_lock);
+            tablet_writer_it = _tablet_writers.find(tablet_id);
+            if (tablet_writer_it == _tablet_writers.end()) {
+                return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
+            }
+        }
+
+        bool memtable_flushed = false;
+        Status st = tablet_writer_it->second->write(&send_data, row_idxs, 
&memtable_flushed);
+        if (!st.ok()) {

Review Comment:
   This new receiver-side path keeps an `unordered_map` iterator after 
releasing `_tablet_writers_lock`, then dereferences it for `write()`, 
`cancel_with_status()`, and `set_tablet_load_rowset_num_info()`. `add_batch()` 
is explicitly allowed to run concurrently with `incremental_open()`, and 
`incremental_open()` inserts into `_tablet_writers` under the same lock; that 
insert can rehash the map and invalidate this iterator while the partition 
route lock is held. Concretely, an auto-partition load can be writing 
partition P here while another sender incremental-opens a different partition 
on the same tablets channel; the write then goes through an invalidated 
iterator, which is undefined behavior. Copy a stable `BaseDeltaWriter*` while 
holding `_tablet_writers_lock` and use that pointer after releasing the lock. 
This is safe because a rehash invalidates iterators but does not move or 
destroy the stored elements, and writers are inserted but never erased during 
the channel lifetime.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to