github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3377508056


##########
be/src/exec/sink/writer/vtablet_writer.cpp:
##########
@@ -657,6 +666,70 @@ void VNodeChannel::_open_internal(bool is_incremental) {
     request->set_txn_expiration(_parent->_txn_expiration);
     request->set_write_file_cache(_parent->_write_file_cache);
 
+    if (_parent->_tablet_finder->is_adaptive_random_bucket()) {
+        std::unordered_map<int64_t, std::vector<int64_t>> partition_to_ordered_tablets;
+        std::unordered_map<int64_t, std::unordered_set<int64_t>> partition_to_local_tablets;
+        for (const auto& tablet : _all_tablets) {
+            partition_to_ordered_tablets[tablet.partition_id].push_back(tablet.tablet_id);
+            partition_to_local_tablets[tablet.partition_id].insert(tablet.tablet_id);
+        }
+        std::unordered_map<int64_t, const VOlapTablePartition*> id_to_partition;

Review Comment:
   This adds a `random_bucket_partition` before proving that this 
`VNodeChannel` owns any of the FE-selected `local_bucket_seqs`. `IndexChannel` 
still opens channels for BEs that have replicas for the partition, so a 
non-selected BE can have entries in `_all_tablets` but none of the selected 
bucket tablets. The loop below then skips every selected tablet and sends an 
empty `ordered_tablet_ids`, which makes the receiver abort at 
`BaseTabletsChannel::_init_receiver_side_random_bucket_state()` with 
`CHECK(!params.ordered_tablet_ids.empty())`. This is distinct from the existing 
selected-order issue: even after mapping global bucket seqs correctly, channels 
with no selected local buckets need to be skipped instead of opening 
receiver-side random state with an empty partition.
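
   A minimal sketch of the suggested skip, assuming the FE-selected tablets can
   be expressed as an ordered list of tablet ids per partition. `TabletEntry`,
   `build_local_ordered_tablets`, and `selected_by_partition` are hypothetical
   names used only for illustration and are not part of the PR; the real code
   would derive the selection from `local_bucket_seqs`.

```cpp
#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-in for the entries held in _all_tablets.
struct TabletEntry {
    int64_t partition_id;
    int64_t tablet_id;
};

// For each partition, keep only the selected tablets that this channel hosts,
// preserving the selected order. Partitions with no local match are omitted
// entirely, so the caller never sends an empty ordered_tablet_ids list.
std::unordered_map<int64_t, std::vector<int64_t>> build_local_ordered_tablets(
        const std::vector<TabletEntry>& all_tablets,
        const std::unordered_map<int64_t, std::vector<int64_t>>& selected_by_partition) {
    // Index the tablets this channel owns by partition.
    std::unordered_map<int64_t, std::unordered_set<int64_t>> local;
    for (const auto& tablet : all_tablets) {
        local[tablet.partition_id].insert(tablet.tablet_id);
    }

    std::unordered_map<int64_t, std::vector<int64_t>> result;
    for (const auto& [partition_id, selected] : selected_by_partition) {
        auto it = local.find(partition_id);
        if (it == local.end()) {
            continue; // this channel has no replica for the partition
        }
        std::vector<int64_t> ordered;
        for (int64_t tablet_id : selected) {
            if (it->second.count(tablet_id) > 0) {
                ordered.push_back(tablet_id);
            }
        }
        if (ordered.empty()) {
            continue; // replicas exist here, but none of them were selected
        }
        result.emplace(partition_id, std::move(ordered));
    }
    return result;
}
```

   With a helper of this shape, the open request would add a
   `random_bucket_partition` only for partitions present in the returned map,
   so a non-selected BE sends no random-bucket state for that partition.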



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -5695,4 +5735,94 @@ private TStatus checkMaster() {
         }
         return status;
     }
+
+    private static final class AdaptiveBucketSinkContext {
+        private final List<Long> sinkBackendIds;
+        private final int planFragmentNum;
+
+        private AdaptiveBucketSinkContext(List<Long> sinkBackendIds, int planFragmentNum) {
+            this.sinkBackendIds = sinkBackendIds;
+            this.planFragmentNum = planFragmentNum;

Review Comment:
   `createPartition()`/`replacePartition()` already look up a classic 
`Coordinator` to decide `needUseCache`, but this helper falls back to 
`[currentBeId], planFragmentNum=1` for every non-Nereids coordinator. In a 
classic multi-instance load that creates or replaces a random partition at 
runtime, each BE will independently replan that new partition as if it were the 
only sink backend, while the initial sink assignment was computed across all 
sink BEs in `Coordinator.assignAdaptiveRandomBucketForFragment()`. That can 
give different `bucket_be_id` / `local_bucket_seqs` for runtime partitions than 
for the rest of the load and duplicate bucket choices across senders. Please 
derive the sink backend set and fragment instance count for classic 
coordinators as well, or skip adaptive replanning for this path until it can 
use the same load-wide context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

