This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 71b133f3ff Add `RoundRobinServerSelector` to speed up segment assignments (#13367)
71b133f3ff is described below
commit 71b133f3ff1ba9c547f94ddeed63d72e14419e94
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Nov 16 20:05:17 2022 +0530
Add `RoundRobinServerSelector` to speed up segment assignments (#13367)
Segment assignment can take a very long time because of the balancer strategy's
cost computation over a large number of segments. This commit allows segments
to be assigned in a round-robin fashion within a tier. Only segment balancing
then makes cost-based decisions to move segments around.
Changes
- Add dynamic config `useRoundRobinSegmentAssignment` with default value `false`
- Add `RoundRobinServerSelector`. This does not implement `BalancerStrategy`
  because it does not conform to that contract; it may also be used in
  conjunction with a strategy (round-robin for `RunRules` and a cost-based
  strategy for `BalanceSegments`)
- Drops are still cost-based even when round-robin assignment is enabled.
---
.../coordinator/CoordinatorDynamicConfig.java | 34 ++++-
.../druid/server/coordinator/DruidCoordinator.java | 12 +-
.../coordinator/DruidCoordinatorRuntimeParams.java | 22 +++
.../coordinator/RoundRobinServerSelector.java | 153 +++++++++++++++++++++
.../druid/server/coordinator/ServerHolder.java | 33 +++++
.../druid/server/coordinator/rules/LoadRule.java | 59 +++++---
.../coordinator/RoundRobinServerSelectorTest.java | 149 ++++++++++++++++++++
.../server/coordinator/rules/LoadRuleTest.java | 75 +++++++---
.../simulate/CoordinatorSimulation.java | 6 +
.../simulate/CoordinatorSimulationBaseTest.java | 22 ++-
.../simulate/CoordinatorSimulationBuilder.java | 37 +++--
.../simulate/RoundRobinAssignmentTest.java | 112 +++++++++++++++
.../server/http/CoordinatorDynamicConfigTest.java | 47 ++++---
.../coordinator-dynamic-config.tsx | 12 ++
14 files changed, 691 insertions(+), 82 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 90574780ea..475a8f88cd 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,6 +61,7 @@ public class CoordinatorDynamicConfig
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
+ private final boolean useRoundRobinSegmentAssignment;
/**
* List of specific data sources for which kill tasks are sent in {@link
KillUnusedSegments}.
@@ -134,7 +135,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int
decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean
replicateAfterLoadTimeout,
- @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer
maxNonPrimaryReplicantsToLoad
+ @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer
maxNonPrimaryReplicantsToLoad,
+ @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean
useRoundRobinSegmentAssignment
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -195,6 +197,12 @@ public class CoordinatorDynamicConfig
"maxNonPrimaryReplicantsToLoad must be greater than or equal to 0."
);
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
+
+ if (useRoundRobinSegmentAssignment == null) {
+ this.useRoundRobinSegmentAssignment =
Builder.DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT;
+ } else {
+ this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+ }
}
private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@@ -316,6 +324,12 @@ public class CoordinatorDynamicConfig
return maxSegmentsInNodeLoadingQueue;
}
+  /**
+   * Whether segments should be assigned to the servers of a tier in a
+   * round-robin fashion rather than by the balancer strategy. Falls back to
+   * {@code false} when the property is not specified.
+   */
+  @JsonProperty
+  public boolean isUseRoundRobinSegmentAssignment()
+  {
+    return this.useRoundRobinSegmentAssignment;
+  }
+
/**
* List of historical servers to 'decommission'. Coordinator will not assign
new segments to 'decommissioning'
* servers, and segments will be moved away from them to be placed on
non-decommissioning servers at the maximum rate
@@ -509,6 +523,7 @@ public class CoordinatorDynamicConfig
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD =
Integer.MAX_VALUE;
+ private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false;
private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
@@ -528,6 +543,7 @@ public class CoordinatorDynamicConfig
private Boolean pauseCoordination;
private Boolean replicateAfterLoadTimeout;
private Integer maxNonPrimaryReplicantsToLoad;
+ private Boolean useRoundRobinSegmentAssignment;
public Builder()
{
@@ -554,7 +570,8 @@ public class CoordinatorDynamicConfig
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean
replicateAfterLoadTimeout,
- @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer
maxNonPrimaryReplicantsToLoad
+ @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer
maxNonPrimaryReplicantsToLoad,
+ @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean
useRoundRobinSegmentAssignment
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -576,6 +593,7 @@ public class CoordinatorDynamicConfig
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
+ this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
}
public Builder
withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long
leadingTimeMillis)
@@ -681,6 +699,12 @@ public class CoordinatorDynamicConfig
return this;
}
+    /**
+     * Enables or disables round-robin segment assignment in the config being built.
+     *
+     * @return this builder, for chaining
+     */
+    public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAssignment)
+    {
+      this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+      return this;
+    }
+
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@@ -709,7 +733,8 @@ public class CoordinatorDynamicConfig
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
- : maxNonPrimaryReplicantsToLoad
+ :
maxNonPrimaryReplicantsToLoad,
+ useRoundRobinSegmentAssignment == null ?
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
);
}
@@ -747,7 +772,8 @@ public class CoordinatorDynamicConfig
replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null
? defaults.getMaxNonPrimaryReplicantsToLoad()
- : maxNonPrimaryReplicantsToLoad
+ : maxNonPrimaryReplicantsToLoad,
+ useRoundRobinSegmentAssignment == null ?
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 50239db4ec..38fcd9efa5 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -993,10 +993,19 @@ public class DruidCoordinator
stopPeonsForDisappearedServers(currentServers);
+ final RoundRobinServerSelector roundRobinServerSelector;
+ if
(params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
+ roundRobinServerSelector = new RoundRobinServerSelector(cluster);
+ log.info("Using round-robin segment assignment.");
+ } else {
+ roundRobinServerSelector = null;
+ }
+
return params.buildFromExisting()
.withDruidCluster(cluster)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
+ .withRoundRobinServerSelector(roundRobinServerSelector)
.build();
}
@@ -1044,7 +1053,8 @@ public class DruidCoordinator
new ServerHolder(
server,
loadManagementPeons.get(server.getName()),
- decommissioningServers.contains(server.getHost())
+ decommissioningServers.contains(server.getHost()),
+
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue()
)
);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index ed488607ed..f195a5ebad 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -70,6 +70,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorStats stats;
private final BalancerStrategy balancerStrategy;
private final Set<String> broadcastDatasources;
+ private final @Nullable RoundRobinServerSelector roundRobinServerSelector;
private DruidCoordinatorRuntimeParams(
long startTimeNanos,
@@ -80,6 +81,7 @@ public class DruidCoordinatorRuntimeParams
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
+ RoundRobinServerSelector roundRobinServerSelector,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
@@ -96,6 +98,7 @@ public class DruidCoordinatorRuntimeParams
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
+ this.roundRobinServerSelector = roundRobinServerSelector;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
@@ -150,6 +153,12 @@ public class DruidCoordinatorRuntimeParams
return replicationManager;
}
+  /**
+   * Returns the selector used for round-robin segment assignment, or null if
+   * none was set (e.g. when round-robin assignment is disabled).
+   */
+  @Nullable
+  public RoundRobinServerSelector getRoundRobinServerSelector()
+  {
+    return this.roundRobinServerSelector;
+  }
+
public ServiceEmitter getEmitter()
{
return emitter;
@@ -211,6 +220,7 @@ public class DruidCoordinatorRuntimeParams
dataSourcesSnapshot,
loadManagementPeons,
replicationManager,
+ roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
@@ -231,6 +241,7 @@ public class DruidCoordinatorRuntimeParams
null, // dataSourcesSnapshot
loadManagementPeons,
replicationManager,
+ roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
@@ -250,6 +261,7 @@ public class DruidCoordinatorRuntimeParams
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
+ private @Nullable RoundRobinServerSelector roundRobinServerSelector;
private ServiceEmitter emitter;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
@@ -267,6 +279,7 @@ public class DruidCoordinatorRuntimeParams
this.dataSourcesSnapshot = null;
this.loadManagementPeons = new HashMap<>();
this.replicationManager = null;
+ this.roundRobinServerSelector = null;
this.emitter = null;
this.stats = new CoordinatorStats();
this.coordinatorDynamicConfig =
CoordinatorDynamicConfig.builder().build();
@@ -283,6 +296,7 @@ public class DruidCoordinatorRuntimeParams
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
+ RoundRobinServerSelector roundRobinServerSelector,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
@@ -299,6 +313,7 @@ public class DruidCoordinatorRuntimeParams
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
+ this.roundRobinServerSelector = roundRobinServerSelector;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
@@ -319,6 +334,7 @@ public class DruidCoordinatorRuntimeParams
dataSourcesSnapshot,
loadManagementPeons,
replicationManager,
+ roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
@@ -401,6 +417,12 @@ public class DruidCoordinatorRuntimeParams
return this;
}
+    /**
+     * Sets the selector used for round-robin segment assignment. May be null,
+     * e.g. when round-robin assignment is disabled.
+     *
+     * @return this builder, for chaining
+     */
+    public Builder withRoundRobinServerSelector(RoundRobinServerSelector roundRobinServerSelector)
+    {
+      this.roundRobinServerSelector = roundRobinServerSelector;
+      return this;
+    }
+
public Builder withEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
new file mode 100644
index 0000000000..89459adcc4
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Provides iterators over the historicals of a tier that are eligible to load
+ * a given segment.
+ * <p>
+ * Once a selector is initialized with a {@link DruidCluster}, an iterator
+ * returned by {@link #getServersInTierToLoadSegment(String, DataSegment)}
+ * iterates over the historicals in a tier in a round-robin fashion. All
+ * iterators for a tier share a single cursor, so the next invocation of this
+ * method picks up where the last iterator left off.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class RoundRobinServerSelector
+{
+  private final Map<String, CircularServerList> tierToServers = new HashMap<>();
+
+  public RoundRobinServerSelector(DruidCluster cluster)
+  {
+    cluster.getHistoricals().forEach(
+        (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers))
+    );
+  }
+
+  /**
+   * Returns an iterator over the servers in this tier which are eligible to
+   * load the given segment, as decided by {@link ServerHolder#canLoadSegment}.
+   * <p>
+   * Each returned iterator visits every server of the tier at most once.
+   * Creating and consuming it advances the round-robin cursor that is shared
+   * by all iterators of the tier.
+   *
+   * @return an empty iterator if the tier is unknown to this selector
+   */
+  public Iterator<ServerHolder> getServersInTierToLoadSegment(String tier, DataSegment segment)
+  {
+    final CircularServerList servers = tierToServers.get(tier);
+    if (servers == null) {
+      return Collections.emptyIterator();
+    }
+
+    return new EligibleServerIterator(segment, servers);
+  }
+
+  /**
+   * Iterator over servers in a tier that are eligible to load a given segment.
+   * The next eligible server is looked up eagerly, so {@link #hasNext()} is a
+   * simple null check.
+   */
+  private static class EligibleServerIterator implements Iterator<ServerHolder>
+  {
+    private final CircularServerList delegate;
+    private final DataSegment segment;
+
+    private ServerHolder nextEligible;
+
+    // Number of servers that may still be inspected, so that a single
+    // iterator never visits the same server twice.
+    private int remainingIterations;
+
+    EligibleServerIterator(DataSegment segment, CircularServerList delegate)
+    {
+      this.delegate = delegate;
+      this.segment = segment;
+      this.remainingIterations = delegate.servers.size();
+      this.nextEligible = search();
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return nextEligible != null;
+    }
+
+    @Override
+    public ServerHolder next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      final ServerHolder previous = nextEligible;
+      // The cursor still points at the server being returned; move past it
+      // before looking for the next eligible one.
+      delegate.advanceCursor();
+      nextEligible = search();
+      return previous;
+    }
+
+    /**
+     * Advances the cursor past ineligible servers and returns the first server
+     * that can load the segment, leaving the cursor on it. Returns null once
+     * every server of the tier has been inspected by this iterator.
+     */
+    private ServerHolder search()
+    {
+      while (remainingIterations-- > 0) {
+        final ServerHolder nextServer = delegate.peekNext();
+        if (nextServer.canLoadSegment(segment)) {
+          return nextServer;
+        } else {
+          delegate.advanceCursor();
+        }
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Circular list over all servers in a tier. A single instance of this is
+   * maintained for each tier so that successive iterators share the cursor.
+   */
+  private static class CircularServerList
+  {
+    private final List<ServerHolder> servers = new ArrayList<>();
+    private int currentPosition;
+
+    CircularServerList(Set<ServerHolder> servers)
+    {
+      // Servers are visited in the iteration order of the tier's set.
+      this.servers.addAll(servers);
+    }
+
+    void advanceCursor()
+    {
+      if (++currentPosition >= servers.size()) {
+        currentPosition = 0;
+      }
+    }
+
+    ServerHolder peekNext()
+    {
+      int nextPosition = currentPosition < servers.size() ? currentPosition : 0;
+      return servers.get(nextPosition);
+    }
+  }
+
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 43fdaaef1d..057dfb118c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -34,6 +34,7 @@ public class ServerHolder implements Comparable<ServerHolder>
private final ImmutableDruidServer server;
private final LoadQueuePeon peon;
private final boolean isDecommissioning;
+ private final int maxSegmentsInLoadQueue;
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
{
@@ -41,10 +42,21 @@ public class ServerHolder implements
Comparable<ServerHolder>
}
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean
isDecommissioning)
+ {
+ this(server, peon, isDecommissioning, 0);
+ }
+
+ public ServerHolder(
+ ImmutableDruidServer server,
+ LoadQueuePeon peon,
+ boolean isDecommissioning,
+ int maxSegmentsInNodeLoadingQueue
+ )
{
this.server = server;
this.peon = peon;
this.isDecommissioning = isDecommissioning;
+ this.maxSegmentsInLoadQueue = maxSegmentsInNodeLoadingQueue;
}
public ImmutableDruidServer getServer()
@@ -138,6 +150,27 @@ public class ServerHolder implements
Comparable<ServerHolder>
return server.getSegment(segmentId) != null;
}
+  /**
+   * Checks whether this server can accept a load of the given segment.
+   * <p>
+   * A load is possible only if the server meets all of the following criteria:
+   * <ul>
+   * <li>is not being decommissioned</li>
+   * <li>is not already serving the segment</li>
+   * <li>is not already loading the segment</li>
+   * <li>has fewer segments in its load queue than the configured limit; a
+   * limit of zero or less disables this check (the 3-argument constructor
+   * passes 0, i.e. no limit)</li>
+   * <li>has at least as much available size as the segment</li>
+   * </ul>
+   */
+  public boolean canLoadSegment(DataSegment segment)
+  {
+    return !isDecommissioning
+           && !isServingSegment(segment.getId())
+           && !isLoadingSegment(segment)
+           && (maxSegmentsInLoadQueue <= 0 || peon.getNumberOfSegmentsInQueue() < maxSegmentsInLoadQueue)
+           && getAvailableSize() >= segment.getSize();
+  }
+
@Override
public int compareTo(ServerHolder serverHolder)
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 3b20cf0913..933b63e472 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -44,7 +44,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -178,7 +177,7 @@ public abstract class LoadRule implements Rule
targetReplicants.getOrDefault(tier, 0),
numAssigned, // note that the currentReplicantsInTier is the
just-assigned primary replica.
params,
- createLoadQueueSizeLimitingPredicate(params).and(holder ->
!holder.equals(primaryHolderToLoad)),
+ createLoadQueueSizeLimitingPredicate(segment).and(holder ->
!holder.equals(primaryHolderToLoad)),
segment
);
@@ -193,15 +192,10 @@ public abstract class LoadRule implements Rule
}
private static Predicate<ServerHolder> createLoadQueueSizeLimitingPredicate(
- final DruidCoordinatorRuntimeParams params
+ final DataSegment segment
)
{
- final int maxSegmentsInNodeLoadingQueue =
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
- if (maxSegmentsInNodeLoadingQueue <= 0) {
- return Objects::nonNull;
- } else {
- return s -> (s != null && s.getNumberOfSegmentsInQueue() <
maxSegmentsInNodeLoadingQueue);
- }
+ return server -> server != null && server.canLoadSegment(segment);
}
private static List<ServerHolder> getFilteredHolders(
@@ -219,6 +213,21 @@ public abstract class LoadRule implements Rule
return
queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList());
}
+ private Iterator<ServerHolder> getRoundRobinIterator(
+ DruidCoordinatorRuntimeParams params,
+ String tier,
+ DataSegment segment
+ )
+ {
+ if (params.getRoundRobinServerSelector() == null
+ ||
!params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
+ return null;
+ }
+
+ return params.getRoundRobinServerSelector()
+ .getServersInTierToLoadSegment(tier, segment);
+ }
+
/**
* Iterates through each tier and find the respective segment homes; with
the found segment homes, selects the one
* with the highest priority to be the holder for the primary replica.
@@ -230,6 +239,8 @@ public abstract class LoadRule implements Rule
)
{
ServerHolder topCandidate = null;
+ final boolean useRoundRobinAssignment =
params.getCoordinatorDynamicConfig()
+
.isUseRoundRobinSegmentAssignment();
for (final Object2IntMap.Entry<String> entry :
targetReplicants.object2IntEntrySet()) {
final int targetReplicantsInTier = entry.getIntValue();
// sanity check: target number of replicants should be more than zero.
@@ -248,7 +259,7 @@ public abstract class LoadRule implements Rule
final List<ServerHolder> holders = getFilteredHolders(
tier,
params.getDruidCluster(),
- createLoadQueueSizeLimitingPredicate(params)
+ createLoadQueueSizeLimitingPredicate(segment)
);
// no holders satisfy the predicate
if (holders.isEmpty()) {
@@ -256,12 +267,20 @@ public abstract class LoadRule implements Rule
continue;
}
- final ServerHolder candidate =
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+ final ServerHolder candidate;
+ if (useRoundRobinAssignment) {
+ Iterator<ServerHolder> roundRobinIterator =
getRoundRobinIterator(params, tier, segment);
+ candidate = roundRobinIterator.hasNext() ? roundRobinIterator.next() :
null;
+ } else {
+ candidate =
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+ if (candidate != null) {
+ strategyCache.put(tier, candidate);
+ }
+ }
+
if (candidate == null) {
log.warn(noAvailability);
} else {
- // cache the result for later use.
- strategyCache.put(tier, candidate);
if (topCandidate == null ||
candidate.getServer().getPriority() >
topCandidate.getServer().getPriority()) {
topCandidate = candidate;
@@ -307,7 +326,7 @@ public abstract class LoadRule implements Rule
entry.getIntValue(),
params.getSegmentReplicantLookup().getTotalReplicants(segment.getId(), tier),
params,
- createLoadQueueSizeLimitingPredicate(params),
+ createLoadQueueSizeLimitingPredicate(segment),
segment
);
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
@@ -347,6 +366,7 @@ public abstract class LoadRule implements Rule
return 0;
}
+ final Iterator<ServerHolder> roundRobinServerIterator =
getRoundRobinIterator(params, tier, segment);
final ReplicationThrottler throttler = params.getReplicationManager();
for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
if (!throttler.canCreateReplicant(tier)) {
@@ -355,10 +375,15 @@ public abstract class LoadRule implements Rule
}
// Retrieves from cache if available
- ServerHolder holder = strategyCache.remove(tier);
- // Does strategy call if not in cache
- if (holder == null) {
+ final ServerHolder holder;
+ if (strategyCache.containsKey(tier)) {
+ // found in cache
+ holder = strategyCache.remove(tier);
+ } else if (roundRobinServerIterator == null) {
+ // Call balancer strategy
holder =
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+ } else {
+ holder = roundRobinServerIterator.hasNext() ?
roundRobinServerIterator.next() : null;
}
if (holder == null) {
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
new file mode 100644
index 0000000000..bcef7ca8a4
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public class RoundRobinServerSelectorTest
+{
+ private static final String TIER = "normal";
+
+ private final DataSegment segment = new DataSegment(
+ "wiki",
+ Intervals.of("2022-01-01/2022-01-02"),
+ "1",
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new NumberedShardSpec(1, 10),
+ IndexIO.CURRENT_VERSION_ID,
+ 100
+ );
+
+ @Test
+ public void testSingleIterator()
+ {
+ final ServerHolder serverXL = createHistorical("serverXL", 1000);
+ final ServerHolder serverL = createHistorical("serverXL", 900);
+ final ServerHolder serverM = createHistorical("serverXL", 800);
+
+ // This server is too small to house the segment
+ final ServerHolder serverXS = createHistorical("serverXL", 10);
+
+ DruidCluster cluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(TIER, serverXL, serverM, serverXS, serverL)
+ .build();
+ final RoundRobinServerSelector selector = new
RoundRobinServerSelector(cluster);
+
+ // Verify that only eligible servers are returned in order of available
size
+ Iterator<ServerHolder> pickedServers =
selector.getServersInTierToLoadSegment(TIER, segment);
+
+ Assert.assertTrue(pickedServers.hasNext());
+ Assert.assertEquals(serverXL, pickedServers.next());
+ Assert.assertEquals(serverL, pickedServers.next());
+ Assert.assertEquals(serverM, pickedServers.next());
+
+ Assert.assertFalse(pickedServers.hasNext());
+ }
+
+ @Test
+ public void testNextIteratorContinuesFromSamePosition()
+ {
+ final ServerHolder serverXL = createHistorical("serverXL", 1000);
+ final ServerHolder serverL = createHistorical("serverXL", 900);
+ final ServerHolder serverM = createHistorical("serverXL", 800);
+
+ // This server is too small to house the segment
+ final ServerHolder serverXS = createHistorical("serverXL", 10);
+
+ DruidCluster cluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(TIER, serverXL, serverM, serverXS, serverL)
+ .build();
+ final RoundRobinServerSelector selector = new
RoundRobinServerSelector(cluster);
+
+ // Verify that only eligible servers are returned in order of available
size
+ Iterator<ServerHolder> pickedServers =
selector.getServersInTierToLoadSegment(TIER, segment);
+ Assert.assertTrue(pickedServers.hasNext());
+ Assert.assertEquals(serverXL, pickedServers.next());
+
+ // Second iterator starts from previous position but resets allowed number
of iterations
+ pickedServers = selector.getServersInTierToLoadSegment(TIER, segment);
+ Assert.assertTrue(pickedServers.hasNext());
+
+ Assert.assertEquals(serverL, pickedServers.next());
+ Assert.assertEquals(serverM, pickedServers.next());
+ Assert.assertEquals(serverXL, pickedServers.next());
+
+ Assert.assertFalse(pickedServers.hasNext());
+ }
+
+ @Test
+ public void testNoServersInTier()
+ {
+ DruidCluster cluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(TIER)
+ .build();
+ final RoundRobinServerSelector selector = new
RoundRobinServerSelector(cluster);
+
+ Iterator<ServerHolder> eligibleServers =
selector.getServersInTierToLoadSegment(TIER, segment);
+ Assert.assertFalse(eligibleServers.hasNext());
+ }
+
+ @Test
+ public void testNoEligibleServerInTier()
+ {
+ DruidCluster cluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ TIER,
+ createHistorical("server1", 40),
+ createHistorical("server2", 30),
+ createHistorical("server3", 10),
+ createHistorical("server4", 20)
+ )
+ .build();
+ final RoundRobinServerSelector selector = new
RoundRobinServerSelector(cluster);
+
+ // Verify that only eligible servers are returned in order of available
size
+ Iterator<ServerHolder> eligibleServers =
selector.getServersInTierToLoadSegment(TIER, segment);
+ Assert.assertFalse(eligibleServers.hasNext());
+ }
+
+ private ServerHolder createHistorical(String name, long size)
+ {
+ return new ServerHolder(
+ new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER,
1).toImmutableDruidServer(),
+ new LoadQueuePeonTester()
+ );
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 99a975caff..b039834e7b 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -48,6 +48,7 @@ import
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.RoundRobinServerSelector;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.cost.ClusterCostCache;
@@ -61,8 +62,11 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,6 +77,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
*/
+@RunWith(Parameterized.class)
public class LoadRuleTest
{
private static final Logger log = new Logger(LoadRuleTest.class);
@@ -95,8 +100,20 @@ public class LoadRuleTest
private CachingCostBalancerStrategy cachingCostBalancerStrategy;
+ private final boolean useRoundRobinAssignment;
private BalancerStrategy mockBalancerStrategy;
+  /**
+   * Runs every test twice: once with round-robin segment assignment enabled
+   * and once with it disabled.
+   */
+  @Parameterized.Parameters(name = "useRoundRobin = {0}")
+  public static List<Boolean> getTestParams()
+  {
+    final List<Boolean> roundRobinModes = Arrays.asList(true, false);
+    return roundRobinModes;
+  }
+
+ /**
+ * @param useRoundRobinAssignment whether the test run uses a RoundRobinServerSelector
+ * instead of expecting assignment calls on the mocked balancer strategy
+ */
+ public LoadRuleTest(boolean useRoundRobinAssignment)
+ {
+ this.useRoundRobinAssignment = useRoundRobinAssignment;
+ }
+
@Before
public void setUp()
{
@@ -106,7 +123,6 @@ public class LoadRuleTest
exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new
CostBalancerStrategyFactory().createBalancerStrategy(exec);
-
cachingCostBalancerStrategy = new
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
@@ -138,9 +154,11 @@ public class LoadRuleTest
throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER,
segment.getId(), "hostNorm");
EasyMock.expectLastCall().once();
-
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(3);
+ if (!useRoundRobinAssignment) {
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
+ .andDelegateTo(balancerStrategy)
+ .times(3);
+ }
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
@@ -189,6 +207,13 @@ public class LoadRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster,
false))
.withReplicationManager(throttler)
+ .withDynamicConfigs(
+ CoordinatorDynamicConfig
+ .builder()
+ .withUseRoundRobinSegmentAssignment(useRoundRobinAssignment)
+ .build()
+ )
+ .withRoundRobinServerSelector(useRoundRobinAssignment ? new
RoundRobinServerSelector(druidCluster) : null)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(usedSegments)
.build();
@@ -440,9 +465,11 @@ public class LoadRuleTest
mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().once();
-
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(2);
+ if (!useRoundRobinAssignment) {
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
+ .andDelegateTo(balancerStrategy)
+ .times(2);
+ }
EasyMock.replay(throttler, mockPeon1, mockPeon2, mockBalancerStrategy);
@@ -626,6 +653,7 @@ public class LoadRuleTest
@Test
public void testMaxLoadingQueueSize()
{
+ final int maxSegmentsInLoadQueue = 2;
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
@@ -643,7 +671,9 @@ public class LoadRuleTest
new ServerHolder(
new DruidServer("serverHot", "hostHot", null, 1000,
ServerType.HISTORICAL, "hot", 0)
.toImmutableDruidServer(),
- peon
+ peon,
+ false,
+ maxSegmentsInLoadQueue
)
)
.build();
@@ -659,8 +689,11 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
-
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build())
- .build();
+ .withDynamicConfigs(
+ CoordinatorDynamicConfig.builder()
+
.withMaxSegmentsInNodeLoadingQueue(maxSegmentsInLoadQueue)
+ .build()
+ ).build();
CoordinatorStats stats1 = rule.run(null, params, dataSegment1);
CoordinatorStats stats2 = rule.run(null, params, dataSegment2);
@@ -687,9 +720,11 @@ public class LoadRuleTest
final DataSegment segment = createDataSegment("foo");
-
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(1);
+ if (!useRoundRobinAssignment) {
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
EasyMock.anyObject()))
+ .andDelegateTo(balancerStrategy)
+ .times(1);
+ }
EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy);
@@ -732,12 +767,14 @@ public class LoadRuleTest
ServerHolder holder3 = createServerHolder("tier2", mockPeon3, false);
ServerHolder holder4 = createServerHolder("tier2", mockPeon4, false);
- EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder2)))
- .andReturn(holder2);
- EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder4, holder3)))
- .andReturn(holder3);
- EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder4)))
- .andReturn(holder4);
+ if (!useRoundRobinAssignment) {
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder2)))
+ .andReturn(holder2);
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder4, holder3)))
+ .andReturn(holder3);
+
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment,
ImmutableList.of(holder4)))
+ .andReturn(holder4);
+ }
EasyMock.replay(throttler, mockPeon1, mockPeon2, mockPeon3, mockPeon4,
mockBalancerStrategy);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index c4785fd0bc..f79a9bf480 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
import java.util.List;
@@ -108,5 +109,10 @@ public interface CoordinatorSimulation
* Adds the specified server to the cluster.
*/
void addServer(DruidServer server);
+
+ /**
+ * Publishes the given segments to the cluster.
+ *
+ * @param segments segments to publish
+ */
+ void addSegments(List<DataSegment> segments);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index b3980630d7..d4e8b7760d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -135,6 +135,12 @@ public abstract class CoordinatorSimulationBaseTest
sim.cluster().addServer(server);
}
+ @Override
+ public void addSegments(List<DataSegment> segments)
+ {
+ // Delegates to the cluster state of the running simulation
+ sim.cluster().addSegments(segments);
+ }
+
@Override
public double getLoadPercentage(String datasource)
{
@@ -260,6 +266,7 @@ public abstract class CoordinatorSimulationBaseTest
+ /**
+ * Names of the datasources used by the simulation tests.
+ */
static class DS
{
static final String WIKI = "wiki";
+ static final String KOALA = "koala";
}
static class Tier
@@ -282,7 +289,7 @@ public abstract class CoordinatorSimulationBaseTest
{
/**
* Segments of datasource {@link DS#WIKI}, size 500 MB each,
- * spanning 1 day containing 10 partitions.
+ * spanning 1 day containing 10 partitions each.
*/
static final List<DataSegment> WIKI_10X1D =
CreateDataSegments.ofDatasource(DS.WIKI)
@@ -293,7 +300,7 @@ public abstract class CoordinatorSimulationBaseTest
/**
* Segments of datasource {@link DS#WIKI}, size 500 MB each,
- * spanning 100 days containing 10 partitions.
+ * spanning 100 days containing 10 partitions each.
*/
static final List<DataSegment> WIKI_10X100D =
CreateDataSegments.ofDatasource(DS.WIKI)
@@ -301,6 +308,17 @@ public abstract class CoordinatorSimulationBaseTest
.startingAt("2022-01-01")
.withNumPartitions(10)
.eachOfSizeInMb(500);
+
+ /**
+ * Segments of datasource {@link DS#KOALA}, size 500 MB each,
+ * spanning 100 days starting 2022-01-01 containing 100 partitions each,
+ * i.e. 10,000 segments in total.
+ */
+ static final List<DataSegment> KOALA_100X100D =
+ CreateDataSegments.ofDatasource(DS.KOALA)
+ .forIntervals(100, Granularities.DAY)
+ .startingAt("2022-01-01")
+ .withNumPartitions(100)
+ .eachOfSizeInMb(500);
}
/**
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 2e85f81845..28b84f62e9 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -176,26 +176,21 @@ public class CoordinatorSimulationBuilder
final TestServerInventoryView serverInventoryView = new
TestServerInventoryView();
servers.forEach(serverInventoryView::addServer);
- final TestSegmentsMetadataManager segmentManager = new
TestSegmentsMetadataManager();
- if (segments != null) {
- segments.forEach(segmentManager::addSegment);
- }
-
- final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
- datasourceRules.forEach(
- (datasource, rules) ->
- ruleManager.overrideRule(datasource, rules, null)
- );
-
final Environment env = new Environment(
serverInventoryView,
- segmentManager,
- ruleManager,
dynamicConfig,
loadImmediately,
autoSyncInventory
);
+ if (segments != null) {
+ segments.forEach(env.segmentManager::addSegment);
+ }
+ datasourceRules.forEach(
+ (datasource, rules) ->
+ env.ruleManager.overrideRule(datasource, rules, null)
+ );
+
// Build the coordinator
final DruidCoordinator coordinator = new DruidCoordinator(
env.coordinatorConfig,
@@ -375,6 +370,14 @@ public class CoordinatorSimulationBuilder
env.inventory.addServer(server);
}
+    /**
+     * Adds each of the given segments to the segments metadata manager of this
+     * simulation's environment. A null list is a no-op.
+     */
+    @Override
+    public void addSegments(List<DataSegment> segments)
+    {
+      if (segments == null) {
+        return;
+      }
+      for (DataSegment segment : segments) {
+        env.segmentManager.addSegment(segment);
+      }
+    }
+
private void verifySimulationRunning()
{
if (!running.get()) {
@@ -409,8 +412,8 @@ public class CoordinatorSimulationBuilder
= new TestDruidLeaderSelector();
private final ExecutorFactory executorFactory;
- private final TestSegmentsMetadataManager segmentManager;
- private final TestMetadataRuleManager ruleManager;
+ private final TestSegmentsMetadataManager segmentManager = new
TestSegmentsMetadataManager();
+ private final TestMetadataRuleManager ruleManager = new
TestMetadataRuleManager();
private final LoadQueueTaskMaster loadQueueTaskMaster;
@@ -437,16 +440,12 @@ public class CoordinatorSimulationBuilder
private Environment(
TestServerInventoryView clusterInventory,
- TestSegmentsMetadataManager segmentManager,
- TestMetadataRuleManager ruleManager,
CoordinatorDynamicConfig dynamicConfig,
boolean loadImmediately,
boolean autoSyncInventory
)
{
this.inventory = clusterInventory;
- this.segmentManager = segmentManager;
- this.ruleManager = ruleManager;
this.loadImmediately = loadImmediately;
this.autoSyncInventory = autoSyncInventory;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
new file mode 100644
index 0000000000..cec100691a
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simulation tests verifying that round-robin segment assignment spreads
+ * segments uniformly across the historicals of a tier.
+ */
+public class RoundRobinAssignmentTest extends CoordinatorSimulationBaseTest
+{
+  /**
+   * Capacity of each historical. Presumably interpreted in MB by
+   * {@code createHistorical}, i.e. roughly 1 TB -- TODO confirm unit.
+   */
+  private static final long SIZE_1TB = 1_000_000;
+
+  private static final int NUM_HISTORICALS = 10;
+
+  private List<DruidServer> historicals;
+
+  @Override
+  public void setUp()
+  {
+    historicals = new ArrayList<>();
+    for (int i = 0; i < NUM_HISTORICALS; i++) {
+      historicals.add(createHistorical(i, Tier.T1, SIZE_1TB));
+    }
+  }
+
+  @Test
+  public void testSegmentsAreAssignedUniformly()
+  {
+    final CoordinatorDynamicConfig config = createRoundRobinConfig();
+
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withDynamicConfig(config)
+                             .withBalancer("random")
+                             .withRules(DS.WIKI, Load.on(Tier.T1, 2).forever())
+                             .withServers(historicals)
+                             .withSegments(Segments.WIKI_10X100D)
+                             .build();
+    startSimulation(sim);
+
+    // Run 1: all segments are assigned and loaded
+    // 1000 wiki segments x 2 replicas = 2000 assignments, i.e. 200 per historical
+    runCoordinatorCycle();
+    loadQueuedSegments();
+    verifyValue(Metric.ASSIGNED_COUNT, 2000L);
+    verifySegmentCountOnEachHistorical(200);
+  }
+
+  @Test
+  public void testMultipleDatasourceSegmentsAreAssignedUniformly()
+  {
+    final List<DataSegment> segments = new ArrayList<>(Segments.WIKI_10X100D);
+    segments.addAll(Segments.KOALA_100X100D);
+
+    final CoordinatorDynamicConfig config = createRoundRobinConfig();
+
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withDynamicConfig(config)
+                             .withBalancer("random")
+                             .withRules(DS.WIKI, Load.on(Tier.T1, 3).forever())
+                             .withRules(DS.KOALA, Load.on(Tier.T1, 1).forever())
+                             .withServers(historicals)
+                             .withSegments(segments)
+                             .build();
+    startSimulation(sim);
+
+    // Run 1: all segments are assigned and loaded
+    // (1000 wiki segments x 3 replicas) + (10,000 koala segments x 1 replica)
+    // = 13,000 assignments, i.e. 1300 per historical
+    runCoordinatorCycle();
+    loadQueuedSegments();
+    verifyValue(Metric.ASSIGNED_COUNT, 13000L);
+    verifySegmentCountOnEachHistorical(1300);
+  }
+
+  /**
+   * Dynamic config shared by all tests: round-robin assignment enabled, balancing
+   * (segment moves) disabled and a replication throttle high enough to allow all
+   * replicas to be assigned in a single coordinator run.
+   */
+  private static CoordinatorDynamicConfig createRoundRobinConfig()
+  {
+    return CoordinatorDynamicConfig.builder()
+                                   .withMaxSegmentsToMove(0)
+                                   .withMaxSegmentsInNodeLoadingQueue(0)
+                                   .withReplicationThrottleLimit(20000)
+                                   .withUseRoundRobinSegmentAssignment(true)
+                                   .build();
+  }
+
+  /**
+   * Asserts that every historical has exactly {@code expectedSegmentCount} segments.
+   */
+  private void verifySegmentCountOnEachHistorical(int expectedSegmentCount)
+  {
+    for (DruidServer historical : historicals) {
+      Assert.assertEquals(expectedSegmentCount, historical.getTotalSegments());
+    }
+  }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 73f7cd29e4..7248903619 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -228,7 +228,8 @@ public class CoordinatorDynamicConfigTest
@Test
public void
testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources()
{
- CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
+ CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
+ 1,
1,
1,
1,
@@ -245,7 +246,9 @@ public class CoordinatorDynamicConfigTest
5,
true,
true,
- 10);
+ 10,
+ false
+ );
Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources());
Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
}
@@ -253,24 +256,27 @@ public class CoordinatorDynamicConfigTest
@Test
public void
testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources()
{
- CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
- 1,
- 1,
- 1,
- null,
- false,
- 1,
- 2,
- 10,
- true,
-
ImmutableSet.of("test1"),
- null,
- null,
-
ImmutableSet.of("host1"),
- 5,
- true,
- true,
- 10);
+ // Specifying datasources to kill (only "test1" here) must disable killing
+ // unused segments in all datasources.
+ CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
+ 1,
+ 1,
+ 1,
+ 1,
+ null,
+ false,
+ 1,
+ 2,
+ 10,
+ true,
+ ImmutableSet.of("test1"),
+ null,
+ null,
+ ImmutableSet.of("host1"),
+ 5,
+ true,
+ true,
+ 10,
+ false
+ );
Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources());
Assert.assertEquals(ImmutableSet.of("test1"),
config.getSpecificDataSourcesToKillUnusedSegmentsIn());
}
@@ -732,6 +738,7 @@ public class CoordinatorDynamicConfigTest
null,
null,
null,
+ null,
null
).build(current)
);
diff --git
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index 6683a721d1..1d148e6b18 100644
---
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -207,6 +207,18 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS:
Field<CoordinatorDynamicConfig>[
</>
),
},
+  {
+    name: 'useRoundRobinSegmentAssignment',
+    type: 'boolean',
+    defaultValue: false,
+    info: (
+      <>
+        Boolean flag for whether segments should be assigned to historicals in a round-robin
+        fashion. If enabled, this can speed up initial segment loading, leaving segment balancing
+        to make cost-based decisions and find the optimal location of a segment. Segment drops
+        remain cost-based even when this is enabled.
+      </>
+    ),
+  },
{
name: 'percentOfSegmentsToConsiderPerMove',
type: 'number',
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]