This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 71b133f3ff Add `RoundRobinServerSelector` to speed up segment assignments (#13367)
71b133f3ff is described below

commit 71b133f3ff1ba9c547f94ddeed63d72e14419e94
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Nov 16 20:05:17 2022 +0530

    Add `RoundRobinServerSelector` to speed up segment assignments (#13367)
    
    Segment assignments can take very long due to the strategy cost computation
    for a large number of segments. This commit allows segment assignments to be
    done in a round-robin fashion within a tier. Only segment balancing makes
    cost-based decisions to move segments around.
    
    Changes
    - Add dynamic config `useRoundRobinSegmentAssignment` with default value false
    - Add `RoundRobinServerSelector`. This does not implement `BalancerStrategy`
    because it does not conform to that contract, and it may be used in conjunction
    with a strategy (round-robin for `RunRules` and a cost strategy for `BalanceSegments`)
    - Drops are still cost-based even when round-robin assignment is enabled.
---
 .../coordinator/CoordinatorDynamicConfig.java      |  34 ++++-
 .../druid/server/coordinator/DruidCoordinator.java |  12 +-
 .../coordinator/DruidCoordinatorRuntimeParams.java |  22 +++
 .../coordinator/RoundRobinServerSelector.java      | 153 +++++++++++++++++++++
 .../druid/server/coordinator/ServerHolder.java     |  33 +++++
 .../druid/server/coordinator/rules/LoadRule.java   |  59 +++++---
 .../coordinator/RoundRobinServerSelectorTest.java  | 149 ++++++++++++++++++++
 .../server/coordinator/rules/LoadRuleTest.java     |  75 +++++++---
 .../simulate/CoordinatorSimulation.java            |   6 +
 .../simulate/CoordinatorSimulationBaseTest.java    |  22 ++-
 .../simulate/CoordinatorSimulationBuilder.java     |  37 +++--
 .../simulate/RoundRobinAssignmentTest.java         | 112 +++++++++++++++
 .../server/http/CoordinatorDynamicConfigTest.java  |  47 ++++---
 .../coordinator-dynamic-config.tsx                 |  12 ++
 14 files changed, 691 insertions(+), 82 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 90574780ea..475a8f88cd 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,6 +61,7 @@ public class CoordinatorDynamicConfig
   private final int replicationThrottleLimit;
   private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
+  private final boolean useRoundRobinSegmentAssignment;
 
   /**
    * List of specific data sources for which kill tasks are sent in {@link 
KillUnusedSegments}.
@@ -134,7 +135,8 @@ public class CoordinatorDynamicConfig
       @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int 
decommissioningMaxPercentOfMaxSegmentsToMove,
       @JsonProperty("pauseCoordination") boolean pauseCoordination,
       @JsonProperty("replicateAfterLoadTimeout") boolean 
replicateAfterLoadTimeout,
-      @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer 
maxNonPrimaryReplicantsToLoad
+      @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer 
maxNonPrimaryReplicantsToLoad,
+      @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment
   )
   {
     this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -195,6 +197,12 @@ public class CoordinatorDynamicConfig
         "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0."
     );
     this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
+
+    if (useRoundRobinSegmentAssignment == null) {
+      this.useRoundRobinSegmentAssignment = 
Builder.DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT;
+    } else {
+      this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+    }
   }
 
   private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@@ -316,6 +324,12 @@ public class CoordinatorDynamicConfig
     return maxSegmentsInNodeLoadingQueue;
   }
 
+  @JsonProperty
+  public boolean isUseRoundRobinSegmentAssignment()
+  {
+    return useRoundRobinSegmentAssignment;
+  }
+
   /**
    * List of historical servers to 'decommission'. Coordinator will not assign 
new segments to 'decommissioning'
    * servers, and segments will be moved away from them to be placed on 
non-decommissioning servers at the maximum rate
@@ -509,6 +523,7 @@ public class CoordinatorDynamicConfig
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
     private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
     private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = 
Integer.MAX_VALUE;
+    private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false;
 
     private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
     private Long mergeBytesLimit;
@@ -528,6 +543,7 @@ public class CoordinatorDynamicConfig
     private Boolean pauseCoordination;
     private Boolean replicateAfterLoadTimeout;
     private Integer maxNonPrimaryReplicantsToLoad;
+    private Boolean useRoundRobinSegmentAssignment;
 
     public Builder()
     {
@@ -554,7 +570,8 @@ public class CoordinatorDynamicConfig
         @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
         @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
         @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean 
replicateAfterLoadTimeout,
-        @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer 
maxNonPrimaryReplicantsToLoad
+        @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer 
maxNonPrimaryReplicantsToLoad,
+        @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment
     )
     {
       this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -576,6 +593,7 @@ public class CoordinatorDynamicConfig
       this.pauseCoordination = pauseCoordination;
       this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
       this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
+      this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
     }
 
     public Builder 
withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long 
leadingTimeMillis)
@@ -681,6 +699,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withUseRoundRobinSegmentAssignment(boolean 
useRoundRobinSegmentAssignment)
+    {
+      this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+      return this;
+    }
+
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
@@ -709,7 +733,8 @@ public class CoordinatorDynamicConfig
           pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
           replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
           maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
-                                                : maxNonPrimaryReplicantsToLoad
+                                                : 
maxNonPrimaryReplicantsToLoad,
+          useRoundRobinSegmentAssignment == null ? 
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
       );
     }
 
@@ -747,7 +772,8 @@ public class CoordinatorDynamicConfig
           replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
           maxNonPrimaryReplicantsToLoad == null
           ? defaults.getMaxNonPrimaryReplicantsToLoad()
-          : maxNonPrimaryReplicantsToLoad
+          : maxNonPrimaryReplicantsToLoad,
+          useRoundRobinSegmentAssignment == null ? 
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
       );
     }
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 50239db4ec..38fcd9efa5 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -993,10 +993,19 @@ public class DruidCoordinator
 
       stopPeonsForDisappearedServers(currentServers);
 
+      final RoundRobinServerSelector roundRobinServerSelector;
+      if 
(params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
+        roundRobinServerSelector = new RoundRobinServerSelector(cluster);
+        log.info("Using round-robin segment assignment.");
+      } else {
+        roundRobinServerSelector = null;
+      }
+
       return params.buildFromExisting()
                    .withDruidCluster(cluster)
                    .withLoadManagementPeons(loadManagementPeons)
                    .withSegmentReplicantLookup(segmentReplicantLookup)
+                   .withRoundRobinServerSelector(roundRobinServerSelector)
                    .build();
     }
 
@@ -1044,7 +1053,8 @@ public class DruidCoordinator
             new ServerHolder(
                 server,
                 loadManagementPeons.get(server.getName()),
-                decommissioningServers.contains(server.getHost())
+                decommissioningServers.contains(server.getHost()),
+                
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue()
             )
         );
       }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index ed488607ed..f195a5ebad 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -70,6 +70,7 @@ public class DruidCoordinatorRuntimeParams
   private final CoordinatorStats stats;
   private final BalancerStrategy balancerStrategy;
   private final Set<String> broadcastDatasources;
+  private final @Nullable RoundRobinServerSelector roundRobinServerSelector;
 
   private DruidCoordinatorRuntimeParams(
       long startTimeNanos,
@@ -80,6 +81,7 @@ public class DruidCoordinatorRuntimeParams
       @Nullable DataSourcesSnapshot dataSourcesSnapshot,
       Map<String, LoadQueuePeon> loadManagementPeons,
       ReplicationThrottler replicationManager,
+      RoundRobinServerSelector roundRobinServerSelector,
       ServiceEmitter emitter,
       CoordinatorDynamicConfig coordinatorDynamicConfig,
       CoordinatorCompactionConfig coordinatorCompactionConfig,
@@ -96,6 +98,7 @@ public class DruidCoordinatorRuntimeParams
     this.dataSourcesSnapshot = dataSourcesSnapshot;
     this.loadManagementPeons = loadManagementPeons;
     this.replicationManager = replicationManager;
+    this.roundRobinServerSelector = roundRobinServerSelector;
     this.emitter = emitter;
     this.coordinatorDynamicConfig = coordinatorDynamicConfig;
     this.coordinatorCompactionConfig = coordinatorCompactionConfig;
@@ -150,6 +153,12 @@ public class DruidCoordinatorRuntimeParams
     return replicationManager;
   }
 
+  @Nullable
+  public RoundRobinServerSelector getRoundRobinServerSelector()
+  {
+    return roundRobinServerSelector;
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
@@ -211,6 +220,7 @@ public class DruidCoordinatorRuntimeParams
         dataSourcesSnapshot,
         loadManagementPeons,
         replicationManager,
+        roundRobinServerSelector,
         emitter,
         coordinatorDynamicConfig,
         coordinatorCompactionConfig,
@@ -231,6 +241,7 @@ public class DruidCoordinatorRuntimeParams
         null, // dataSourcesSnapshot
         loadManagementPeons,
         replicationManager,
+        roundRobinServerSelector,
         emitter,
         coordinatorDynamicConfig,
         coordinatorCompactionConfig,
@@ -250,6 +261,7 @@ public class DruidCoordinatorRuntimeParams
     private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
     private final Map<String, LoadQueuePeon> loadManagementPeons;
     private ReplicationThrottler replicationManager;
+    private @Nullable RoundRobinServerSelector roundRobinServerSelector;
     private ServiceEmitter emitter;
     private CoordinatorDynamicConfig coordinatorDynamicConfig;
     private CoordinatorCompactionConfig coordinatorCompactionConfig;
@@ -267,6 +279,7 @@ public class DruidCoordinatorRuntimeParams
       this.dataSourcesSnapshot = null;
       this.loadManagementPeons = new HashMap<>();
       this.replicationManager = null;
+      this.roundRobinServerSelector = null;
       this.emitter = null;
       this.stats = new CoordinatorStats();
       this.coordinatorDynamicConfig = 
CoordinatorDynamicConfig.builder().build();
@@ -283,6 +296,7 @@ public class DruidCoordinatorRuntimeParams
         @Nullable DataSourcesSnapshot dataSourcesSnapshot,
         Map<String, LoadQueuePeon> loadManagementPeons,
         ReplicationThrottler replicationManager,
+        RoundRobinServerSelector roundRobinServerSelector,
         ServiceEmitter emitter,
         CoordinatorDynamicConfig coordinatorDynamicConfig,
         CoordinatorCompactionConfig coordinatorCompactionConfig,
@@ -299,6 +313,7 @@ public class DruidCoordinatorRuntimeParams
       this.dataSourcesSnapshot = dataSourcesSnapshot;
       this.loadManagementPeons = loadManagementPeons;
       this.replicationManager = replicationManager;
+      this.roundRobinServerSelector = roundRobinServerSelector;
       this.emitter = emitter;
       this.coordinatorDynamicConfig = coordinatorDynamicConfig;
       this.coordinatorCompactionConfig = coordinatorCompactionConfig;
@@ -319,6 +334,7 @@ public class DruidCoordinatorRuntimeParams
           dataSourcesSnapshot,
           loadManagementPeons,
           replicationManager,
+          roundRobinServerSelector,
           emitter,
           coordinatorDynamicConfig,
           coordinatorCompactionConfig,
@@ -401,6 +417,12 @@ public class DruidCoordinatorRuntimeParams
       return this;
     }
 
+    public Builder withRoundRobinServerSelector(RoundRobinServerSelector 
roundRobinServerSelector)
+    {
+      this.roundRobinServerSelector = roundRobinServerSelector;
+      return this;
+    }
+
     public Builder withEmitter(ServiceEmitter emitter)
     {
       this.emitter = emitter;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
new file mode 100644
index 0000000000..89459adcc4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Provides iterators over historicals for a given tier that can load a
+ * specified segment.
+ * <p>
+ * Once a selector is initialized with a {@link DruidCluster}, an iterator
+ * returned by {@link #getServersInTierToLoadSegment(String, DataSegment)}
+ * iterates over the historicals in a tier in a round robin fashion. The next
+ * invocation of this method picks up where the last iterator had left off.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class RoundRobinServerSelector
+{
+  private final Map<String, CircularServerList> tierToServers = new HashMap<>();
+
+  public RoundRobinServerSelector(DruidCluster cluster)
+  {
+    cluster.getHistoricals().forEach(
+        (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers))
+    );
+  }
+
+  /**
+   * Returns an iterator over the servers in the given tier that are eligible
+   * to load the given segment, or an empty iterator if the tier is unknown.
+   */
+  public Iterator<ServerHolder> getServersInTierToLoadSegment(String tier, DataSegment segment)
+  {
+    final CircularServerList serverList = tierToServers.get(tier);
+    if (serverList == null) {
+      return Collections.emptyIterator();
+    }
+
+    return new EligibleServerIterator(segment, serverList);
+  }
+
+  /**
+   * Iterator over the servers of a tier that can load a segment. Each server is checked at most once.
+   */
+  private static class EligibleServerIterator implements Iterator<ServerHolder>
+  {
+    private final CircularServerList delegate;
+    private final DataSegment segment;
+
+    private ServerHolder nextEligible;
+    private int remainingIterations;
+
+    EligibleServerIterator(DataSegment segment, CircularServerList delegate)
+    {
+      this.delegate = delegate;
+      this.segment = segment;
+      this.remainingIterations = delegate.servers.size();
+      nextEligible = search();
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return nextEligible != null;
+    }
+
+    @Override
+    public ServerHolder next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      ServerHolder previous = nextEligible;
+      delegate.advanceCursor();
+      nextEligible = search();
+      return previous;
+    }
+
+    private ServerHolder search()
+    {
+      while (remainingIterations-- > 0) {
+        ServerHolder nextServer = delegate.peekNext();
+        if (nextServer.canLoadSegment(segment)) {
+          return nextServer;
+        } else {
+          delegate.advanceCursor();
+        }
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Circular list over all servers in a tier. A single instance of this is
+   * maintained for each tier.
+   */
+  private static class CircularServerList
+  {
+    private final List<ServerHolder> servers = new ArrayList<>();
+    private int currentPosition;
+
+    CircularServerList(Set<ServerHolder> servers)
+    {
+      // Servers are visited in the iteration order of the tier's server set.
+      this.servers.addAll(servers);
+    }
+
+    void advanceCursor()
+    {
+      if (++currentPosition >= servers.size()) {
+        currentPosition = 0;
+      }
+    }
+
+    ServerHolder peekNext()
+    {
+      int nextPosition = currentPosition < servers.size() ? currentPosition : 0;
+      return servers.get(nextPosition);
+    }
+  }
+
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java 
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 43fdaaef1d..057dfb118c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -34,6 +34,7 @@ public class ServerHolder implements Comparable<ServerHolder>
   private final ImmutableDruidServer server;
   private final LoadQueuePeon peon;
   private final boolean isDecommissioning;
+  private final int maxSegmentsInLoadQueue;
 
   public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
   {
@@ -41,10 +42,21 @@ public class ServerHolder implements 
Comparable<ServerHolder>
   }
 
   public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean 
isDecommissioning)
+  {
+    this(server, peon, isDecommissioning, 0);
+  }
+
+  public ServerHolder(
+      ImmutableDruidServer server,
+      LoadQueuePeon peon,
+      boolean isDecommissioning,
+      int maxSegmentsInNodeLoadingQueue
+  )
   {
     this.server = server;
     this.peon = peon;
     this.isDecommissioning = isDecommissioning;
+    this.maxSegmentsInLoadQueue = maxSegmentsInNodeLoadingQueue;
   }
 
   public ImmutableDruidServer getServer()
@@ -138,6 +150,27 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     return server.getSegment(segmentId) != null;
   }
 
+  /**
+   * Checks if the server can load the given segment.
+   * <p>
+   * A load is possible only if the server meets all of the following criteria:
+   * <ul>
+   *   <li>is not being decommissioned</li>
+   *   <li>is not already serving the segment</li>
+   *   <li>is not already loading the segment</li>
+   *   <li>has fewer queued segments than the load queue limit (a limit &lt;= 0 means unlimited)</li>
+   *   <li>has enough available disk space for the segment</li>
+   * </ul>
+   */
+  public boolean canLoadSegment(DataSegment segment)
+  {
+    return !isDecommissioning
+           && !isServingSegment(segment.getId())
+           && !isLoadingSegment(segment)
+           && (maxSegmentsInLoadQueue <= 0 || maxSegmentsInLoadQueue > peon.getNumberOfSegmentsInQueue())
+           && getAvailableSize() >= segment.getSize();
+  }
+
   @Override
   public int compareTo(ServerHolder serverHolder)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 3b20cf0913..933b63e472 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -44,7 +44,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Objects;
 import java.util.TreeSet;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -178,7 +177,7 @@ public abstract class LoadRule implements Rule
           targetReplicants.getOrDefault(tier, 0),
           numAssigned, // note that the currentReplicantsInTier is the 
just-assigned primary replica.
           params,
-          createLoadQueueSizeLimitingPredicate(params).and(holder -> 
!holder.equals(primaryHolderToLoad)),
+          createLoadQueueSizeLimitingPredicate(segment).and(holder -> 
!holder.equals(primaryHolderToLoad)),
           segment
       );
 
@@ -193,15 +192,10 @@ public abstract class LoadRule implements Rule
   }
 
   private static Predicate<ServerHolder> createLoadQueueSizeLimitingPredicate(
-      final DruidCoordinatorRuntimeParams params
+      final DataSegment segment
   )
   {
-    final int maxSegmentsInNodeLoadingQueue = 
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
-    if (maxSegmentsInNodeLoadingQueue <= 0) {
-      return Objects::nonNull;
-    } else {
-      return s -> (s != null && s.getNumberOfSegmentsInQueue() < 
maxSegmentsInNodeLoadingQueue);
-    }
+    return server -> server != null && server.canLoadSegment(segment);
   }
 
   private static List<ServerHolder> getFilteredHolders(
@@ -219,6 +213,21 @@ public abstract class LoadRule implements Rule
     return 
queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList());
   }
 
+  private Iterator<ServerHolder> getRoundRobinIterator(
+      DruidCoordinatorRuntimeParams params,
+      String tier,
+      DataSegment segment
+  )
+  {
+    if (params.getRoundRobinServerSelector() == null
+        || 
!params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
+      return null;
+    }
+
+    return params.getRoundRobinServerSelector()
+                 .getServersInTierToLoadSegment(tier, segment);
+  }
+
   /**
    * Iterates through each tier and find the respective segment homes; with 
the found segment homes, selects the one
    * with the highest priority to be the holder for the primary replica.
@@ -230,6 +239,8 @@ public abstract class LoadRule implements Rule
   )
   {
     ServerHolder topCandidate = null;
+    final boolean useRoundRobinAssignment = 
params.getCoordinatorDynamicConfig()
+                                                  
.isUseRoundRobinSegmentAssignment();
     for (final Object2IntMap.Entry<String> entry : 
targetReplicants.object2IntEntrySet()) {
       final int targetReplicantsInTier = entry.getIntValue();
       // sanity check: target number of replicants should be more than zero.
@@ -248,7 +259,7 @@ public abstract class LoadRule implements Rule
       final List<ServerHolder> holders = getFilteredHolders(
           tier,
           params.getDruidCluster(),
-          createLoadQueueSizeLimitingPredicate(params)
+          createLoadQueueSizeLimitingPredicate(segment)
       );
       // no holders satisfy the predicate
       if (holders.isEmpty()) {
@@ -256,12 +267,20 @@ public abstract class LoadRule implements Rule
         continue;
       }
 
-      final ServerHolder candidate = 
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+      final ServerHolder candidate;
+      if (useRoundRobinAssignment) {
+        Iterator<ServerHolder> roundRobinIterator = 
getRoundRobinIterator(params, tier, segment);
+        candidate = roundRobinIterator.hasNext() ? roundRobinIterator.next() : 
null;
+      } else {
+        candidate = 
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+        if (candidate != null) {
+          strategyCache.put(tier, candidate);
+        }
+      }
+
       if (candidate == null) {
         log.warn(noAvailability);
       } else {
-        // cache the result for later use.
-        strategyCache.put(tier, candidate);
         if (topCandidate == null ||
             candidate.getServer().getPriority() > 
topCandidate.getServer().getPriority()) {
           topCandidate = candidate;
@@ -307,7 +326,7 @@ public abstract class LoadRule implements Rule
           entry.getIntValue(),
           
params.getSegmentReplicantLookup().getTotalReplicants(segment.getId(), tier),
           params,
-          createLoadQueueSizeLimitingPredicate(params),
+          createLoadQueueSizeLimitingPredicate(segment),
           segment
       );
       stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
@@ -347,6 +366,7 @@ public abstract class LoadRule implements Rule
       return 0;
     }
 
+    final Iterator<ServerHolder> roundRobinServerIterator = 
getRoundRobinIterator(params, tier, segment);
     final ReplicationThrottler throttler = params.getReplicationManager();
     for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
       if (!throttler.canCreateReplicant(tier)) {
@@ -355,10 +375,15 @@ public abstract class LoadRule implements Rule
       }
 
       // Retrieves from cache if available
-      ServerHolder holder = strategyCache.remove(tier);
-      // Does strategy call if not in cache
-      if (holder == null) {
+      final ServerHolder holder;
+      if (strategyCache.containsKey(tier)) {
+        // found in cache
+        holder = strategyCache.remove(tier);
+      } else if (roundRobinServerIterator == null) {
+        // Call balancer strategy
         holder = 
params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
+      } else {
+        holder = roundRobinServerIterator.hasNext() ? 
roundRobinServerIterator.next() : null;
       }
 
       if (holder == null) {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
new file mode 100644
index 0000000000..bcef7ca8a4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public class RoundRobinServerSelectorTest
+{
+  private static final String TIER = "normal";
+
+  private final DataSegment segment = new DataSegment(
+      "wiki",
+      Intervals.of("2022-01-01/2022-01-02"),
+      "1",
+      Collections.emptyMap(),
+      Collections.emptyList(),
+      Collections.emptyList(),
+      new NumberedShardSpec(1, 10),
+      IndexIO.CURRENT_VERSION_ID,
+      100
+  );
+
+  @Test
+  public void testSingleIterator()
+  {
+    final ServerHolder serverXL = createHistorical("serverXL", 1000);
+    final ServerHolder serverL = createHistorical("serverL", 900);
+    final ServerHolder serverM = createHistorical("serverM", 800);
+
+    // This server is too small to house the segment
+    final ServerHolder serverXS = createHistorical("serverXS", 10);
+
+    DruidCluster cluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(TIER, serverXL, serverM, serverXS, serverL)
+        .build();
+    final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster);
+
+    // Verify that only eligible servers are returned in order of available size
+    Iterator<ServerHolder> pickedServers = selector.getServersInTierToLoadSegment(TIER, segment);
+
+    Assert.assertTrue(pickedServers.hasNext());
+    Assert.assertEquals(serverXL, pickedServers.next());
+    Assert.assertEquals(serverL, pickedServers.next());
+    Assert.assertEquals(serverM, pickedServers.next());
+
+    Assert.assertFalse(pickedServers.hasNext());
+  }
+
+  @Test
+  public void testNextIteratorContinuesFromSamePosition()
+  {
+    final ServerHolder serverXL = createHistorical("serverXL", 1000);
+    final ServerHolder serverL = createHistorical("serverL", 900);
+    final ServerHolder serverM = createHistorical("serverM", 800);
+
+    // This server is too small to house the segment
+    final ServerHolder serverXS = createHistorical("serverXS", 10);
+
+    DruidCluster cluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(TIER, serverXL, serverM, serverXS, serverL)
+        .build();
+    final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster);
+
+    // First iterator returns the first eligible server and advances the cursor
+    Iterator<ServerHolder> pickedServers = selector.getServersInTierToLoadSegment(TIER, segment);
+    Assert.assertTrue(pickedServers.hasNext());
+    Assert.assertEquals(serverXL, pickedServers.next());
+
+    // Second iterator starts from previous position but resets allowed number of iterations
+    pickedServers = selector.getServersInTierToLoadSegment(TIER, segment);
+    Assert.assertTrue(pickedServers.hasNext());
+
+    Assert.assertEquals(serverL, pickedServers.next());
+    Assert.assertEquals(serverM, pickedServers.next());
+    Assert.assertEquals(serverXL, pickedServers.next());
+
+    Assert.assertFalse(pickedServers.hasNext());
+  }
+
+  @Test
+  public void testNoServersInTier()
+  {
+    DruidCluster cluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(TIER)
+        .build();
+    final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster);
+
+    Iterator<ServerHolder> eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment);
+    Assert.assertFalse(eligibleServers.hasNext());
+  }
+
+  @Test
+  public void testNoEligibleServerInTier()
+  {
+    DruidCluster cluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(
+            TIER,
+            createHistorical("server1", 40),
+            createHistorical("server2", 30),
+            createHistorical("server3", 10),
+            createHistorical("server4", 20)
+        )
+        .build();
+    final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster);
+
+    // Verify that no server is returned since none can fit the segment
+    Iterator<ServerHolder> eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment);
+    Assert.assertFalse(eligibleServers.hasNext());
+  }
+
+  private ServerHolder createHistorical(String name, long size)
+  {
+    return new ServerHolder(
+        new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1).toImmutableDruidServer(),
+        new LoadQueuePeonTester()
+    );
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 99a975caff..b039834e7b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -48,6 +48,7 @@ import 
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.LoadQueuePeon;
 import org.apache.druid.server.coordinator.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.RoundRobinServerSelector;
 import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.cost.ClusterCostCache;
@@ -61,8 +62,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,6 +77,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
+@RunWith(Parameterized.class)
 public class LoadRuleTest
 {
   private static final Logger log = new Logger(LoadRuleTest.class);
@@ -95,8 +100,20 @@ public class LoadRuleTest
 
   private CachingCostBalancerStrategy cachingCostBalancerStrategy;
 
+  private final boolean useRoundRobinAssignment;
   private BalancerStrategy mockBalancerStrategy;
 
+  /**
+   * Runs every test in this class twice: once with round-robin segment assignment
+   * enabled and once with it disabled (assignment via the balancer strategy).
+   */
+  @Parameterized.Parameters(name = "useRoundRobin = {0}")
+  public static List<Boolean> getTestParams()
+  {
+    return Arrays.asList(true, false);
+  }
+
+  /**
+   * @param useRoundRobinAssignment whether the tests enable round-robin segment assignment;
+   *                                when false, the mocked balancer strategy is expected to be
+   *                                consulted for new segment homes
+   */
+  public LoadRuleTest(boolean useRoundRobinAssignment)
+  {
+    this.useRoundRobinAssignment = useRoundRobinAssignment;
+  }
+
   @Before
   public void setUp()
   {
@@ -106,7 +123,6 @@ public class LoadRuleTest
 
     exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec);
-
     cachingCostBalancerStrategy = new 
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
 
     mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
@@ -138,9 +154,11 @@ public class LoadRuleTest
     throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, 
segment.getId(), "hostNorm");
     EasyMock.expectLastCall().once();
 
-    
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(3);
+    if (!useRoundRobinAssignment) {
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
+              .andDelegateTo(balancerStrategy)
+              .times(3);
+    }
 
     EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
 
@@ -189,6 +207,13 @@ public class LoadRuleTest
         .withDruidCluster(druidCluster)
         .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, 
false))
         .withReplicationManager(throttler)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig
+                .builder()
+                .withUseRoundRobinSegmentAssignment(useRoundRobinAssignment)
+                .build()
+        )
+        .withRoundRobinServerSelector(useRoundRobinAssignment ? new 
RoundRobinServerSelector(druidCluster) : null)
         .withBalancerStrategy(mockBalancerStrategy)
         .withUsedSegmentsInTest(usedSegments)
         .build();
@@ -440,9 +465,11 @@ public class LoadRuleTest
     mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.isNull());
     EasyMock.expectLastCall().once();
 
-    
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(2);
+    if (!useRoundRobinAssignment) {
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
+              .andDelegateTo(balancerStrategy)
+              .times(2);
+    }
 
     EasyMock.replay(throttler, mockPeon1, mockPeon2, mockBalancerStrategy);
 
@@ -626,6 +653,7 @@ public class LoadRuleTest
   @Test
   public void testMaxLoadingQueueSize()
   {
+    final int maxSegmentsInLoadQueue = 2;
     
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
             .andDelegateTo(balancerStrategy)
             .times(2);
@@ -643,7 +671,9 @@ public class LoadRuleTest
             new ServerHolder(
                 new DruidServer("serverHot", "hostHot", null, 1000, 
ServerType.HISTORICAL, "hot", 0)
                     .toImmutableDruidServer(),
-                peon
+                peon,
+                false,
+                maxSegmentsInLoadQueue
             )
         )
         .build();
@@ -659,8 +689,11 @@ public class LoadRuleTest
         .withReplicationManager(throttler)
         .withBalancerStrategy(mockBalancerStrategy)
         .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
-        
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build())
-        .build();
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    
.withMaxSegmentsInNodeLoadingQueue(maxSegmentsInLoadQueue)
+                                    .build()
+        ).build();
 
     CoordinatorStats stats1 = rule.run(null, params, dataSegment1);
     CoordinatorStats stats2 = rule.run(null, params, dataSegment2);
@@ -687,9 +720,11 @@ public class LoadRuleTest
 
     final DataSegment segment = createDataSegment("foo");
 
-    
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(1);
+    if (!useRoundRobinAssignment) {
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
+              .andDelegateTo(balancerStrategy)
+              .times(1);
+    }
 
     EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy);
 
@@ -732,12 +767,14 @@ public class LoadRuleTest
     ServerHolder holder3 = createServerHolder("tier2", mockPeon3, false);
     ServerHolder holder4 = createServerHolder("tier2", mockPeon4, false);
 
-    EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder2)))
-            .andReturn(holder2);
-    EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder4, holder3)))
-            .andReturn(holder3);
-    EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder4)))
-            .andReturn(holder4);
+    if (!useRoundRobinAssignment) {
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder2)))
+              .andReturn(holder2);
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder4, holder3)))
+              .andReturn(holder3);
+      
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, 
ImmutableList.of(holder4)))
+              .andReturn(holder4);
+    }
 
     EasyMock.replay(throttler, mockPeon1, mockPeon2, mockPeon3, mockPeon4, 
mockBalancerStrategy);
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index c4785fd0bc..f79a9bf480 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
 
 import java.util.List;
 
@@ -108,5 +109,10 @@ public interface CoordinatorSimulation
      * Adds the specified server to the cluster.
      */
     void addServer(DruidServer server);
+
+    /**
+     * Publishes the given segments to the cluster by adding them to the simulated
+     * segments metadata store (the implementation in CoordinatorSimulationBuilder
+     * ignores a null list).
+     */
+    void addSegments(List<DataSegment> segments);
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index b3980630d7..d4e8b7760d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -135,6 +135,12 @@ public abstract class CoordinatorSimulationBaseTest
     sim.cluster().addServer(server);
   }
 
+  /**
+   * Publishes the given segments by delegating to the cluster API of the
+   * currently running simulation.
+   */
+  @Override
+  public void addSegments(List<DataSegment> segments)
+  {
+    sim.cluster().addSegments(segments);
+  }
+
   @Override
   public double getLoadPercentage(String datasource)
   {
@@ -260,6 +266,7 @@ public abstract class CoordinatorSimulationBaseTest
   static class DS
   {
     static final String WIKI = "wiki";
+    /** Datasource of the {@code KOALA_100X100D} test segments. */
+    static final String KOALA = "koala";
   }
 
   static class Tier
@@ -282,7 +289,7 @@ public abstract class CoordinatorSimulationBaseTest
   {
     /**
      * Segments of datasource {@link DS#WIKI}, size 500 MB each,
-     * spanning 1 day containing 10 partitions.
+     * spanning 1 day containing 10 partitions each.
      */
     static final List<DataSegment> WIKI_10X1D =
         CreateDataSegments.ofDatasource(DS.WIKI)
@@ -293,7 +300,7 @@ public abstract class CoordinatorSimulationBaseTest
 
     /**
      * Segments of datasource {@link DS#WIKI}, size 500 MB each,
-     * spanning 100 days containing 10 partitions.
+     * spanning 100 days containing 10 partitions each.
      */
     static final List<DataSegment> WIKI_10X100D =
         CreateDataSegments.ofDatasource(DS.WIKI)
@@ -301,6 +308,17 @@ public abstract class CoordinatorSimulationBaseTest
                           .startingAt("2022-01-01")
                           .withNumPartitions(10)
                           .eachOfSizeInMb(500);
+
+    /**
+     * Segments of datasource {@link DS#KOALA}, size 500 MB each,
+     * spanning 100 days containing 100 partitions each
+     * (10,000 segments in total).
+     */
+    static final List<DataSegment> KOALA_100X100D =
+        CreateDataSegments.ofDatasource(DS.KOALA)
+                          .forIntervals(100, Granularities.DAY)
+                          .startingAt("2022-01-01")
+                          .withNumPartitions(100)
+                          .eachOfSizeInMb(500);
   }
 
   /**
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 2e85f81845..28b84f62e9 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -176,26 +176,21 @@ public class CoordinatorSimulationBuilder
     final TestServerInventoryView serverInventoryView = new 
TestServerInventoryView();
     servers.forEach(serverInventoryView::addServer);
 
-    final TestSegmentsMetadataManager segmentManager = new 
TestSegmentsMetadataManager();
-    if (segments != null) {
-      segments.forEach(segmentManager::addSegment);
-    }
-
-    final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
-    datasourceRules.forEach(
-        (datasource, rules) ->
-            ruleManager.overrideRule(datasource, rules, null)
-    );
-
     final Environment env = new Environment(
         serverInventoryView,
-        segmentManager,
-        ruleManager,
         dynamicConfig,
         loadImmediately,
         autoSyncInventory
     );
 
+    if (segments != null) {
+      segments.forEach(env.segmentManager::addSegment);
+    }
+    datasourceRules.forEach(
+        (datasource, rules) ->
+            env.ruleManager.overrideRule(datasource, rules, null)
+    );
+
     // Build the coordinator
     final DruidCoordinator coordinator = new DruidCoordinator(
         env.coordinatorConfig,
@@ -375,6 +370,14 @@ public class CoordinatorSimulationBuilder
       env.inventory.addServer(server);
     }
 
+    /**
+     * Adds each of the given segments to the simulated segments metadata store.
+     * A null list is ignored.
+     */
+    @Override
+    public void addSegments(List<DataSegment> segments)
+    {
+      if (segments == null) {
+        return;
+      }
+      for (DataSegment dataSegment : segments) {
+        env.segmentManager.addSegment(dataSegment);
+      }
+    }
+
     private void verifySimulationRunning()
     {
       if (!running.get()) {
@@ -409,8 +412,8 @@ public class CoordinatorSimulationBuilder
         = new TestDruidLeaderSelector();
 
     private final ExecutorFactory executorFactory;
-    private final TestSegmentsMetadataManager segmentManager;
-    private final TestMetadataRuleManager ruleManager;
+    private final TestSegmentsMetadataManager segmentManager = new 
TestSegmentsMetadataManager();
+    private final TestMetadataRuleManager ruleManager = new 
TestMetadataRuleManager();
 
     private final LoadQueueTaskMaster loadQueueTaskMaster;
 
@@ -437,16 +440,12 @@ public class CoordinatorSimulationBuilder
 
     private Environment(
         TestServerInventoryView clusterInventory,
-        TestSegmentsMetadataManager segmentManager,
-        TestMetadataRuleManager ruleManager,
         CoordinatorDynamicConfig dynamicConfig,
         boolean loadImmediately,
         boolean autoSyncInventory
     )
     {
       this.inventory = clusterInventory;
-      this.segmentManager = segmentManager;
-      this.ruleManager = ruleManager;
       this.loadImmediately = loadImmediately;
       this.autoSyncInventory = autoSyncInventory;
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
new file mode 100644
index 0000000000..cec100691a
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Coordinator simulations verifying that, with {@code useRoundRobinSegmentAssignment}
+ * enabled, segments are spread uniformly across the historicals of a tier.
+ */
+public class RoundRobinAssignmentTest extends CoordinatorSimulationBaseTest
+{
+  /**
+   * Size of each historical. NOTE(review): assumes {@code createHistorical} takes the
+   * size in MB, so 1_000_000 corresponds to 1 TB — TODO confirm.
+   */
+  private static final long SIZE_1TB = 1_000_000;
+
+  private static final int NUM_HISTORICALS = 10;
+
+  private List<DruidServer> historicals;
+
+  @Override
+  public void setUp()
+  {
+    historicals = new ArrayList<>();
+    for (int i = 0; i < NUM_HISTORICALS; i++) {
+      historicals.add(createHistorical(i, Tier.T1, SIZE_1TB));
+    }
+  }
+
+  @Test
+  public void testSegmentsAreAssignedUniformly()
+  {
+    final CoordinatorDynamicConfig config = createRoundRobinConfig();
+
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withDynamicConfig(config)
+                             .withBalancer("random")
+                             .withRules(DS.WIKI, Load.on(Tier.T1, 2).forever())
+                             .withServers(historicals)
+                             .withSegments(Segments.WIKI_10X100D)
+                             .build();
+    startSimulation(sim);
+
+    // Run 1: all segments are assigned and loaded
+    // 1000 WIKI segments x 2 replicas = 2000 assignments, i.e. 200 per historical
+    runCoordinatorCycle();
+    loadQueuedSegments();
+    verifyValue(Metric.ASSIGNED_COUNT, 2000L);
+
+    for (DruidServer historical : historicals) {
+      Assert.assertEquals(200, historical.getTotalSegments());
+    }
+  }
+
+  @Test
+  public void testMultipleDatasourceSegmentsAreAssignedUniformly()
+  {
+    final List<DataSegment> segments = new ArrayList<>(Segments.WIKI_10X100D);
+    segments.addAll(Segments.KOALA_100X100D);
+
+    final CoordinatorDynamicConfig config = createRoundRobinConfig();
+
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withDynamicConfig(config)
+                             .withBalancer("random")
+                             .withRules(DS.WIKI, Load.on(Tier.T1, 3).forever())
+                             .withRules(DS.KOALA, Load.on(Tier.T1, 1).forever())
+                             .withServers(historicals)
+                             .withSegments(segments)
+                             .build();
+    startSimulation(sim);
+
+    // Run 1: all segments are assigned and loaded
+    // 1000 WIKI segments x 3 replicas + 10000 KOALA segments x 1 replica
+    // = 13000 assignments, i.e. 1300 per historical
+    runCoordinatorCycle();
+    loadQueuedSegments();
+    verifyValue(Metric.ASSIGNED_COUNT, 13000L);
+
+    for (DruidServer historical : historicals) {
+      Assert.assertEquals(1300, historical.getTotalSegments());
+    }
+  }
+
+  /**
+   * Builds the dynamic config shared by these tests:
+   * <ul>
+   *   <li>round-robin segment assignment is enabled;</li>
+   *   <li>maxSegmentsToMove = 0, so balancing does not move any segment;</li>
+   *   <li>maxSegmentsInNodeLoadingQueue = 0 (presumably an unbounded load queue — confirm);</li>
+   *   <li>a replication throttle limit of 20000, above the largest number of
+   *       assignments expected in these tests (13000).</li>
+   * </ul>
+   */
+  private static CoordinatorDynamicConfig createRoundRobinConfig()
+  {
+    return CoordinatorDynamicConfig.builder()
+                                   .withMaxSegmentsToMove(0)
+                                   .withMaxSegmentsInNodeLoadingQueue(0)
+                                   .withReplicationThrottleLimit(20000)
+                                   .withUseRoundRobinSegmentAssignment(true)
+                                   .build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 73f7cd29e4..7248903619 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -228,7 +228,8 @@ public class CoordinatorDynamicConfigTest
   @Test
   public void 
testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources()
   {
-    CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
+    CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
+        1,
         1,
         1,
         1,
@@ -245,7 +246,9 @@ public class CoordinatorDynamicConfigTest
         5,
         true,
         true,
-        10);
+        10,
+        false
+    );
     Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources());
     
Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
   }
@@ -253,24 +256,27 @@ public class CoordinatorDynamicConfigTest
   @Test
   public void 
testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources()
   {
-    CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
-                                                                   1,
-                                                                   1,
-                                                                   1,
-                                                                   null,
-                                                                   false,
-                                                                   1,
-                                                                   2,
-                                                                   10,
-                                                                   true,
-                                                                   
ImmutableSet.of("test1"),
-                                                                   null,
-                                                                   null,
-                                                                   
ImmutableSet.of("host1"),
-                                                                   5,
-                                                                   true,
-                                                                   true,
-                                                                   10);
+    CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
+        1,
+        1,
+        1,
+        1,
+        null,
+        false,
+        1,
+        2,
+        10,
+        true,
+        ImmutableSet.of("test1"),
+        null,
+        null,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10,
+        false
+    );
     Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources());
     Assert.assertEquals(ImmutableSet.of("test1"), 
config.getSpecificDataSourcesToKillUnusedSegmentsIn());
   }
@@ -732,6 +738,7 @@ public class CoordinatorDynamicConfigTest
             null,
             null,
             null,
+            null,
             null
         ).build(current)
     );
diff --git 
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
 
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index 6683a721d1..1d148e6b18 100644
--- 
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++ 
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -207,6 +207,18 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: 
Field<CoordinatorDynamicConfig>[
       </>
     ),
   },
+  {
+    name: 'useRoundRobinSegmentAssignment',
+    type: 'boolean',
+    defaultValue: false,
+    info: (
+      <>
+        Boolean flag for whether segments should be assigned to historicals in a round-robin
+        fashion. If enabled, this can speed up initial segment loading, leaving segment balancing
+        to make cost-based decisions and find the optimal location of a segment. Segment drops
+        remain cost-based even when round-robin assignment is enabled.
+      </>
+    ),
+  },
   {
     name: 'percentOfSegmentsToConsiderPerMove',
     type: 'number',


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to