This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2ede4d7de19 [fix](join) Disable scan sharing for NAAJ (#39480)
2ede4d7de19 is described below
commit 2ede4d7de19235a727a73d8ade7b8478e38264bc
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 19 11:15:01 2024 +0800
[fix](join) Disable scan sharing for NAAJ (#39480)
---
.../main/java/org/apache/doris/qe/Coordinator.java | 20 +++++++++++---------
.../java/org/apache/doris/qe/CoordinatorTest.java | 20 ++++++++++----------
2 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4ee46d3bec3..881cb7d096a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1876,10 +1876,11 @@ public class Coordinator implements CoordInterface {
if ((isColocateFragment(fragment, fragment.getPlanRoot())
&&
fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
&&
fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
- computeColocateJoinInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params);
+ computeColocateJoinInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params,
+ fragment.hasNullAwareLeftAntiJoin());
} else if
(bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt()))
{
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
- parallelExecInstanceNum, params);
+ parallelExecInstanceNum, params,
fragment.hasNullAwareLeftAntiJoin());
} else {
// case A
for (Entry<TNetworkAddress, Map<Integer,
List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
@@ -1904,7 +1905,8 @@ public class Coordinator implements CoordInterface {
int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean forceToLocalShuffle = context != null
- &&
context.getSessionVariable().isForceToLocalShuffle();
+ &&
context.getSessionVariable().isForceToLocalShuffle()
+ && !fragment.hasNullAwareLeftAntiJoin();
boolean ignoreStorageDataDistribution =
forceToLocalShuffle || (node.isPresent()
&&
node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
&& useNereids);
@@ -2072,9 +2074,9 @@ public class Coordinator implements CoordInterface {
}
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
- int parallelExecInstanceNum, FragmentExecParams params) {
+ int parallelExecInstanceNum, FragmentExecParams params, boolean
hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params,
fragmentIdTobucketSeqToScanRangeMap,
- fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+ fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds,
hasNullAwareLeftAntiJoin);
}
private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@@ -2689,16 +2691,16 @@ public class Coordinator implements CoordInterface {
}
private void computeInstanceParam(PlanFragmentId fragmentId,
- int parallelExecInstanceNum, FragmentExecParams params) {
+ int parallelExecInstanceNum, FragmentExecParams params,
boolean hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params,
fragmentIdBucketSeqToScanRangeMap,
- fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+ fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds,
hasNullAwareLeftAntiJoin);
}
}
private void assignScanRanges(PlanFragmentId fragmentId, int
parallelExecInstanceNum, FragmentExecParams params,
Map<PlanFragmentId, BucketSeqToScanRange>
fragmentIdBucketSeqToScanRangeMap,
Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
curFragmentIdToSeqToAddressMap,
- Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, boolean
hasNullAwareLeftAntiJoin) {
Map<Integer, TNetworkAddress> bucketSeqToAddress =
curFragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange =
fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
@@ -2732,7 +2734,7 @@ public class Coordinator implements CoordInterface {
* 2. Use Nereids planner.
*/
boolean forceToLocalShuffle = context != null
- && context.getSessionVariable().isForceToLocalShuffle();
+ && context.getSessionVariable().isForceToLocalShuffle() &&
!hasNullAwareLeftAntiJoin;
boolean ignoreStorageDataDistribution = forceToLocalShuffle ||
(scanNodes.stream()
.allMatch(node -> node.ignoreStorageDataDistribution(context,
addressToBackendID.size()))
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index e249b3d87d2..b6632a39db7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -123,7 +123,7 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(coordinator,
"fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
// check whether one instance have 3 tablet to scan
@@ -134,15 +134,15 @@ public class CoordinatorTest extends Coordinator {
}
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 2, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 2, params, false);
Assert.assertEquals(2, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 3, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 3, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 5, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 5, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
}
@@ -324,7 +324,7 @@ public class CoordinatorTest extends Coordinator {
PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
@@ -452,19 +452,19 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(bucketShuffleJoinController,
"fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 2, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 2, params, false);
Assert.assertEquals(2, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 3, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 3, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 5, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 5, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
}
@@ -506,7 +506,7 @@ public class CoordinatorTest extends Coordinator {
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]