This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5c35eb [Bug] Fix the bug of null pointer exception of colocate join (#5961)
a5c35eb is described below
commit a5c35eb20f3a10834f2920bf45515b2f4e411edf
Author: HappenLee <[email protected]>
AuthorDate: Thu Jun 3 21:19:58 2021 -0500
[Bug] Fix the bug of null pointer exception of colocate join (#5961)
---
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 7 +++++--
.../src/test/java/org/apache/doris/qe/CoordinatorTest.java | 9 +++++++--
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4b0018f..cb93a09 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1109,7 +1109,6 @@ public class Coordinator {
}
}
- // One fragment could only have one HashJoinNode
private boolean isColocateJoin(PlanNode node) {
// TODO(cmy): some internal process, such as broker load task, do not have ConnectContext.
// Any configurations needed by the Coordinator should be passed in Coordinator initialization.
@@ -1177,6 +1176,7 @@ public class Coordinator {
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
int parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress =
fragmentIdToSeqToAddressMap.get(fragmentId);
+ BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
// 1. count each node in one fragment should scan how many tablet, gather them in one list
@@ -1282,8 +1282,11 @@ public class Coordinator {
final OlapScanNode scanNode) throws Exception {
if
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new
HashedMap());
+ fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
}
Map<Integer, TNetworkAddress> bucketSeqToAddress =
fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+ BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
+
HashMap<TNetworkAddress, Long> assignedBytesPerHost =
Maps.newHashMap();
for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
@@ -1708,7 +1711,7 @@ public class Coordinator {
}
}
- private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
+ private Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
fragmentIdToSeqToAddressMap = Maps.newHashMap();
// cache the fragment id to its scan node ids. Used for colocate join.
private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds =
Maps.newHashMap();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 41e1755..97143fa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -108,6 +108,7 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap",
fragmentToBucketSeqToAddress);
// 2. set bucketSeqToScanRange in coordinator
+ Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
List<TScanRangeParams> scanRangeParamsList = new ArrayList<>();
@@ -117,7 +118,8 @@ public class CoordinatorTest extends Coordinator {
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
- Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange);
+ fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
+ Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
@@ -293,13 +295,16 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap",
fragmentToBucketSeqToAddress);
// 2. set bucketSeqToScanRange in coordinator
+ Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
ScanRangeMap.put(scanNodeId, new ArrayList<>());
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
- Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange);
+ fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
+ Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
+
TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
OlapScanNode olapScanNode = new OlapScanNode(new
PlanNodeId(scanNodeId), tupleDescriptor, "test");
PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]