github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3417844764
##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -538,6 +543,98 @@ public Map<String, Integer> getBeToInstancesNum() {
return result;
}
+ public static final class AdaptiveRandomBucketSinkContext {
+ private final List<Long> sinkBackendIds;
+ private final int planFragmentNum;
+
+ private AdaptiveRandomBucketSinkContext(List<Long> sinkBackendIds, int
planFragmentNum) {
+ this.sinkBackendIds = sinkBackendIds;
+ this.planFragmentNum = planFragmentNum;
+ }
+
+ public List<Long> getSinkBackendIds() {
+ return sinkBackendIds;
+ }
+
+ public int getPlanFragmentNum() {
+ return planFragmentNum;
+ }
+ }
+
+ public Optional<AdaptiveRandomBucketSinkContext>
getAdaptiveRandomBucketSinkContext(long tableId) {
+ Set<Long> sinkBackendIds = new TreeSet<>();
+ int planFragmentNum = 0;
+ for (PipelineExecContext context : pipelineExecContexts.values()) {
+ TPipelineFragmentParams params = context.rpcParams;
+ if (params.getFragment().getOutputSink() == null
+ || params.getFragment().getOutputSink().getType() !=
TDataSinkType.OLAP_TABLE_SINK) {
+ continue;
+ }
+ TOlapTableSink sink =
params.getFragment().getOutputSink().getOlapTableSink();
+ if (sink.getTableId() != tableId) {
+ continue;
+ }
+ if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+ continue;
+ }
+ sinkBackendIds.add(params.getBackendId());
+ planFragmentNum += params.getLocalParamsSize();
+ }
+ if (sinkBackendIds.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(new AdaptiveRandomBucketSinkContext(
+ new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum,
1)));
+ }
+
+ private static void assignAdaptiveRandomBucketForFragment(
+ Collection<TPipelineFragmentParams> fragmentParamsList) {
+ List<TPipelineFragmentParams> sinkParams = fragmentParamsList.stream()
+ .filter(param -> param.getFragment().getOutputSink() != null
+ && param.getFragment().getOutputSink().getType() ==
TDataSinkType.OLAP_TABLE_SINK)
+ .collect(Collectors.toList());
+ if (sinkParams.isEmpty()) {
+ return;
+ }
+ TOlapTableSink sink =
sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink();
+ if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+ return;
+ }
+ List<Long> sinkBackendIds = sinkParams.stream()
+ .map(TPipelineFragmentParams::getBackendId)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ int planFragmentNum = sinkParams.stream()
+ .mapToInt(TPipelineFragmentParams::getLocalParamsSize)
+ .sum();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Adaptive random bucket planning in legacy fragment={},
sinkBackendIds={}, "
+ + "planFragmentNum={}",
+ sinkParams.get(0).getFragmentId(), sinkBackendIds,
planFragmentNum);
+ }
+ Map<Long, Map<Long, OlapTableSink.AdaptiveBucketAssignment>>
assignments =
+
OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds,
+ sink.getPartition().getPartitions(),
sink.getLocation().getTablets(), planFragmentNum);
+ for (TPipelineFragmentParams sinkParam : sinkParams) {
+ Map<Long, OlapTableSink.AdaptiveBucketAssignment>
partitionAssignments =
+ assignments.get(sinkParam.getBackendId());
+ if (partitionAssignments == null) {
+ continue;
+ }
+ TOlapTableSink copiedSink =
deepCopyOlapTableSinkForCurrentBackend(sinkParam);
+ OlapTableSink.applyAdaptiveRandomBucketAssignments(
+ copiedSink.getPartition().getPartitions(),
+ partitionAssignments);
+ }
Review Comment:
`deepCopyOlapTableSinkForCurrentBackend()` is still not BE-local on the
classic Coordinator path. In `FragmentExecParams.toThrift()`, the legacy planner
creates one `TPlanFragment fragmentThrift = fragment.toThrift()` before the
backend loop and every `TPipelineFragmentParams` gets that same object via
`params.setFragment(fragmentThrift)`. So this `setOutputSink()` mutates the
shared fragment; as `assignAdaptiveRandomBucketForFragment()` iterates
backends, the last backend's `bucket_be_id` / `local_bucket_seqs` overwrite the
assignments for all earlier backends. As a result, in a classic multi-BE load
every sender can end up routing with the last BE's adaptive assignment. This is distinct
from the existing Nereids deep-copy thread because `ThriftPlansBuilder` now
creates a fragment per worker before copying the sink; the classic Coordinator
path still shares the fragment. Please deep-copy the `TPlanFragment` (or create
per-backend `fragmentThrift`) before attaching the copied sink.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]