github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3491739303


##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -574,6 +581,375 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table
         return partitionParam;
     }
 
+    public static final class AdaptiveBucketAssignment {
+        private final int loadTabletIdx;
+        private final AdaptiveIndexBucketAssignment firstIndexAssignment;
+        private final Map<Long, AdaptiveIndexBucketAssignment> indexAssignments;
+
+        public AdaptiveBucketAssignment(int loadTabletIdx,
+                AdaptiveIndexBucketAssignment firstIndexAssignment,
+                Map<Long, AdaptiveIndexBucketAssignment> indexAssignments) {
+            this.loadTabletIdx = loadTabletIdx;
+            this.firstIndexAssignment = firstIndexAssignment;
+            this.indexAssignments = new HashMap<>(indexAssignments);
+        }
+
+        public long getBucketBeId() {
+            return firstIndexAssignment == null ? -1L : firstIndexAssignment.getBucketBeId();
+        }
+
+        public int getLoadTabletIdx() {
+            return loadTabletIdx;
+        }
+
+        public List<Integer> getLocalBucketSeqs() {
+            return firstIndexAssignment == null ? Collections.emptyList()
+                    : firstIndexAssignment.getLocalBucketSeqs();
+        }
+
+        public Map<Long, AdaptiveIndexBucketAssignment> getIndexAssignments() {
+            return indexAssignments;
+        }
+    }
+
+    public static final class AdaptiveIndexBucketAssignment {
+        private final long indexId;
+        private final long bucketBeId;
+        private final List<Integer> localBucketSeqs;
+
+        public AdaptiveIndexBucketAssignment(long indexId, long bucketBeId, List<Integer> localBucketSeqs) {
+            this.indexId = indexId;
+            this.bucketBeId = bucketBeId;
+            this.localBucketSeqs = new ArrayList<>(localBucketSeqs);
+        }
+
+        public long getIndexId() {
+            return indexId;
+        }
+
+        public long getBucketBeId() {
+            return bucketBeId;
+        }
+
+        public List<Integer> getLocalBucketSeqs() {
+            return localBucketSeqs;
+        }
+
+        @Override
+        public String toString() {
+            return "indexId=" + indexId + ",bucketBeId=" + bucketBeId
+                    + ",localBucketSeqs=" + localBucketSeqs;
+        }
+    }
+
+    public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink sink) {
+        return sink != null
+                && sink.isSetEnableAdaptiveRandomBucket()
+                && sink.isEnableAdaptiveRandomBucket()
+                && (!sink.isSetLoadToSingleTablet() || !sink.isLoadToSingleTablet())
+                && sink.isSetPartition()
+                && sink.getPartition() != null
+                && (!sink.getPartition().isSetDistributedColumns()
+                        || sink.getPartition().getDistributedColumns().isEmpty());
+    }
+
+    public boolean shouldAssignAdaptiveRandomBucket() {
+        return tDataSink != null && shouldAssignAdaptiveRandomBucket(tDataSink.getOlapTableSink());
+    }
+
+    public static Map<Long, Map<Long, AdaptiveBucketAssignment>> computeAdaptiveRandomBucketAssignments(
+            List<Long> sinkBackendIds, List<TOlapTablePartition> partitions,
+            List<TTabletLocation> tabletLocations, int sinkInstanceNum) {
+        Map<Long, Map<Long, AdaptiveBucketAssignment>> assignments = new HashMap<>();
+        List<Long> orderedSinkBackendIds = sinkBackendIds.stream()
+                .distinct()
+                .sorted()
+                .collect(Collectors.toList());
+        for (Long sinkBackendId : orderedSinkBackendIds) {
+            assignments.put(sinkBackendId, new HashMap<>());
+        }
+        if (orderedSinkBackendIds.isEmpty() || partitions == null || tabletLocations == null) {
+            return assignments;
+        }
+
+        Map<Long, TTabletLocation> tabletLocationMap = new HashMap<>(tabletLocations.size());
+        for (TTabletLocation tabletLocation : tabletLocations) {
+            tabletLocationMap.put(tabletLocation.getTabletId(), tabletLocation);
+        }
+
+        for (TOlapTablePartition partition : partitions) {
+            if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets() <= 0
+                    || partition.getIndexes().isEmpty()) {
+                continue;
+            }
+            TOlapTableIndexTablets baseIndex = partition.getIndexes().get(0);
+            Map<Long, List<Integer>> beToBucketSeqs = buildBeToBucketSeqs(baseIndex, tabletLocationMap);
+            long baseTabletIndex = partition.getLoadTabletIdx();
+            int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex, (long) partition.getNumBuckets());
+            int targetBucketNum = Math.min(

Review Comment:
   The new `sinkInstanceNum` still cannot increase the number of adaptive 
buckets when several sink instances run on the same BE. 
`computeAdaptiveRandomBucketAssignments()` deduplicates `sinkBackendIds` and 
then caps `targetBucketNum` by `orderedSinkBackendIds.size()`, and the result 
is keyed only by backend id. For example, with two sink BEs, four local sink 
instances per BE, and eight random buckets, `sinkInstanceNum` is 8 but this 
line still plans only `min(2, 8, 8) = 2` buckets, so all four senders on a BE 
open the same `local_bucket_seqs` and start on the same receiver tablet until 
rotation. Please either make the assignment granular per local sender/instance, 
or remove `sinkInstanceNum` from this calculation and document that adaptive 
random bucket intentionally works at backend granularity rather than 
sink-instance granularity.
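
To make the arithmetic concrete, here is a minimal, self-contained Java sketch (not Doris code). It assumes the truncated `Math.min(` call takes the distinct backend count, the partition's bucket count and `sinkInstanceNum`, as the `min(2, 8, 8)` example suggests. The class `AdaptiveBucketGranularitySketch`, its methods and the backend ids 10001/10002 are hypothetical. `perInstanceAssignment` shows just one possible per-instance scheme (round-robin over (backend, local instance) slots); unlike the diff, which appears to derive bucket placement from tablet locations via `buildBeToBucketSeqs`, it ignores replica locality.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Doris code: compares backend-granularity and
// per-instance-granularity adaptive bucket planning.
public class AdaptiveBucketGranularitySketch {

    // Assumed shape of the current cap: duplicate backend ids collapse, so the
    // planned bucket count can never exceed the number of distinct sink BEs.
    public static int backendGranularityBucketNum(List<Long> sinkBackendIds, int numBuckets,
            int sinkInstanceNum) {
        int distinctBackends = (int) sinkBackendIds.stream().distinct().count();
        return Math.min(distinctBackends, Math.min(numBuckets, sinkInstanceNum));
    }

    // Hypothetical per-instance scheme: each (backend, local instance) pair is a slot,
    // and bucket sequences are dealt round-robin over all slots in backend-id order.
    // Replica locality is ignored here.
    public static Map<Long, List<List<Integer>>> perInstanceAssignment(
            Map<Long, Integer> instancesPerBackend, int numBuckets) {
        Map<Long, List<List<Integer>>> result = new TreeMap<>();
        List<List<Integer>> slots = new ArrayList<>();
        for (Map.Entry<Long, Integer> entry : new TreeMap<>(instancesPerBackend).entrySet()) {
            List<List<Integer>> localSlots = new ArrayList<>();
            for (int i = 0; i < entry.getValue(); i++) {
                List<Integer> seqs = new ArrayList<>();
                localSlots.add(seqs);
                slots.add(seqs); // same list object, so filling a slot also fills result
            }
            result.put(entry.getKey(), localSlots);
        }
        if (slots.isEmpty()) {
            return result;
        }
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            slots.get(bucket % slots.size()).add(bucket);
        }
        return result;
    }

    public static void main(String[] args) {
        // Review example: two sink BEs, four local sink instances each, eight buckets.
        List<Long> sinkBackendIds = List.of(10001L, 10001L, 10001L, 10001L,
                10002L, 10002L, 10002L, 10002L);
        System.out.println(backendGranularityBucketNum(sinkBackendIds, 8, sinkBackendIds.size())); // 2
        System.out.println(perInstanceAssignment(Map.of(10001L, 4, 10002L, 4), 8));
    }
}
```

With the review's numbers, the backend-granularity cap yields 2, while the per-instance variant gives each of the eight senders its own starting bucket: `{10001=[[0], [1], [2], [3]], 10002=[[4], [5], [6], [7]]}`.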



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -5745,4 +5768,132 @@ private TStatus checkMaster() {
         }
         return status;
     }
+
+    private static final class AdaptiveBucketSinkContext {
+        private final boolean enableAdaptiveRandomBucket;
+        private final List<Long> sinkBackendIds;
+        private final int sinkInstanceNum;
+
+        private AdaptiveBucketSinkContext(boolean enableAdaptiveRandomBucket, List<Long> sinkBackendIds,
+                int sinkInstanceNum) {
+            this.enableAdaptiveRandomBucket = enableAdaptiveRandomBucket;
+            this.sinkBackendIds = sinkBackendIds;
+            this.sinkInstanceNum = sinkInstanceNum;
+        }
+    }
+
+    private static AdaptiveBucketSinkContext disabledAdaptiveBucketSinkContext() {
+        return new AdaptiveBucketSinkContext(false, Lists.newArrayList(), 1);
+    }
+
+    private static AdaptiveBucketSinkContext collectAdaptiveBucketSinkContext(TUniqueId queryId, long tableId) {
+        if (queryId == null) {
+            return disabledAdaptiveBucketSinkContext();
+        }
+        Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(queryId);
+        if (coordinator == null) {
+            return disabledAdaptiveBucketSinkContext();
+        }
+        if (!(coordinator instanceof NereidsCoordinator)) {
+            Optional<Coordinator.AdaptiveRandomBucketSinkContext> context =
+                    coordinator.getAdaptiveRandomBucketSinkContext(tableId);
+            if (context.isPresent()) {
+                return new AdaptiveBucketSinkContext(
+                        true, context.get().getSinkBackendIds(), context.get().getSinkInstanceNum());
+            }
+            return disabledAdaptiveBucketSinkContext();
+        }
+        Set<Long> sinkBackendIds = new TreeSet<>();
+        int sinkInstanceNum = 0;
+        for (PipelineDistributedPlan distributedPlan :
+                ((NereidsCoordinator) coordinator).getCoordinatorContext().distributedPlans) {
+            if (!(distributedPlan.getFragmentJob().getFragment().getSink() instanceof OlapTableSink)) {
+                continue;
+            }
+            OlapTableSink sink = (OlapTableSink) distributedPlan.getFragmentJob().getFragment().getSink();
+            if (sink.getDstTable().getId() != tableId) {
+                continue;
+            }
+            if (!sink.shouldAssignAdaptiveRandomBucket()) {
+                continue;
+            }
+            sinkInstanceNum += distributedPlan.getInstanceJobs().size();
+            for (AssignedJob assignedJob : distributedPlan.getInstanceJobs()) {
+                sinkBackendIds.add(assignedJob.getAssignedWorker().id());
+            }
+        }
+        if (sinkBackendIds.isEmpty()) {
+            return disabledAdaptiveBucketSinkContext();
+        }
+        return new AdaptiveBucketSinkContext(true, new ArrayList<>(sinkBackendIds), Math.max(sinkInstanceNum, 1));
+    }
+
+    private static void assignAdaptiveBucketToPartition(TOlapTablePartition partition,
+            List<TTabletLocation> partitionTablets, long currentBeId, long tableId, TUniqueId queryId,
+            boolean enableAdaptiveRandomBucket) {
+        if (!enableAdaptiveRandomBucket || !Config.enable_adaptive_random_bucket_load

Review Comment:
   This method should treat `enableAdaptiveRandomBucket` from the 
create/replace-partition request as the per-load source of truth. That flag was 
fixed when the sink was planned/opened and is sent by the BE from 
`_tablet_finder->is_adaptive_random_bucket()`, but the extra check against the 
mutable global config can switch the feature off for just the runtime-created/replaced 
partitions of an in-flight adaptive load, while its other partitions stay adaptive. If `ADMIN SET FRONTEND CONFIG 
('enable_adaptive_random_bucket_load' = 'false')` runs after planning, FE 
returns partitions without `bucket_be_id` / `local_bucket_seqs`; the adaptive 
sender then falls back to its own BE id for channel selection and can later 
fail with `unknown partition channel` when this BE is not the bucket owner. 
Please gate runtime replanning on the request's per-load flag, not the current 
global default.
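
A simplified model of this failure path, as a hypothetical Java sketch (none of these names exist in Doris): `globalConfig` stands in for `Config.enable_adaptive_random_bucket_load`, `assignBucketBe` for the FE deciding whether to return `bucket_be_id`, and `pickChannel` for the sender falling back to its own BE id. It assumes the sender runs on BE 10002 and has only opened a channel to the bucket owner, BE 10001.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified model of the failure path; none of these names exist in Doris.
public class AdaptiveBucketGateSketch {

    // Stand-in for the mutable FE config Config.enable_adaptive_random_bucket_load.
    public static volatile boolean globalConfig = true;

    // FE side: return the bucket owner BE only when the gate is open.
    // requestFlag models enableAdaptiveRandomBucket on the create/replace-partition request.
    public static OptionalLong assignBucketBe(boolean requestFlag, boolean alsoCheckGlobalConfig,
            long ownerBeId) {
        boolean enabled = alsoCheckGlobalConfig ? (requestFlag && globalConfig) : requestFlag;
        return enabled ? OptionalLong.of(ownerBeId) : OptionalLong.empty();
    }

    // Sender side: without a bucket_be_id, fall back to the sender's own BE id,
    // then look up a channel that was opened at sink-open time.
    public static String pickChannel(OptionalLong bucketBeId, long senderBeId,
            Map<Long, String> openChannels) {
        long target = bucketBeId.orElse(senderBeId);
        String channel = openChannels.get(target);
        if (channel == null) {
            throw new IllegalStateException("unknown partition channel for BE " + target);
        }
        return channel;
    }

    public static void main(String[] args) {
        // The sender runs on BE 10002 and only has a channel to the bucket owner, BE 10001.
        Map<Long, String> openChannels = Map.of(10001L, "channel-to-10001");
        globalConfig = false; // config flipped after planning

        // Gate on the per-load request flag only: routing is unchanged.
        System.out.println(pickChannel(assignBucketBe(true, false, 10001L), 10002L, openChannels));

        // Gate on the global config as well: bucket_be_id is missing and the lookup fails.
        try {
            pickChannel(assignBucketBe(true, true, 10001L), 10002L, openChannels);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, gating on the request flag still routes to `channel-to-10001` after the config flip, while also checking the global config leaves `bucket_be_id` unset and fails with `unknown partition channel for BE 10002`.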



-- 
This is an automated message from the Apache Git Service.