This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0493eb1 [Optimize] optimize host selection strategy (#4914)
0493eb1 is described below
commit 0493eb172fee42b36f8838a4c52af8f19f53af0b
Author: xinghuayu007 <[email protected]>
AuthorDate: Sat Nov 28 09:48:13 2020 +0800
[Optimize] optimize host selection strategy (#4914)
When a tablet selects which replica's host to execute scan operation,
it takes `round-robin` strategy to load balance. `minAssignedBytes` is the
current load of one host.
If a backend is not alive momently, it will randomly take one of other
replicas as the choice,
but the unalive backend's `minAssignedBytes` not be descreased and the new
choice's `minAssignedBytes`
also not be increased. That will make the real load of the backends not
correct.
---
.../main/java/org/apache/doris/qe/Coordinator.java | 11 +++
.../java/org/apache/doris/qe/CoordinatorTest.java | 82 ++++++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8a943f0..1f6ee87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1308,6 +1308,17 @@ public class Coordinator {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort =
SimpleScheduler.getHost(minLocation.backend_id,
scanRangeLocations.getLocations(), this.idToBackend,
backendIdRef);
+ if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
+ execHostPort.port != minLocation.server.port) {
+ assignedBytesPerHost.put(minLocation.server,
+ assignedBytesPerHost.get(minLocation.server) -
scanRangeLength);
+ Long id = assignedBytesPerHost.get(execHostPort);
+ if (id == null) {
+ assignedBytesPerHost.put(execHostPort, 0L);
+ } else {
+ assignedBytesPerHost.put(execHostPort, id+scanRangeLength);
+ }
+ }
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
Map<Integer, List<TScanRangeParams>> scanRanges =
findOrInsert(assignment, execHostPort,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index eb1bad3..b2f780f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -387,5 +387,87 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 5, params);
Assert.assertEquals(3, params.instanceExecParams.size());
}
+
+ @Test
+ public void testComputeScanRangeAssignmentByScheduler() {
+ Coordinator coordinator = new Coordinator(context, analyzer, planner);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ int scanNodeId = 1;
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new
HashMap<>();
+ fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+ fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+
+ TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+ OlapTable olapTable = new OlapTable();
+ HashDistributionInfo hashDistributionInfo = new
HashDistributionInfo(66, new ArrayList<>());
+ Deencapsulation.setField(olapTable, "defaultDistributionInfo",
hashDistributionInfo);
+ tupleDescriptor.setTable(olapTable);
+
+ OlapScanNode olapScanNode = new OlapScanNode(new
PlanNodeId(scanNodeId), tupleDescriptor, "test");
+ // each olaptable bucket have the same TScanRangeLocations, be id is
{0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+ tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+ TScanRangeLocations tScanRangeLocations1 = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation3 = new TScanRangeLocation();
+ tScanRangeLocation3.backend_id = 0;
+ tScanRangeLocation3.server = new TNetworkAddress("0.0.0.0", 9050);
+ TScanRangeLocation tScanRangeLocation4 = new TScanRangeLocation();
+ tScanRangeLocation4.backend_id = 1;
+ tScanRangeLocation4.server = new TNetworkAddress("0.0.0.1", 9050);
+ TScanRangeLocation tScanRangeLocation5 = new TScanRangeLocation();
+ tScanRangeLocation5.backend_id = 2;
+ tScanRangeLocation5.server = new TNetworkAddress("0.0.0.2", 9050);
+ tScanRangeLocations1.locations = new ArrayList<>();
+ tScanRangeLocations1.locations.add(tScanRangeLocation3);
+ tScanRangeLocations1.locations.add(tScanRangeLocation4);
+ tScanRangeLocations1.locations.add(tScanRangeLocation5);
+
+ olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
+ new DataPartition(TPartitionType.UNPARTITIONED)));
+
+ // init all backend
+ Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+ backend0.setAlive(false);
+ backend0.setBePort(9050);
+ Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+ backend1.setAlive(true);
+ backend1.setBePort(9050);
+ Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+ backend2.setAlive(true);
+ backend2.setBePort(9050);
+
+ ImmutableMap<Long, Backend> idToBackend =
+ new ImmutableMap.Builder<Long, Backend>().
+ put(0l, backend0).
+ put(1l, backend1).
+ put(2l, backend2).build();
+ Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
+ FragmentScanRangeAssignment assignment = new
FragmentScanRangeAssignment();
+ List<TScanRangeLocations> locations = new ArrayList<>();
+ locations.add(tScanRangeLocations);
+ locations.add(tScanRangeLocations1);
+ Deencapsulation.invoke(coordinator,
"computeScanRangeAssignmentByScheduler",
+ olapScanNode, locations, assignment);
+ for (Map.Entry entry:assignment.entrySet()) {
+ Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer,
List<TScanRangeParams>>) entry.getValue();
+ for (Map.Entry item:addr.entrySet()) {
+ List<TScanRangeParams> params = (List<TScanRangeParams>)
item.getValue();
+ Assert.assertTrue(params.size() == 2);
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]