This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new b6fb227402 [fix](nereids) fix cte bucket shuffle path (#22311)
b6fb227402 is described below
commit b6fb227402bd9a408da0845c8365297c635a009c
Author: xzj7019 <[email protected]>
AuthorDate: Thu Jul 27 22:44:51 2023 +0800
[fix](nereids) fix cte bucket shuffle path (#22311)
---
.../main/java/org/apache/doris/qe/Coordinator.java | 40 +++++++++++++++++++++-
1 file changed, 39 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 39c94320c7..f4a03f9c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1432,7 +1432,45 @@ public class Coordinator {
}
List<TPlanFragmentDestination> destinations =
multiSink.getDestinations().get(i);
- if (enablePipelineEngine &&
enableShareHashTableForBroadcastJoin
+ if (sink.getOutputPartition() != null
+ &&
sink.getOutputPartition().isBucketShuffleHashPartition()) {
+ // the destFragment must be bucket shuffle
+ Preconditions.checkState(bucketShuffleJoinController
+
.isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is"
+ + "Bucket Shuffle Partition, The destFragment must
have bucket shuffle join node ");
+
+ int bucketSeq = 0;
+ int bucketNum =
bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
+
+ // when the left table is empty, its bucket set is empty.
+ // set the right table's destination address to the address of the left table
+ if (destParams.instanceExecParams.size() == 1 &&
(bucketNum == 0
+ ||
destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) {
+ bucketNum = 1;
+
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
+ }
+ // process bucket shuffle join on a fragment without a scan node
+ TNetworkAddress dummyServer = new
TNetworkAddress("0.0.0.0", 0);
+ while (bucketSeq < bucketNum) {
+ TPlanFragmentDestination dest = new
TPlanFragmentDestination();
+
+ dest.fragment_instance_id = new TUniqueId(-1, -1);
+ dest.server = dummyServer;
+ dest.setBrpcServer(dummyServer);
+
+ for (FInstanceExecParam instanceExecParams :
destParams.instanceExecParams) {
+ if
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+ dest.fragment_instance_id =
instanceExecParams.instanceId;
+ dest.server =
toRpcHost(instanceExecParams.host);
+
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+ break;
+ }
+ }
+
+ bucketSeq++;
+ destinations.add(dest);
+ }
+ } else if (enablePipelineEngine &&
enableShareHashTableForBroadcastJoin
&& ((ExchangeNode)
exchNode).isRightChildOfBroadcastHashJoin()) {
// here choose the first instance to build hash table.
Map<TNetworkAddress, FInstanceExecParam> destHosts = new
HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]