This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c993663827 [fix](nereids) fix cte as bc right side hang bug (#21897)
c993663827 is described below
commit c993663827df871949c959bd99210c3be5cbf8a3
Author: xzj7019 <[email protected]>
AuthorDate: Wed Jul 19 09:43:31 2023 +0800
[fix](nereids) fix cte as bc right side hang bug (#21897)
During original computeMultiCastFragmentParams process, we don't handle the
scenario the cte as the broadcast right side, which will lead the missing
setting of the buildHashTableForBroadcastJoin flag true and finally the sql
hang.
---
.../main/java/org/apache/doris/qe/Coordinator.java | 37 ++++++++++++++++++----
1 file changed, 31 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a81bd1984b..5ccbbcaa1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1418,12 +1418,37 @@ public class Coordinator {
params.instanceExecParams.size() +
destParams.perExchNumSenders.get(exchId.asInt()));
}
- for (int j = 0; j < destParams.instanceExecParams.size(); ++j)
{
- TPlanFragmentDestination dest = new
TPlanFragmentDestination();
- dest.fragment_instance_id =
destParams.instanceExecParams.get(j).instanceId;
- dest.server =
toRpcHost(destParams.instanceExecParams.get(j).host);
- dest.brpc_server =
toBrpcHost(destParams.instanceExecParams.get(j).host);
- multiSink.getDestinations().get(i).add(dest);
+ List<TPlanFragmentDestination> destinations =
multiSink.getDestinations().get(i);
+ if (enablePipelineEngine &&
enableShareHashTableForBroadcastJoin
+ && params.fragment.isRightChildOfBroadcastHashJoin()) {
+ // here choose the first instance to build hash table.
+ Map<TNetworkAddress, FInstanceExecParam> destHosts = new
HashMap<>();
+
+ destParams.instanceExecParams.forEach(param -> {
+ if (destHosts.containsKey(param.host)) {
+
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
+ } else {
+ destHosts.put(param.host, param);
+ param.buildHashTableForBroadcastJoin = true;
+ TPlanFragmentDestination dest = new
TPlanFragmentDestination();
+ dest.fragment_instance_id = param.instanceId;
+ try {
+ dest.server = toRpcHost(param.host);
+ dest.setBrpcServer(toBrpcHost(param.host));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ destinations.add(dest);
+ }
+ });
+ } else {
+ for (int j = 0; j < destParams.instanceExecParams.size();
++j) {
+ TPlanFragmentDestination dest = new
TPlanFragmentDestination();
+ dest.fragment_instance_id =
destParams.instanceExecParams.get(j).instanceId;
+ dest.server =
toRpcHost(destParams.instanceExecParams.get(j).host);
+ dest.brpc_server =
toBrpcHost(destParams.instanceExecParams.get(j).host);
+ destinations.add(dest);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]