This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0493eb1  [Optimize] optimize host selection strategy (#4914)
0493eb1 is described below

commit 0493eb172fee42b36f8838a4c52af8f19f53af0b
Author: xinghuayu007 <[email protected]>
AuthorDate: Sat Nov 28 09:48:13 2020 +0800

    [Optimize] optimize host selection strategy (#4914)
    
    When a tablet selects which replica's host should execute the scan operation,
    it uses a `round-robin` strategy for load balancing, where `minAssignedBytes`
    tracks the current load of each host.
    If a backend is momentarily not alive, one of the other replicas is chosen
    at random instead, but the dead backend's `minAssignedBytes` will not be
    decreased and the new choice's `minAssignedBytes` will not be increased.
    As a result, the recorded load of the backends no longer reflects their
    real load.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 11 +++
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 82 ++++++++++++++++++++++
 2 files changed, 93 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8a943f0..1f6ee87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1308,6 +1308,17 @@ public class Coordinator {
             Reference<Long> backendIdRef = new Reference<Long>();
             TNetworkAddress execHostPort = 
SimpleScheduler.getHost(minLocation.backend_id,
                     scanRangeLocations.getLocations(), this.idToBackend, 
backendIdRef);
+            if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
+                    execHostPort.port != minLocation.server.port) {
+                assignedBytesPerHost.put(minLocation.server,
+                        assignedBytesPerHost.get(minLocation.server) - 
scanRangeLength);
+                Long id = assignedBytesPerHost.get(execHostPort);
+                if (id == null) {
+                    assignedBytesPerHost.put(execHostPort, 0L);
+                } else {
+                    assignedBytesPerHost.put(execHostPort, id+scanRangeLength);
+                }
+            }
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
 
             Map<Integer, List<TScanRangeParams>> scanRanges = 
findOrInsert(assignment, execHostPort,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index eb1bad3..b2f780f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -387,5 +387,87 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 5, params);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
+
+    @Test
+    public void testComputeScanRangeAssignmentByScheduler()  {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        int scanNodeId = 1;
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new 
HashMap<>();
+        fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
+        fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
+
+        TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable = new OlapTable();
+        HashDistributionInfo hashDistributionInfo = new 
HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable, "defaultDistributionInfo", 
hashDistributionInfo);
+        tupleDescriptor.setTable(olapTable);
+
+        OlapScanNode olapScanNode = new OlapScanNode(new 
PlanNodeId(scanNodeId), tupleDescriptor, "test");
+        // each olaptable bucket have the same TScanRangeLocations, be id is 
{0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        TScanRangeLocations tScanRangeLocations1 = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation3 = new TScanRangeLocation();
+        tScanRangeLocation3.backend_id = 0;
+        tScanRangeLocation3.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation4 = new TScanRangeLocation();
+        tScanRangeLocation4.backend_id = 1;
+        tScanRangeLocation4.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation5 = new TScanRangeLocation();
+        tScanRangeLocation5.backend_id = 2;
+        tScanRangeLocation5.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations1.locations = new ArrayList<>();
+        tScanRangeLocations1.locations.add(tScanRangeLocation3);
+        tScanRangeLocations1.locations.add(tScanRangeLocation4);
+        tScanRangeLocations1.locations.add(tScanRangeLocation5);
+
+        olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
+                new DataPartition(TPartitionType.UNPARTITIONED)));
+
+        // init all backend
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(false);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                new ImmutableMap.Builder<Long, Backend>().
+                put(0l, backend0).
+                put(1l, backend1).
+                put(2l, backend2).build();
+        Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
+        FragmentScanRangeAssignment assignment = new 
FragmentScanRangeAssignment();
+        List<TScanRangeLocations> locations = new ArrayList<>();
+        locations.add(tScanRangeLocations);
+        locations.add(tScanRangeLocations1);
+        Deencapsulation.invoke(coordinator, 
"computeScanRangeAssignmentByScheduler",
+                olapScanNode, locations, assignment);
+        for (Map.Entry entry:assignment.entrySet()) {
+            Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, 
List<TScanRangeParams>>) entry.getValue();
+            for (Map.Entry item:addr.entrySet()) {
+                List<TScanRangeParams> params = (List<TScanRangeParams>) 
item.getValue();
+                Assert.assertTrue(params.size() == 2);
+            }
+        }
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to