This is an automated email from the ASF dual-hosted git repository.
morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4c708cd7f [chore](distribute plan) add Javadoc to UnassignedJob.computeAssignedJobs and related methods (#63743)
4c4c708cd7f is described below
commit 4c4c708cd7f422e49a0c32a15a4649fee72e8a1b
Author: morrySnow <[email protected]>
AuthorDate: Tue Jun 9 00:21:55 2026 +0800
[chore](distribute plan) add Javadoc to UnassignedJob.computeAssignedJobs and related methods (#63743)
## Summary
- Add comprehensive Javadoc to `UnassignedJob.computeAssignedJobs()` and
all its overrides across 10 concrete subclasses
- Document the two-phase parallelization strategy in
`AbstractUnassignedScanJob` (`multipleMachinesParallelization` +
`insideMachineParallelization`)
- Add Javadoc to all protected helper methods: `degreeOfParallelism`,
`assignLocalShuffleJobs`, `assignedDefaultJobs`,
`useLocalShuffleToAddParallel`, `fillUpAssignedJobs`,
`fillUpSingleEmptyInstance`
- Each subclass override explains its specific logic (colocate/bucket
join, query cache partition optimization, missing bucket fill-up, etc.)
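
A minimal, self-contained sketch of the two-phase strategy described above, for illustration only. It is not the Doris implementation: the class `TwoPhaseSketch`, its method names, the use of plain strings for workers and `Long` ids for tablets, and the round-robin split are all assumptions. Only the fall-through bound `min(maxParallel, max(parallelExecNum, 1))` is taken from the documented `degreeOfParallelism`; the other branches (unpartitioned data, query cache, local shuffle) are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names exist in Doris.
public class TwoPhaseSketch {

    // Phase 1 (cross-machine): group tablet ids by the worker chosen for each tablet.
    // The input map stands in for replica selection, which is not modeled here.
    static Map<String, List<Long>> groupByWorker(Map<Long, String> tabletToWorker) {
        Map<String, List<Long>> byWorker = new LinkedHashMap<>();
        for (Map.Entry<Long, String> e : tabletToWorker.entrySet()) {
            byWorker.computeIfAbsent(e.getValue(), w -> new ArrayList<>()).add(e.getKey());
        }
        return byWorker;
    }

    // Instance count for one worker: only the default (fall-through) bound is modeled.
    static int degreeOfParallelism(int maxParallel, int parallelExecNum) {
        if (maxParallel <= 0) {
            throw new IllegalArgumentException("maxParallel must be positive");
        }
        return Math.min(maxParallel, Math.max(parallelExecNum, 1));
    }

    // Phase 2 (intra-machine): split one worker's tablets into disjoint groups.
    // Round-robin is an assumption; the real split may order tablets differently.
    static List<List<Long>> splitEvenly(List<Long> tablets, int instanceNum) {
        List<List<Long>> instances = new ArrayList<>();
        for (int i = 0; i < instanceNum; i++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < tablets.size(); i++) {
            instances.get(i % instanceNum).add(tablets.get(i));
        }
        return instances;
    }

    // Both phases together: worker -> list of instances -> tablets scanned by each instance.
    static Map<String, List<List<Long>>> assign(Map<Long, String> tabletToWorker, int parallelExecNum) {
        Map<String, List<List<Long>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> e : groupByWorker(tabletToWorker).entrySet()) {
            int instanceNum = degreeOfParallelism(e.getValue().size(), parallelExecNum);
            result.put(e.getKey(), splitEvenly(e.getValue(), instanceNum));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> tabletToWorker = new LinkedHashMap<>();
        tabletToWorker.put(10001L, "be1");
        tabletToWorker.put(10002L, "be1");
        tabletToWorker.put(10003L, "be2");
        tabletToWorker.put(10004L, "be1");
        System.out.println(assign(tabletToWorker, 2));
    }
}
```

In this sketch a worker holding a single tablet always gets one instance regardless of `parallelExecNum`, which mirrors the "capped by the actual tablet count" rule in the Javadoc.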
---
.../worker/job/AbstractUnassignedScanJob.java | 132 +++++++++++++++++++++
.../distribute/worker/job/UnassignedAllBEJob.java | 18 +++
.../distribute/worker/job/UnassignedGatherJob.java | 12 ++
.../UnassignedGatherScanMultiRemoteTablesJob.java | 10 ++
.../worker/job/UnassignedGroupCommitJob.java | 9 ++
.../plans/distribute/worker/job/UnassignedJob.java | 16 +++
.../worker/job/UnassignedLocalTVFSinkJob.java | 11 ++
.../worker/job/UnassignedQueryConstantJob.java | 9 ++
.../job/UnassignedScanBucketOlapTableJob.java | 67 +++++++++++
.../worker/job/UnassignedScanMetadataJob.java | 19 +++
.../job/UnassignedScanSingleOlapTableJob.java | 35 ++++++
.../job/UnassignedScanSingleRemoteTableJob.java | 20 ++++
.../worker/job/UnassignedShuffleJob.java | 16 +++
.../worker/job/UnassignedSpecifyInstancesJob.java | 11 ++
14 files changed, 385 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index 167ea3dc334..8c8a47e8444 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -47,6 +47,24 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}
+ /**
+ * Compute assigned scan jobs using a two-phase parallelization strategy:
+ * <ol>
+ * <li><b>Cross-machine parallelization</b> ({@link
#multipleMachinesParallelization}):
+ * For each tablet / scan range, select the best replica and its
hosting backend worker.
+ * This groups scan ranges by the worker that will process them.</li>
+ * <li><b>Intra-machine parallelization</b> ({@link
#insideMachineParallelization}):
+ * Within each worker, split the assigned scan ranges into one or
more instances
+ * based on the degree of parallelism. Supports local shuffle mode
to further
+ * increase parallelism without rescanning data.</li>
+ * </ol>
+ * After both phases, {@link #fillUpAssignedJobs} provides a hook for
subclasses to
+ * supply fallback instances when no workers could be selected (e.g. all
tablets pruned).
+ *
+ * @param distributeContext the distribute context for worker selection
and parallelism config
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return the list of assigned scan jobs, each bound to a worker with its
tablet ranges
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
@@ -59,6 +77,18 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
return fillUpAssignedJobs(assignedJobs,
distributeContext.workerManager, inputJobs);
}
+ /**
+ * Hook for subclasses to supply fallback instances when the normal
parallelization
+ * produces an empty result. For example, when all tablets of a table have
been pruned
+ * (e.g. TABLET(1234) with a non-existent tablet id), this method can
create a single
+ * empty instance to keep the fragment alive and return an empty result
set.
+ *
+ * @param assignedJobs the list produced by {@link
#insideMachineParallelization};
+ * may be empty if no workers could be selected
+ * @param workerManager the worker manager used to select a random
fallback worker
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return the (possibly augmented) list of assigned jobs; default returns
unchanged
+ */
protected List<AssignedJob> fillUpAssignedJobs(
List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager,
@@ -66,9 +96,44 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
return assignedJobs;
}
+ /**
+ * Cross-machine parallelization: for each tablet / scan range of the scan
nodes
+ * in this fragment, select the best replica and its hosting {@link
DistributedPlanWorker}.
+ * The result groups all scan ranges by the worker that will process them.
+ * <p>
+ * This is the first phase of the two-phase parallelization. The returned
map drives
+ * the second phase ({@link #insideMachineParallelization}) where each
worker's ranges
+ * are further split into individual instances.
+ *
+ * @param distributeContext the distribute context for worker selection
and parallelism config
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a map from selected worker to its {@link UninstancedScanSource}
containing
+ * the raw scan ranges assigned to that worker, not yet split into
instances
+ */
protected abstract Map<DistributedPlanWorker, UninstancedScanSource>
multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs);
+ /**
+ * Intra-machine parallelization: for each worker, split its assigned scan
ranges
+ * into one or more {@link AssignedJob} instances. This is the second
phase of
+ * the two-phase parallelization, following {@link
#multipleMachinesParallelization}.
+ * <p>
+ * For each worker entry, the method:
+ * <ol>
+ * <li>Computes the max parallelism from the scan source (e.g. tablet
count).</li>
+ * <li>Determines the final instance count via {@link
#degreeOfParallelism},
+ * capped by the fragment's {@code parallelExecNum} and tablet
count.</li>
+ * <li>Splits scan ranges evenly across instances (default mode) or
creates
+ * local shuffle instances that share a single scan source to add
+ * parallelism without rescanning data ({@link
#assignLocalShuffleJobs}).</li>
+ * </ol>
+ *
+ * @param workerToScanRanges map from worker to its un-instanced scan
ranges,
+ * produced by {@link
#multipleMachinesParallelization}
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @param distributeContext the distribute context for parallelism
configuration
+ * @return the list of assigned jobs, each bound to a worker with its
portion of scan ranges
+ */
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource>
workerToScanRanges,
ListMultimap<ExchangeNode, AssignedJob> inputJobs,
@@ -104,10 +169,33 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
return instances;
}
+ /**
+ * Whether the fragment should use a serial source operator followed by
local
+ * shuffle to add intra-machine parallelism. When true, data is first
gathered
+ * through one exchange, then locally shuffled to multiple instances on
the same
+ * machine, allowing parallel computation without rescanning the source
data.
+ *
+ * @param distributeContext the distribute context; for load jobs, the
connect
+ * context is passed as null to avoid serial
source
+ * @return true if the fragment has a serial source operator and should use
+ * local shuffle to increase parallelism
+ */
protected boolean useLocalShuffleToAddParallel(DistributeContext
distributeContext) {
return fragment.useSerialSource(distributeContext.isLoadJob ? null :
statementContext.getConnectContext());
}
+ /**
+ * Split the given scan source evenly into {@code instanceNum} partitions
and
+ * create one {@link StaticAssignedJob} per partition, all on the same
worker.
+ * Each instance scans a disjoint subset of the tablet ranges, dividing the
+ * total scan workload among the instances.
+ *
+ * @param scanSource the full scan source (e.g. all tablets assigned to
this worker)
+ * @param instanceNum the number of instances to split into
+ * @param instances the output list receiving newly created assigned jobs
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all of the instances
+ */
protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum,
List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// split the scanRanges to some partitions, one partition for one
instance
@@ -127,6 +215,22 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
}
}
+ /**
+ * Create local shuffle instances on the given worker. The first instance
scans
+ * all data, and remaining instances receive an empty scan source — they
share
+ * the first instance's scan result via local shuffle on the same BE.
+ * This avoids rescanning the same data multiple times while still adding
+ * parallelism for downstream operators (e.g. aggregation).
+ * <p>
+ * All instances share the same {@code shareScanId}, signaling to the
backend
+ * that they belong to the same shared-scan group.
+ *
+ * @param scanSource the full scan source (all data for this worker)
+ * @param instanceNum the total number of local shuffle instances to create
+ * @param instances the output list receiving newly created {@link
LocalShuffleAssignedJob}s
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all local shuffle instances
+ */
protected void assignLocalShuffleJobs(ScanSource scanSource, int
instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// only generate one instance to scan all data, in this step
@@ -161,6 +265,25 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
}
}
+ /**
+ * Compute the number of parallel instances for this fragment.
+ * The result is bounded by several constraints:
+ * <ul>
+ * <li>If the fragment has unpartitioned data distribution, returns
1.</li>
+ * <li>If query cache is enabled, returns {@code maxParallel} (one
instance per
+ * tablet required for cache lookup).</li>
+ * <li>If the single OLAP scan node qualifies for single-instance
optimization
+ * (e.g. LIMIT with no conjuncts), returns 1 to save resources.</li>
+ * <li>If local shuffle is active, returns the fragment's {@code
parallelExecNum}.</li>
+ * <li>Otherwise, returns {@code min(maxParallel, max(parallelExecNum,
1))},
+ * i.e. capped by the actual tablet count.</li>
+ * </ul>
+ *
+ * @param maxParallel the maximum possible parallelism (e.g. total tablet
count
+ * or bucket count on this worker)
+ * @param useLocalShuffleToAddParallel whether local shuffle is active
+ * @return the number of instances to create for this worker
+ */
protected int degreeOfParallelism(int maxParallel, boolean
useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be
positive");
if (!fragment.getDataPartition().isPartitioned()) {
@@ -188,6 +311,15 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(),
1));
}
+ /**
+ * Create a single empty instance assigned to a random available worker.
+ * Used by subclasses in {@link #fillUpAssignedJobs} as a fallback when
normal
+ * parallelization produces no instances (e.g. all tablets/data pruned
away),
+ * ensuring the fragment can still execute and return an empty result.
+ *
+ * @param workerManager the worker manager to select a random worker from
+ * @return a singleton list containing one empty assigned job
+ */
protected List<AssignedJob>
fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
long catalogId = Env.getCurrentInternalCatalog().getId();
if (scanNodes != null && scanNodes.size() > 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index e8b30730103..fb3e1a07526 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -56,6 +56,24 @@ public class UnassignedAllBEJob extends AbstractUnassignedJob {
}
// ExchangeNode -> upstreamFragment -> AssignedJob(instances of
upstreamFragment)
+ /**
+ * Compute assigned jobs that deploy one instance on every available
backend.
+ * This is used for dictionary sink fragments where data must be loaded
onto
+ * all BEs. Supports two loading modes:
+ * <ul>
+ * <li><b>Full load</b>: when source data version has changed, redeploy
to all BEs
+ * with parallelism matching the upstream fragment instance
count.</li>
+ * <li><b>Partial load</b>: when only some BEs are outdated, deploy only
to those
+ * outdated BEs to avoid redundant work.</li>
+ * </ul>
+ * Each BE gets one instance with an empty {@link DefaultScanSource} (the
actual
+ * scan data comes from the upstream exchange).
+ *
+ * @param distributeContext the distribute context providing the worker
manager
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs,
+ * used to determine the expected instance count for full
loads
+ * @return one assigned job per target backend
+ */
@Override
public List<AssignedJob> computeAssignedJobs(DistributeContext
distributeContext,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index bd1f2779acc..3cb1b9ec858 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -40,6 +40,18 @@ public class UnassignedGatherJob extends AbstractUnassignedJob {
super(statementContext, fragment, ImmutableList.of(),
exchangeToChildJob);
}
+ /**
+ * Compute assigned jobs for a gather (single-node) fragment.
+ * All instances are placed on a single randomly selected worker.
+ * When {@code useSerialSource} is true, multiple local shuffle instances
+ * are created on the same worker to add intra-machine parallelism:
+ * the first instance scans all data from the upstream exchange and
+ * local-shuffles it to the other local instances for parallel processing.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return one or more assigned jobs, all on the same selected worker
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
index f3d260e289d..7045ceb9748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
@@ -61,6 +61,16 @@ public class UnassignedGatherScanMultiRemoteTablesJob extends AbstractUnassigned
return true;
}
+ /**
+ * Compute a single assigned job that gathers scan ranges from all
+ * {@link org.apache.doris.planner.DataGenScanNode} sources in this
fragment.
+ * All scan ranges from each DataGenScanNode are collected into one
+ * {@link DefaultScanSource} and placed on a randomly selected worker.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a list containing exactly one assigned job with all scan ranges
merged
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index d4f32cce896..ca29f0a7689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -39,6 +39,15 @@ public class UnassignedGroupCommitJob extends AbstractUnassignedJob {
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}
+ /**
+ * Compute a single assigned job bound to the group commit merge backend.
+ * The target backend is determined by {@link
StatementContext#getGroupCommitMergeBackend()},
+ * ensuring the group commit sink executes on the specific BE designated
for merging.
+ *
+ * @param distributeContext the distribute context (unused — worker is
fixed by group commit logic)
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a list containing exactly one assigned job on the group commit
merge backend
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
index a5d6331440a..2830b1d1d9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
@@ -43,6 +43,22 @@ public interface UnassignedJob extends TreeNode<UnassignedJob> {
ListMultimap<ExchangeNode, UnassignedJob> getExchangeToChildJob();
+ /**
+ * Compute and return the list of {@link AssignedJob}s for this fragment.
+ * This is the core method that transforms an unassigned fragment-level
job into
+ * concrete parallel instances, each bound to a specific {@link
DistributedPlanWorker}
+ * and carrying its assigned {@link ScanSource} (data ranges).
+ *
+ * @param distributeContext
+ * the distribute context containing worker manager, selected
workers, and other
+ * planner state needed for worker selection and parallelism
decisions
+ * @param inputJobs
+ * multimap from child {@link ExchangeNode} to their
already-assigned jobs;
+ * provides the child fragment instance layout used by
shuffle/gather jobs
+ * to determine their own instance count and worker placement
+ * @return the list of assigned jobs, each representing one fragment
instance scheduled
+ * on a specific worker with its data source
+ */
List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
index abe804dc170..a9b259de3f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
@@ -48,6 +48,17 @@ public class UnassignedLocalTVFSinkJob extends AbstractUnassignedJob {
this.backendId = backendId;
}
+ /**
+ * Compute a single assigned job on the designated backend for local TVF
sink.
+ * The target backend is determined by {@code backendId}. If the specified
backend
+ * is not alive, an {@link IllegalStateException} is thrown. This ensures
+ * INSERT INTO local(...) writes to the correct node's local disk.
+ *
+ * @param distributeContext the distribute context (unused — worker is
fixed by backendId)
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a list containing exactly one assigned job on the designated
backend
+ * @throws IllegalStateException if the target backend is not available
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
index 4c9fb15a2b7..735760553c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
@@ -37,6 +37,15 @@ public class UnassignedQueryConstantJob extends AbstractUnassignedJob {
super(statementContext, fragment, ImmutableList.of(),
ArrayListMultimap.create());
}
+ /**
+ * Compute a single assigned job on a randomly selected worker for
constant queries
+ * (e.g. SELECT 1, SELECT * FROM VALUES(...)). Such queries have no data
scan,
+ * so a single instance with an empty {@link DefaultScanSource} suffices.
+ *
+ * @param distributeContext the distribute context for random worker
selection
+ * @param inputJobs unused — constant queries have no child fragments
+ * @return a list containing exactly one assigned job on a random worker
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index b6a450c93a1..c4a08a339ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -86,6 +86,18 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
return olapScanNodes;
}
+ /**
+ * Select a replica and its hosting worker for each bucket's tablets
across all
+ * OLAP scan nodes in this fragment, grouping by bucket index. This is the
key
+ * mechanism for bucket-shuffle join and colocate join: tablets belonging
to the
+ * same bucket index across different tables are co-located on the same
worker,
+ * enabling local join without data shuffling.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a map from worker to its {@link UninstancedScanSource} keyed by bucket index,
+ * e.g. {@code {BackendWorker("172.0.0.1") -> {bucket 0: {olapScanNode1: [...], olapScanNode2: [...]}}}}
+ */
@Override
protected Map<DistributedPlanWorker, UninstancedScanSource>
multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
@@ -112,6 +124,23 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
);
}
+ /**
+ * Split each worker's assigned buckets into one or more instances, then
fill up
+ * missing bucket instances when needed for outer join or non-intersect
set operations
+ * in bucket-shuffle mode.
+ * <p>
+ * After the default even-split from {@link
AbstractUnassignedScanJob#insideMachineParallelization},
+ * this method checks whether the fragment contains right outer join, full
outer join,
+ * semi/anti join, or non-intersect set operations that use bucket
shuffle. If a bucket
+ * index has no left-side data (e.g. due to tablet pruning), a placeholder
instance is
+ * created for that bucket so the right-side data still has a destination
to be shuffled to,
+ * preventing the join from silently dropping rows.
+ *
+ * @param workerToScanRanges map from worker to its un-instanced bucket
ranges
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @param distributeContext the distribute context for parallelism
configuration
+ * @return the list of assigned jobs, with missing bucket placeholders
filled in
+ */
@Override
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource>
workerToScanRanges,
@@ -177,6 +206,29 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
return assignedJobs;
}
+ /**
+ * Creates local shuffle instances for bucket-based join fragments.
+ * Handles two scenarios:
+ * <ol>
+ * <li><b>All serial</b>: all scan nodes use serial source. Only the
first
+ * instance scans, others share via local shuffle. Each instance is
+ * assigned a subset of bucket indexes for join processing.</li>
+ * <li><b>Mixed serial/non-serial</b>: some scan nodes are serial (e.g.
+ * multi-partition table) and some are not. The serial scan source is
+ * merged into the first instance, while non-serial sources are
+ * parallelized normally. All instances use
+ * {@link LocalShuffleBucketJoinAssignedJob} which carries the
specific
+ * bucket indexes to join.</li>
+ * </ol>
+ * Any remaining slots (when {@code instanceNum} exceeds the number of
+ * bucket groups) are filled with empty instances that have no join
buckets.
+ *
+ * @param scanSource the bucket scan source to distribute
+ * @param instanceNum the target number of instances for this worker
+ * @param instances the output list receiving newly created assigned jobs
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all instances
+ */
@Override
protected void assignLocalShuffleJobs(ScanSource scanSource, int
instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
@@ -506,6 +558,21 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
return workers;
}
+ /**
+ * Compute parallelism for bucket-based scan fragments.
+ * In addition to the base class constraints, this override introduces a
+ * tablet-count-based strategy for pure colocate scan (no exchange nodes):
+ * parallelism is derived from the total tablet count, capped by the
+ * session variable {@code colocateMaxParallelNum} (default 128).
+ * <p>
+ * When exchange nodes are present (e.g. bucket shuffle join), falls back
+ * to {@link AbstractUnassignedScanJob#degreeOfParallelism} to avoid
+ * over-parallelizing the join fragment.
+ *
+ * @param maxParallel the maximum possible parallelism (bucket count)
+ * @param useLocalShuffleToAddParallel whether local shuffle is active
+ * @return the number of instances to create per worker
+ */
@Override
protected int degreeOfParallelism(int maxParallel, boolean
useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be
positive");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
index aab0f20895d..8ff7092300f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
@@ -48,6 +48,15 @@ public class UnassignedScanMetadataJob extends AbstractUnassignedScanJob {
this.schemaScanNode = schemaScanNode;
}
+ /**
+ * Select a worker for the schema metadata scan node (e.g.
information_schema tables).
+ * Metadata scans are typically lightweight and produce a single scan
range per node;
+ * this method distributes them across available workers for load
balancing.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a map from worker to its assigned schema scan ranges
+ */
@Override
protected Map<DistributedPlanWorker, UninstancedScanSource>
multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
@@ -56,6 +65,16 @@ public class UnassignedScanMetadataJob extends AbstractUnassignedScanJob {
);
}
+ /**
+ * If no workers could be selected for the metadata scan (e.g. all
backends are
+ * unavailable), create a single empty instance on a random available
worker
+ * as a fallback to prevent query failure.
+ *
+ * @param assignedJobs the list produced by {@link
#insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return the original list if non-empty, otherwise a single empty
instance
+ */
@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob>
assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index fa72f8c0105..f6d1694d855 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -62,6 +62,16 @@ public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob
this.olapScanNode = olapScanNode;
}
+ /**
+ * Select a replica and its hosting worker for every tablet of the OLAP
scan node,
+ * without bucket awareness. Each tablet is assigned to the best available
backend
+ * holding a replica, and tablets on the same backend are grouped together.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned
jobs
+ * @return a map from worker to its assigned tablets (as {@link UninstancedScanSource}),
+ * e.g. {@code {BackendWorker("172.0.0.1") -> [tablet_10001..10004]}}
+ */
@Override
protected Map<DistributedPlanWorker, UninstancedScanSource>
multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
@@ -78,6 +88,20 @@ public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob
);
}
+ /**
+ * For each worker, split its assigned tablets into one or more instances.
+ * When the fragment uses query cache and the tablet count exceeds the threshold,
+ * a partition-based grouping strategy is attempted first: tablets belonging to
+ * the same partition are kept within the same instance to reduce backend
+ * concurrency pressure during cache lookup. If partition-based grouping is not
+ * applicable or fails, falls back to the default even-split strategy from
+ * {@link AbstractUnassignedScanJob#insideMachineParallelization}.
+ *
+ * @param workerToScanRanges map from worker to its un-instanced tablet ranges
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @param distributeContext the distribute context for parallelism configuration
+ * @return the list of assigned jobs, each bound to a worker with its tablet portion
+ */
@Override
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
@@ -225,6 +249,17 @@ public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob
return partitionToScanRanges;
}
+ /**
+ * If the normal parallelization produced an empty list (e.g. all tablets have been
+ * pruned by TABLET() hint specifying a non-existent tablet), create a single empty
+ * instance on a random worker so the fragment can still execute and return an empty
+ * result set rather than failing.
+ *
+ * @param assignedJobs the list produced by {@link #insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return the original list if non-empty, otherwise a single empty instance
+ */
@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
index bc98119d939..e9dfc735880 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
@@ -48,6 +48,16 @@ public class UnassignedScanSingleRemoteTableJob extends AbstractUnassignedScanJo
this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, "scanWorkerSelector is not null");
}
+ /**
+ * Select a worker for each scan range of the external / remote table scan node.
+ * For external tables (Hive, Iceberg, etc.), scan ranges represent file splits
+ * rather than tablets, and workers are selected based on data locality or
+ * workload balancing.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from worker to its assigned file scan ranges
+ */
@Override
protected Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
@@ -56,6 +66,16 @@ public class UnassignedScanSingleRemoteTableJob extends AbstractUnassignedScanJo
);
}
+ /**
+ * If all file scan ranges have been pruned and the assigned job list is empty,
+ * create a single empty instance on a random worker so the fragment can still
+ * execute (returning an empty result) rather than failing.
+ *
+ * @param assignedJobs the list produced by {@link #insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return the original list if non-empty, otherwise a single empty instance
+ */
@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 27792eb288e..40c90563730 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -50,6 +50,22 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}
+ /**
+ * Compute assigned jobs for a shuffle (data redistribution) fragment.
+ * The instance count is determined by the parallelism of the largest child
+ * fragment. When the expected instance count is lower than the child count
+ * (e.g. due to session variable limits or query cache constraints), workers
+ * are shuffled to spread instances across different backends for load balancing.
+ * When more instances are needed, worker assignment follows the child layout.
+ * <p>
+ * If {@code useSerialSource} is true, multiple local shuffle instances are
+ * created per worker to add intra-machine parallelism without rescanning data.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs,
+ * used to determine the largest child fragment's instance layout
+ * @return assigned shuffle jobs with workers selected from child fragment layout
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
index 6ded32e0cd9..ca3fe1c4aae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
@@ -42,6 +42,17 @@ public class UnassignedSpecifyInstancesJob extends AbstractUnassignedJob {
this.specifyInstances = fragment.specifyInstances.get();
}
+ /**
+ * Compute assigned jobs by delegating to the fragment's
+ * {@link NereidsSpecifyInstances}. This is used when the fragment has
+ * pre-specified instance-to-worker mappings (e.g. from hints or
+ * statement-level instance specifications), bypassing the normal
+ * worker selection and parallelization logic.
+ *
+ * @param distributeContext the distribute context (forwarded to specify instances)
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return assigned jobs built from the pre-specified instance layout
+ */
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
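
Editor's note: the `fillUpAssignedJobs` Javadoc in the diff above describes a fallback contract: return the original list if it is non-empty, otherwise create a single empty instance on a random worker. The following is a minimal, standalone sketch of that contract only. `FillUpSketch`, `Worker`, `Instance`, and `fillUp` are hypothetical stand-ins invented for illustration; they are not the Doris classes, and the method bodies in the actual commit are not shown in this message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-ins for illustration; NOT the real Doris types.
final class FillUpSketch {
    static final class Worker {
        final String host;

        Worker(String host) {
            this.host = host;
        }
    }

    static final class Instance {
        final Worker worker;
        final List<Long> tabletIds; // empty list means "nothing to scan"

        Instance(Worker worker, List<Long> tabletIds) {
            this.worker = worker;
            this.tabletIds = tabletIds;
        }
    }

    // Returns the original list if non-empty; otherwise a single empty
    // instance on a randomly chosen worker (assumed selection policy).
    static List<Instance> fillUp(List<Instance> assigned,
                                 List<Worker> availableWorkers,
                                 Random random) {
        if (!assigned.isEmpty()) {
            return assigned;
        }
        if (availableWorkers.isEmpty()) {
            throw new IllegalStateException("no available worker for fallback instance");
        }
        Worker fallback = availableWorkers.get(random.nextInt(availableWorkers.size()));
        return Collections.singletonList(new Instance(fallback, Collections.emptyList()));
    }

    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker("be-1"));
        workers.add(new Worker("be-2"));
        // All scan ranges pruned: the assigned list is empty, so the fallback applies.
        List<Instance> result = fillUp(new ArrayList<>(), workers, new Random());
        System.out.println(result.size() + " instance(s), tablets="
                + result.get(0).tabletIds.size());
    }
}
```

In this sketch the non-empty case returns the same list object unchanged, so callers can apply the fill-up step unconditionally after parallelization.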