This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f53bb5d7ff7 branch-3.0: [fix](coordinator) Fix wrong bucket
assignments by coordinator #45365 (#45401)
f53bb5d7ff7 is described below
commit f53bb5d7ff72f917db8e89d5cf51f2e7ed840352
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 13 17:10:24 2024 +0800
branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator
#45365 (#45401)
Cherry-picked from #45365
Co-authored-by: Gabriel <[email protected]>
---
.../main/java/org/apache/doris/qe/Coordinator.java | 85 +++++++++++-----
.../test_colocate_join_with_different_tablets.out | 12 +++
...est_colocate_join_with_different_tablets.groovy | 109 +++++++++++++++++++++
3 files changed, 182 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f6bc0212d1..2889e943e93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2734,7 +2734,8 @@ public class Coordinator implements CoordInterface {
* 1. `parallelExecInstanceNum * numBackends` is larger than scan
ranges.
* 2. Use Nereids planner.
*/
- boolean ignoreStorageDataDistribution = params.fragment != null &&
params.fragment.useSerialSource(context);
+ boolean ignoreStorageDataDistribution = scanNodes != null &&
!scanNodes.isEmpty()
+ && params.fragment != null &&
params.fragment.useSerialSource(context);
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> addressScanRange
@@ -2744,33 +2745,69 @@ public class Coordinator implements CoordInterface {
= findOrInsert(assignment, addressScanRange.getKey(), new
HashMap<>());
if (ignoreStorageDataDistribution) {
- FInstanceExecParam instanceParam = new FInstanceExecParam(
- null, addressScanRange.getKey(), 0, params);
-
- for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : scanRange) {
- for (Map.Entry<Integer, List<TScanRangeParams>>
nodeScanRange
- : nodeScanRangeMap.second.entrySet()) {
- if
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
- range.put(nodeScanRange.getKey(),
Lists.newArrayList());
-
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(),
Lists.newArrayList());
+ List<List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> perInstanceScanRanges
+ = ListUtil.splitBySize(scanRange,
parallelExecInstanceNum);
+ /**
+ * Split scan ranges evenly into `parallelExecInstanceNum`
instances.
+ *
+ *
+ * For a fragment that contains a co-located join,
+ *
+ * scan (id = 0) -> join build (id = 2)
+ * |
+ * scan (id = 1) -> join probe (id = 2)
+ *
+ * If both `scan (id = 0)` and `scan (id = 1)` are serial
operators, we will plan a local exchange
+ * after each of them:
+ *
+ * scan (id = 0) -> local exchange -> join build (id = 2)
+ * |
+ * scan (id = 1) -> local exchange -> join probe (id = 2)
+ *
+ *
+ * There is also a more complicated scenario. For
example, `scan (id = 0)` has 10 partitions and
+ * 3 buckets, which means 3 * 10 tablets, and `scan (id = 1)`
has 3 buckets and no partitions, which means
+ * 3 tablets in total. If the expected parallelism is 8, we will
get a serial scan (id = 0) and a
+ * non-serial scan (id = 1). In this case, we will generate
a different plan with a local exchange:
+ *
+ * scan (id = 0) -> local exchange -> join build (id = 2)
+ * |
+ * scan (id = 1) -> join probe (id = 2)
+ */
+ FInstanceExecParam firstInstanceParam = null;
+ for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>
perInstanceScanRange
+ : perInstanceScanRanges) {
+ FInstanceExecParam instanceParam = new FInstanceExecParam(
+ null, addressScanRange.getKey(), 0, params);
+
+ if (firstInstanceParam == null) {
+ firstInstanceParam = instanceParam;
+ }
+ for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : perInstanceScanRange) {
+ instanceParam.addBucketSeq(nodeScanRangeMap.first);
+ for (Map.Entry<Integer, List<TScanRangeParams>>
nodeScanRange
+ : nodeScanRangeMap.second.entrySet()) {
+ int scanId = nodeScanRange.getKey();
+ Optional<ScanNode> node =
scanNodes.stream().filter(
+ scanNode -> scanNode.getId().asInt() ==
scanId).findFirst();
+ Preconditions.checkArgument(node.isPresent());
+ FInstanceExecParam instanceParamToScan =
node.get().isSerialOperator()
+ ? firstInstanceParam : instanceParam;
+ if
(!instanceParamToScan.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
+ range.put(nodeScanRange.getKey(),
Lists.newArrayList());
+ instanceParamToScan.perNodeScanRanges
+ .put(nodeScanRange.getKey(),
Lists.newArrayList());
+ }
+
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
+
instanceParamToScan.perNodeScanRanges.get(nodeScanRange.getKey())
+ .addAll(nodeScanRange.getValue());
}
-
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
-
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
- .addAll(nodeScanRange.getValue());
}
+ params.instanceExecParams.add(instanceParam);
}
- List<FInstanceExecParam> instanceExecParams = new
ArrayList<>();
- instanceExecParams.add(instanceParam);
- for (int i = 1; i < parallelExecInstanceNum; i++) {
- instanceExecParams.add(new FInstanceExecParam(
- null, addressScanRange.getKey(), 0, params));
- }
- int index = 0;
- for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : scanRange) {
- instanceExecParams.get(index %
instanceExecParams.size()).addBucketSeq(nodeScanRangeMap.first);
- index++;
+ for (int i = perInstanceScanRanges.size(); i <
parallelExecInstanceNum; i++) {
+ params.instanceExecParams.add(new FInstanceExecParam(null,
addressScanRange.getKey(), 0, params));
}
- params.instanceExecParams.addAll(instanceExecParams);
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
diff --git
a/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
new file mode 100644
index 00000000000..06c381356ea
--- /dev/null
+++
b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+10000007 10000007 48414 10000007
+10000007 10000007 48414 10000007
+10000007 10000007 48414 10000007
+10000007 10000007 60426 10000007
+10000007 10000007 60426 10000007
+10000007 10000007 60426 10000007
+10000007 10000007 94460 10000007
+10000007 10000007 94460 10000007
+10000007 10000007 94460 10000007
+
diff --git
a/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
new file mode 100644
index 00000000000..66183591686
--- /dev/null
+++
b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_colocate_join_with_different_tablets") {
+ sql """
+ DROP TABLE IF EXISTS `USR_V_KHZHSJ_ES_POC1`;
+ DROP TABLE IF EXISTS `USR_TLBL_VAL_R1`;
+ CREATE TABLE `USR_V_KHZHSJ_ES_POC1` (
+ `khid` bigint NULL,
+ `khh` bigint NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`khid`, `khh`)
+ DISTRIBUTED BY HASH(`khid`) BUCKETS 16
+ PROPERTIES (
+ "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
+ "replication_allocation" = "tag.location.default: 1"
+ );
+
+
+ CREATE TABLE `USR_TLBL_VAL_R1` (
+ `lbl_id` bigint NOT NULL COMMENT "标签ID",
+ `khh` bigint NULL COMMENT "客户号"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`lbl_id`, `khh`)
+ COMMENT '标签结果日表'
+ PARTITION BY LIST (`lbl_id`)
+ (PARTITION p0 VALUES IN ("0"),
+ PARTITION p1 VALUES IN ("1"),
+ PARTITION p29 VALUES IN ("29"),
+ PARTITION p35 VALUES IN ("35"),
+ PARTITION p57 VALUES IN ("57"),
+ PARTITION p352 VALUES IN ("352"),
+ PARTITION p402 VALUES IN ("402"),
+ PARTITION p523 VALUES IN ("523"),
+ PARTITION p2347 VALUES IN ("2347"),
+ PARTITION p10376 VALUES IN ("10376"),
+ PARTITION p42408 VALUES IN ("42408"),
+ PARTITION p44410 VALUES IN ("44410"),
+ PARTITION p48414 VALUES IN ("48414"),
+ PARTITION p50416 VALUES IN ("50416"),
+ PARTITION p52418 VALUES IN ("52418"),
+ PARTITION p56422 VALUES IN ("56422"),
+ PARTITION p60426 VALUES IN ("60426"),
+ PARTITION p64430 VALUES IN ("64430"),
+ PARTITION p66432 VALUES IN ("66432"),
+ PARTITION p70436 VALUES IN ("70436"),
+ PARTITION p72438 VALUES IN ("72438"),
+ PARTITION p74440 VALUES IN ("74440"),
+ PARTITION p78444 VALUES IN ("78444"),
+ PARTITION p84450 VALUES IN ("84450"),
+ PARTITION p86452 VALUES IN ("86452"),
+ PARTITION p88454 VALUES IN ("88454"),
+ PARTITION p90456 VALUES IN ("90456"),
+ PARTITION p92458 VALUES IN ("92458"),
+ PARTITION p94460 VALUES IN ("94460"),
+ PARTITION p96462 VALUES IN ("96462"),
+ PARTITION p98464 VALUES IN ("98464"),
+ PARTITION p100466 VALUES IN ("100466"),
+ PARTITION p102468 VALUES IN ("102468"),
+ PARTITION p104470 VALUES IN ("104470"),
+ PARTITION p106472 VALUES IN ("106472"),
+ PARTITION p108474 VALUES IN ("108474"),
+ PARTITION p110476 VALUES IN ("110476"),
+ PARTITION p112478 VALUES IN ("112478"),
+ PARTITION p114480 VALUES IN ("114480"),
+ PARTITION p122488 VALUES IN ("122488"),
+ PARTITION p124490 VALUES IN ("124490"),
+ PARTITION p126492 VALUES IN ("126492"),
+ PARTITION p130496 VALUES IN ("130496"),
+ PARTITION p134500 VALUES IN ("134500"),
+ PARTITION p150516 VALUES IN ("150516"),
+ PARTITION p154520 VALUES IN ("154520"),
+ PARTITION p158524 VALUES IN ("158524"),
+ PARTITION p158525 VALUES IN ("158525"),
+ PARTITION p1848141 VALUES IN ("1848141"),
+ PARTITION p1848161 VALUES IN ("1848161"),
+ PARTITION p1848177 VALUES IN ("1848177"),
+ PARTITION p1848197 VALUES IN ("1848197"),
+ PARTITION p1848218 VALUES IN ("1848218"))
+ DISTRIBUTED BY HASH(`khh`) BUCKETS 16
+ PROPERTIES (
+ "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
+ "replication_allocation" = "tag.location.default: 1"
+ );
+
+ insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+ insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+ insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+
+ insert into USR_TLBL_VAL_R1 values(48414, 10000007);
+ insert into USR_TLBL_VAL_R1 values(94460, 10000007);
+ insert into USR_TLBL_VAL_R1 values(60426, 10000007);
+ """
+ qt_sql """ select * from USR_V_KHZHSJ_ES_POC1 A,USR_TLBL_VAL_R1 B
WHERE A.khid = B.khh order by lbl_id; """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]