github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3378570991
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -5695,4 +5735,103 @@ private TStatus checkMaster() {
         }
         return status;
     }
+
+    private static final class AdaptiveBucketSinkContext {
+        private final List<Long> sinkBackendIds;
+        private final int planFragmentNum;
+
+        private AdaptiveBucketSinkContext(List<Long> sinkBackendIds, int planFragmentNum) {
+            this.sinkBackendIds = sinkBackendIds;
+            this.planFragmentNum = planFragmentNum;
+        }
+    }
+
+    private static AdaptiveBucketSinkContext collectAdaptiveBucketSinkContext(TUniqueId queryId, long currentBeId) {
+        if (queryId == null) {
+            return new AdaptiveBucketSinkContext(Lists.newArrayList(currentBeId), 1);
+        }
+        Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(queryId);
+        if (coordinator == null) {
+            return new AdaptiveBucketSinkContext(Lists.newArrayList(currentBeId), 1);
+        }
+        if (!(coordinator instanceof NereidsCoordinator)) {
+            Optional<Coordinator.AdaptiveRandomBucketSinkContext> context =
+                    coordinator.getAdaptiveRandomBucketSinkContext();
+            if (context.isPresent()) {
+                return new AdaptiveBucketSinkContext(
+                        context.get().getSinkBackendIds(), context.get().getPlanFragmentNum());
+            }
+            return new AdaptiveBucketSinkContext(Lists.newArrayList(currentBeId), 1);
+        }
+        Set<Long> sinkBackendIds = new TreeSet<>();
+        int planFragmentNum = 0;
+        for (PipelineDistributedPlan distributedPlan :
+                ((NereidsCoordinator) coordinator).getCoordinatorContext().distributedPlans) {
+            if (!(distributedPlan.getFragmentJob().getFragment().getSink() instanceof OlapTableSink)) {
+                continue;
+            }
+            planFragmentNum += distributedPlan.getInstanceJobs().size();
+            for (AssignedJob assignedJob : distributedPlan.getInstanceJobs()) {
+                sinkBackendIds.add(assignedJob.getAssignedWorker().id());
+            }
+        }
+        if (sinkBackendIds.isEmpty()) {
+            sinkBackendIds.add(currentBeId);
+        }
+        return new AdaptiveBucketSinkContext(new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 1));
+    }
+
+    private static void assignAdaptiveBucketToPartition(TOlapTablePartition partition,
+            List<TTabletLocation> partitionTablets, long currentBeId, TUniqueId queryId) {
+        if (!Config.enable_adaptive_random_bucket_load
+                || !partition.isSetLoadTabletIdx() || currentBeId <= 0) {
Review Comment:
This runtime partition replan has no way to know that the original sink was
`load_to_single_tablet=true`, so it can still rewrite `load_tablet_idx` for
auto-created/replaced random partitions in an explicit single-tablet load. BE
keeps those loads in `FIND_TABLET_EVERY_SINK` and ignores `local_bucket_seqs`,
but it does consume `partition.load_tablet_idx`; two receiver BEs can therefore
get different rewritten tablet indexes for runtime partitions even if initial
planning preserves the single-tablet setting. This is distinct from the
initial-planning comments because it affects
`createPartition()`/`replacePartition()` after the load has started. Please
gate this helper on the same load mode, or pass enough sink context to skip
adaptive rewriting for `load_to_single_tablet`.
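
One possible shape for that gate, as a minimal self-contained sketch rather than a patch against Doris: `SinkContext`, `Partition`, `shouldRewrite`, and `assign` are all hypothetical stand-ins invented for illustration, and the index computation in `assign` is only a placeholder for whatever the real adaptive rewrite does. The one point it is meant to show is that the guard from the diff gains a check on the load mode, so an explicit single-tablet load keeps the index chosen at planning time.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: every type here is a hypothetical stand-in, not the Doris class.
final class AdaptiveBucketGuardSketch {

    /** Hypothetical sink context carrying the load mode the review asks for. */
    static final class SinkContext {
        final boolean loadToSingleTablet;
        final List<Long> sinkBackendIds;

        SinkContext(boolean loadToSingleTablet, List<Long> sinkBackendIds) {
            this.loadToSingleTablet = loadToSingleTablet;
            this.sinkBackendIds = sinkBackendIds;
        }
    }

    /** Hypothetical stand-in for TOlapTablePartition; null means "not set". */
    static final class Partition {
        Long loadTabletIdx;

        Partition(Long loadTabletIdx) {
            this.loadTabletIdx = loadTabletIdx;
        }

        boolean isSetLoadTabletIdx() {
            return loadTabletIdx != null;
        }
    }

    /** Mirrors the guard in the diff, plus the assumed single-tablet check. */
    static boolean shouldRewrite(boolean featureEnabled, Partition partition,
            long currentBeId, SinkContext context) {
        if (!featureEnabled || !partition.isSetLoadTabletIdx() || currentBeId <= 0) {
            return false;
        }
        // Assumed new condition: a single-tablet load keeps the planned index.
        return !context.loadToSingleTablet;
    }

    /** Placeholder rewrite: derives an index from the BE's position in the sink list. */
    static void assign(boolean featureEnabled, Partition partition, int tabletCount,
            long currentBeId, SinkContext context) {
        if (tabletCount <= 0
                || !shouldRewrite(featureEnabled, partition, currentBeId, context)) {
            return;
        }
        int position = Math.max(context.sinkBackendIds.indexOf(currentBeId), 0);
        partition.loadTabletIdx = (long) (position % tabletCount);
    }

    public static void main(String[] args) {
        List<Long> backends = Arrays.asList(10001L, 10002L);
        SinkContext singleTablet = new SinkContext(true, backends);
        Partition onFirstBe = new Partition(3L);
        Partition onSecondBe = new Partition(3L);
        assign(true, onFirstBe, 8, 10001L, singleTablet);
        assign(true, onSecondBe, 8, 10002L, singleTablet);
        System.out.println(onFirstBe.loadTabletIdx + " " + onSecondBe.loadTabletIdx);
    }
}
```

In this sketch, two receiver BEs handling the same runtime partition under a single-tablet load both keep the planned index, whereas with `loadToSingleTablet` set to false the placeholder rewrite gives them different indexes. How the flag would actually reach `createPartition()`/`replacePartition()` is left open here.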
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]