This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 27ee6ed1bd6 branch-3.0: [fix](coordinator) Fix wrong bucket assginment
in old-version coordin… #44539 (#44571)
27ee6ed1bd6 is described below
commit 27ee6ed1bd62fcfe93682c1d9433b24f77d544e9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 26 10:51:36 2024 +0800
branch-3.0: [fix](coordinator) Fix wrong bucket assginment in old-version
coordin… #44539 (#44571)
Cherry-picked from #44539
Co-authored-by: Gabriel <[email protected]>
---
.../main/java/org/apache/doris/qe/Coordinator.java | 95 +++++++++++-----------
1 file changed, 47 insertions(+), 48 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 139996ce9ab..8be50772094 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1437,31 +1437,8 @@ public class Coordinator implements CoordInterface {
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
- TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0",
0);
while (bucketSeq < bucketNum) {
- TPlanFragmentDestination dest = new
TPlanFragmentDestination();
-
- dest.fragment_instance_id = new TUniqueId(-1, -1);
- dest.server = dummyServer;
- dest.setBrpcServer(dummyServer);
-
- Set<TNetworkAddress> hostSet = new HashSet<>();
- for (int insIdx = 0; insIdx <
destParams.instanceExecParams.size(); insIdx++) {
- FInstanceExecParam instanceExecParams =
destParams.instanceExecParams.get(insIdx);
- if (destParams.ignoreDataDistribution
- && hostSet.contains(instanceExecParams.host)) {
- continue;
- }
- hostSet.add(instanceExecParams.host);
- if
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
- dest.fragment_instance_id =
instanceExecParams.instanceId;
- dest.server = toRpcHost(instanceExecParams.host);
-
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
- instanceExecParams.recvrId =
params.destinations.size();
- break;
- }
- }
-
+ TPlanFragmentDestination dest = setDestination(destParams,
params.destinations.size(), bucketSeq);
bucketSeq++;
params.destinations.add(dest);
}
@@ -1508,6 +1485,50 @@ public class Coordinator implements CoordInterface {
}
}
+ private TPlanFragmentDestination setDestination(FragmentExecParams
destParams, int recvrId, int bucketSeq)
+ throws Exception {
+ TPlanFragmentDestination dest = new TPlanFragmentDestination();
+ TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
+ dest.fragment_instance_id = new TUniqueId(-1, -1);
+ dest.server = dummyServer;
+ dest.setBrpcServer(dummyServer);
+
+ if (destParams.ignoreDataDistribution) {
+ Map<TNetworkAddress, Pair<TUniqueId, Set<Integer>>>
hostToInstanceIdAndBucketSeq
+ = new HashMap<>();
+ for (int insIdx = 0; insIdx <
destParams.instanceExecParams.size(); insIdx++) {
+ FInstanceExecParam instanceExecParams =
destParams.instanceExecParams.get(insIdx);
+
hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host,
+ Pair.of(instanceExecParams.instanceId, new
HashSet<>()));
+
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll(
+ instanceExecParams.bucketSeqSet);
+ }
+ for (int insIdx = 0; insIdx <
destParams.instanceExecParams.size(); insIdx++) {
+ FInstanceExecParam instanceExecParams =
destParams.instanceExecParams.get(insIdx);
+ if
(hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq))
{
+ dest.fragment_instance_id =
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host)
+ .first;
+ dest.server = toRpcHost(instanceExecParams.host);
+ dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+ instanceExecParams.recvrId = recvrId;
+ break;
+ }
+ }
+ } else {
+ for (int insIdx = 0; insIdx <
destParams.instanceExecParams.size(); insIdx++) {
+ FInstanceExecParam instanceExecParams =
destParams.instanceExecParams.get(insIdx);
+ if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+ dest.fragment_instance_id = instanceExecParams.instanceId;
+ dest.server = toRpcHost(instanceExecParams.host);
+ dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+ instanceExecParams.recvrId = recvrId;
+ break;
+ }
+ }
+ }
+ return dest;
+ }
+
private void computeMultiCastFragmentParams() throws Exception {
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (!(params.fragment instanceof MultiCastPlanFragment)) {
@@ -1560,31 +1581,9 @@ public class Coordinator implements CoordInterface {
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan
node
- TNetworkAddress dummyServer = new
TNetworkAddress("0.0.0.0", 0);
while (bucketSeq < bucketNum) {
- TPlanFragmentDestination dest = new
TPlanFragmentDestination();
-
- dest.fragment_instance_id = new TUniqueId(-1, -1);
- dest.server = dummyServer;
- dest.setBrpcServer(dummyServer);
-
- Set<TNetworkAddress> hostSet = new HashSet<>();
- for (int insIdx = 0; insIdx <
destParams.instanceExecParams.size(); insIdx++) {
- FInstanceExecParam instanceExecParams =
destParams.instanceExecParams.get(insIdx);
- if (destParams.ignoreDataDistribution
- &&
hostSet.contains(instanceExecParams.host)) {
- continue;
- }
- hostSet.add(instanceExecParams.host);
- if
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
- dest.fragment_instance_id =
instanceExecParams.instanceId;
- dest.server =
toRpcHost(instanceExecParams.host);
-
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
- instanceExecParams.recvrId =
params.destinations.size();
- break;
- }
- }
-
+ TPlanFragmentDestination dest =
setDestination(destParams, params.destinations.size(),
+ bucketSeq);
bucketSeq++;
destinations.add(dest);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]