This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new dd6befd Coordinator segment balancer max load queue fix (#5888)
(#5970)
dd6befd is described below
commit dd6befdf1cf70194200912b0db2023ce9287372e
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 11:22:23 2018 -0700
Coordinator segment balancer max load queue fix (#5888) (#5970)
* Coordinator segment balancer will now respect
"maxSegmentsInNodeLoadingQueue" config
* allow moves from full load queues
* better variable names
---
.../helper/DruidCoordinatorBalancer.java | 26 ++++++----
.../coordinator/DruidCoordinatorBalancerTest.java | 55 ++++++++++++++++++----
2 files changed, 63 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index cc266d7..15f1343 100644
---
a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++
b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -20,9 +20,9 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
-import io.druid.java.util.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
@@ -103,31 +103,37 @@ public class DruidCoordinatorBalancer implements
DruidCoordinatorHelper
return;
}
- final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
+ final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
+ final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
- if (serverHolderList.size() <= 1) {
+ if (toMoveTo.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return;
}
int numSegments = 0;
- for (ServerHolder server : serverHolderList) {
- numSegments += server.getServer().getSegments().size();
+ for (ServerHolder sourceHolder : toMoveFrom) {
+ numSegments += sourceHolder.getServer().getSegments().size();
}
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}
+
+ final int maxToLoad =
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
- final BalancerSegmentHolder segmentToMove =
strategy.pickSegmentToMove(serverHolderList);
+ if (maxToLoad > 0) {
+ toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
+ }
+ final BalancerSegmentHolder segmentToMove =
strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMove != null &&
params.getAvailableSegments().contains(segmentToMove.getSegment())) {
- final ServerHolder holder =
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(),
serverHolderList);
+ final ServerHolder destinationHolder =
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);
- if (holder != null) {
- moveSegment(segmentToMove, holder.getServer(), params);
+ if (destinationHolder != null) {
+ moveSegment(segmentToMove, destinationHolder.getServer(), params);
} else {
++unmoved;
}
@@ -140,7 +146,7 @@ public class DruidCoordinatorBalancer implements
DruidCoordinatorHelper
stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier,
currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
- strategy.emitStats(tier, stats, serverHolderList);
+ strategy.emitStats(tier, stats, toMoveFrom);
}
log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
diff --git
a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index ec0b841..84cf5ce 100644
---
a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++
b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -163,7 +163,6 @@ public class DruidCoordinatorBalancerTest
balancerStrategyExecutor.shutdownNow();
}
-
@Test
public void testMoveToEmptyServerBalancer() throws IOException
{
@@ -186,7 +185,7 @@ public class DruidCoordinatorBalancerTest
)
);
- DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+ DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
@@ -198,6 +197,48 @@ public class DruidCoordinatorBalancerTest
}
@Test
+ public void testMoveMaxLoadQueueServerBalancer()
+ {
+ mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+ mockDruidServer(druidServer2, "2", "normal", 0L, 100L,
Collections.emptyMap());
+
+ EasyMock.replay(druidServer3);
+ EasyMock.replay(druidServer4);
+
+ // Mock stuff that the coordinator needs
+ mockCoordinator(coordinator);
+
+ BalancerStrategy predefinedPickOrderStrategy = new
PredefinedPickOrderBalancerStrategy(
+ balancerStrategy,
+ ImmutableList.of(
+ new BalancerSegmentHolder(druidServer1, segment1),
+ new BalancerSegmentHolder(druidServer1, segment2),
+ new BalancerSegmentHolder(druidServer1, segment3),
+ new BalancerSegmentHolder(druidServer1, segment4)
+ )
+ );
+
+ DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+ ImmutableList.of(druidServer1, druidServer2),
+ ImmutableList.of(peon1, peon2)
+ )
+ .withBalancerStrategy(predefinedPickOrderStrategy)
+ .withDynamicConfigs(
+ CoordinatorDynamicConfig
+ .builder()
+ .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
+ .withMaxSegmentsInNodeLoadingQueue(1)
+ .build()
+ )
+ .build();
+
+ params = new DruidCoordinatorBalancerTester(coordinator).run(params);
+
+ // max to move is 5, all segments on server 1, but only expect to move 1
to server 2 since max node load queue is 1
+ Assert.assertEquals(1,
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+ }
+
+ @Test
public void testMoveSameSegmentTwice() throws Exception
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
@@ -216,7 +257,7 @@ public class DruidCoordinatorBalancerTest
)
);
- DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+ DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
@@ -245,7 +286,7 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
- DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+ DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
@@ -254,7 +295,6 @@ public class DruidCoordinatorBalancerTest
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount",
"normal") > 0);
}
-
@Test
public void testRun2() throws IOException
{
@@ -267,13 +307,13 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
- DruidCoordinatorRuntimeParams params =
defaullRuntimeParamsBuilder(druidServers, peons).build();
+ DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(druidServers, peons).build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount",
"normal") > 0);
}
- private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
+ private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
)
@@ -393,5 +433,4 @@ public class DruidCoordinatorBalancerTest
delegate.emitStats(tier, stats, serverHolderList);
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]