This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new dd6befd  Coordinator segment balancer max load queue fix (#5888) (#5970)
dd6befd is described below

commit dd6befdf1cf70194200912b0db2023ce9287372e
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 11:22:23 2018 -0700

    Coordinator segment balancer max load queue fix (#5888) (#5970)
    
    * Coordinator segment balancer will now respect "maxSegmentsInNodeLoadingQueue" config
    
    * allow moves from full load queues
    
    * better variable names
---
 .../helper/DruidCoordinatorBalancer.java           | 26 ++++++----
 .../coordinator/DruidCoordinatorBalancerTest.java  | 55 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index cc266d7..15f1343 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -20,9 +20,9 @@
 package io.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.server.coordinator.BalancerSegmentHolder;
 import io.druid.server.coordinator.BalancerStrategy;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -103,31 +103,37 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
       return;
     }
 
-    final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
+    final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
+    final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
 
-    if (serverHolderList.size() <= 1) {
+    if (toMoveTo.size() <= 1) {
       log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
       return;
     }
 
     int numSegments = 0;
-    for (ServerHolder server : serverHolderList) {
-      numSegments += server.getServer().getSegments().size();
+    for (ServerHolder sourceHolder : toMoveFrom) {
+      numSegments += sourceHolder.getServer().getSegments().size();
     }
 
     if (numSegments == 0) {
       log.info("No segments found.  Cannot balance.");
       return;
     }
+
+    final int maxToLoad = 
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
     long unmoved = 0L;
     for (int iter = 0; iter < maxSegmentsToMove; iter++) {
-      final BalancerSegmentHolder segmentToMove = 
strategy.pickSegmentToMove(serverHolderList);
+      if (maxToLoad > 0) {
+        toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
+      }
+      final BalancerSegmentHolder segmentToMove = 
strategy.pickSegmentToMove(toMoveFrom);
 
       if (segmentToMove != null && 
params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-        final ServerHolder holder = 
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), 
serverHolderList);
+        final ServerHolder destinationHolder = 
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);
 
-        if (holder != null) {
-          moveSegment(segmentToMove, holder.getServer(), params);
+        if (destinationHolder != null) {
+          moveSegment(segmentToMove, destinationHolder.getServer(), params);
         } else {
           ++unmoved;
         }
@@ -140,7 +146,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
     stats.addToTieredStat("unmovedCount", tier, unmoved);
     stats.addToTieredStat("movedCount", tier, 
currentlyMovingSegments.get(tier).size());
     if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
-      strategy.emitStats(tier, stats, serverHolderList);
+      strategy.emitStats(tier, stats, toMoveFrom);
     }
     log.info(
         "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index ec0b841..84cf5ce 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -163,7 +163,6 @@ public class DruidCoordinatorBalancerTest
     balancerStrategyExecutor.shutdownNow();
   }
 
-
   @Test
   public void testMoveToEmptyServerBalancer() throws IOException
   {
@@ -186,7 +185,7 @@ public class DruidCoordinatorBalancerTest
         )
     );
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     )
@@ -198,6 +197,48 @@ public class DruidCoordinatorBalancerTest
   }
 
   @Test
+  public void testMoveMaxLoadQueueServerBalancer()
+  {
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyMap());
+
+    EasyMock.replay(druidServer3);
+    EasyMock.replay(druidServer4);
+
+    // Mock stuff that the coordinator needs
+    mockCoordinator(coordinator);
+
+    BalancerStrategy predefinedPickOrderStrategy = new 
PredefinedPickOrderBalancerStrategy(
+        balancerStrategy,
+        ImmutableList.of(
+            new BalancerSegmentHolder(druidServer1, segment1),
+            new BalancerSegmentHolder(druidServer1, segment2),
+            new BalancerSegmentHolder(druidServer1, segment3),
+            new BalancerSegmentHolder(druidServer1, segment4)
+        )
+    );
+
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2),
+        ImmutableList.of(peon1, peon2)
+    )
+        .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig
+                .builder()
+                .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
+                .withMaxSegmentsInNodeLoadingQueue(1)
+                .build()
+        )
+        .build();
+
+    params = new DruidCoordinatorBalancerTester(coordinator).run(params);
+
+    // max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1
+    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+  }
+
+  @Test
   public void testMoveSameSegmentTwice() throws Exception
   {
     mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
@@ -216,7 +257,7 @@ public class DruidCoordinatorBalancerTest
         )
     );
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     )
@@ -245,7 +286,7 @@ public class DruidCoordinatorBalancerTest
     // Mock stuff that the coordinator needs
     mockCoordinator(coordinator);
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     ).build();
@@ -254,7 +295,6 @@ public class DruidCoordinatorBalancerTest
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
   }
 
-
   @Test
   public void testRun2() throws IOException
   {
@@ -267,13 +307,13 @@ public class DruidCoordinatorBalancerTest
     // Mock stuff that the coordinator needs
     mockCoordinator(coordinator);
 
-    DruidCoordinatorRuntimeParams params = 
defaullRuntimeParamsBuilder(druidServers, peons).build();
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(druidServers, peons).build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
   }
 
-  private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
+  private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
       List<ImmutableDruidServer> druidServers,
       List<LoadQueuePeon> peons
   )
@@ -393,5 +433,4 @@ public class DruidCoordinatorBalancerTest
       delegate.emitStats(tier, stats, serverHolderList);
     }
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to