This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new cefac4765cb Add coordinator dynamic config `cloneServers` to clone an existing historical (#17863)
cefac4765cb is described below

commit cefac4765cb8ab3fda260ac8c433adc62ca639cb
Author: Adarsh Sanjeev <adarshsanj...@gmail.com>
AuthorDate: Wed Apr 9 09:22:07 2025 +0530

    Add coordinator dynamic config `cloneServers` to clone an existing historical (#17863)

    Changes
    ---------
    - Add coordinator dynamic config `cloneServers`, which contains a mapping from each clone target to its clone source (see the sketch below).
    - Mark clone targets as "unmanaged" and exclude them from segment assignment, drop and balancing.
    - Add duty `CloneHistoricals` that ensures the clone target remains in sync with the clone source.
    - Add simulation tests for cloning.
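    For illustration, a minimal sketch (not part of this commit) of enabling
    cloning through the new `withCloneServers` builder method; the host names
    are hypothetical:

        import java.util.Map;
        import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

        public class CloneConfigExample
        {
          public static void main(String[] args)
          {
            // Map each clone target host to the source host it should mirror.
            final CoordinatorDynamicConfig config =
                CoordinatorDynamicConfig.builder()
                                        .withCloneServers(Map.of("histClone:8083", "histSource:8083"))
                                        .build();
            System.out.println(config.getCloneServers()); // {histClone:8083=histSource:8083}
          }
        }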
---
 docs/api-reference/dynamic-configuration-api.md    |  10 +-
 docs/configuration/index.md                        |   1 +
 .../coordinator/CoordinatorDynamicConfig.java      |  35 ++-
 .../druid/server/coordinator/DruidCluster.java     |  50 +++-
 .../druid/server/coordinator/DruidCoordinator.java |   2 +
 .../druid/server/coordinator/ServerHolder.java     |  58 +++-
 .../coordinator/balancer/CostBalancerStrategy.java |   2 +-
 .../balancer/SegmentToMoveCalculator.java          |  14 +-
 .../coordinator/balancer/TierSegmentBalancer.java  |   2 +-
 .../server/coordinator/duty/BalanceSegments.java   |   4 +-
 .../server/coordinator/duty/CloneHistoricals.java  | 119 ++++++++
 .../duty/MarkOvershadowedSegmentsAsUnused.java     |   2 +-
 .../duty/PrepareBalancerAndLoadQueues.java         |  16 +-
 .../coordinator/duty/UnloadUnusedSegments.java     |   2 +-
 .../coordinator/loading/HttpLoadQueuePeon.java     |   3 +-
 .../loading/RoundRobinServerSelector.java          |   2 +-
 .../loading/SegmentReplicaCountMap.java            |   2 +-
 .../loading/StrategicSegmentAssigner.java          |   8 +-
 .../druid/server/coordinator/stats/Stats.java      |   8 +
 .../druid/server/coordinator/DruidClusterTest.java |   4 +-
 .../coordinator/loading/HttpLoadQueuePeonTest.java |  33 ++-
 .../simulate/HistoricalCloningTest.java            | 300 +++++++++++++++++++++
 .../server/http/CoordinatorDynamicConfigTest.java  |   6 +-
 23 files changed, 611 insertions(+), 72 deletions(-)

diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md
index 971aa81d206..cad61e4b88f 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -106,7 +106,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
     "useRoundRobinSegmentAssignment": true,
     "smartSegmentLoading": true,
     "debugDimensions": null,
-    "turboLoadingNodes": []
+    "turboLoadingNodes": [],
+    "cloneServers": {}
+
 }
 ```
 
@@ -174,7 +176,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
   "useRoundRobinSegmentAssignment": true,
-  "turboLoadingNodes": []
+  "turboLoadingNodes": [],
+  "cloneServers": {}
 }'
 ```
 
@@ -206,7 +209,8 @@ Content-Length: 683
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
   "useRoundRobinSegmentAssignment": true,
-  "turboLoadingNodes": []
+  "turboLoadingNodes": [],
+  "cloneServers": {}
 }
 ```
 
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 91168d9471a..328ab350825 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinat
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of ne [...]
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issu [...]
 |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the se [...]
+|`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears,  [...]
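As a rough illustration of applying this setting (host names are hypothetical, the Router address is a placeholder, and note that POSTing to this endpoint replaces the full dynamic config, as in the API examples above), a minimal Java sketch:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class SetCloneServers
    {
      public static void main(String[] args) throws Exception
      {
        // Partial body for brevity; in practice include the rest of the dynamic config.
        final String body = "{\"cloneServers\": {\"histClone:8083\": \"histSource:8083\"}}";
        final HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8888/druid/coordinator/v1/config")) // Router address is a placeholder
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        final HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
      }
    }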
 
 ##### Smart segment loading
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d3ee720f814..8a219484acf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -74,6 +74,7 @@ public class CoordinatorDynamicConfig
   private final Map<Dimension, String> validDebugDimensions;
 
   private final Set<String> turboLoadingNodes;
+  private final Map<String, String> cloneServers;
 
   /**
    * Stale pending segments belonging to the data sources in this list are not killed by {@code
@@ -124,7 +125,8 @@ public class CoordinatorDynamicConfig
       @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
       @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
       @JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
-      @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes
+      @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
+      @JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
   )
   {
     this.markSegmentAsUnusedDelayMillis =
@@ -169,6 +171,7 @@ public class CoordinatorDynamicConfig
     this.debugDimensions = debugDimensions;
     this.validDebugDimensions = validateDebugDimensions(debugDimensions);
     this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
+    this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
   }
 
   private Map<Dimension, String> validateDebugDimensions(Map<String, String> debugDimensions)
@@ -322,6 +325,19 @@ public class CoordinatorDynamicConfig
     return replicateAfterLoadTimeout;
   }
 
+  /**
+   * Map from target Historical server to source Historical server which should be cloned by the target. The target
+   * Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any
+   * segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact
+   * copy of the source. Segments on the target Historical do not count towards replica counts either. If the source
+   * disappears, the target remains in the last known state of the source server until removed from the cloneServers.
+   */
+  @JsonProperty
+  public Map<String, String> getCloneServers()
+  {
+    return cloneServers;
+  }
+
   /**
    * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
    * segments. This decreases the average time taken to load segments. However, this also means fewer resources
@@ -464,6 +480,7 @@ public class CoordinatorDynamicConfig
     private Boolean useRoundRobinSegmentAssignment;
     private Boolean smartSegmentLoading;
     private Set<String> turboLoadingNodes;
+    private Map<String, String> cloneServers;
 
     public Builder()
     {
@@ -487,7 +504,8 @@ public class CoordinatorDynamicConfig
         @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment,
         @JsonProperty("smartSegmentLoading") @Nullable Boolean 
smartSegmentLoading,
         @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions,
-        @JsonProperty("turboLoadingNodes") @Nullable Set<String> 
turboLoadingNodes
+        @JsonProperty("turboLoadingNodes") @Nullable Set<String> 
turboLoadingNodes,
+        @JsonProperty("cloneServers") @Nullable Map<String, String> 
cloneServers
     )
     {
       this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -507,6 +525,7 @@ public class CoordinatorDynamicConfig
       this.smartSegmentLoading = smartSegmentLoading;
       this.debugDimensions = debugDimensions;
       this.turboLoadingNodes = turboLoadingNodes;
+      this.cloneServers = cloneServers;
     }
 
     public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -599,6 +618,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withCloneServers(Map<String, String> cloneServers)
+    {
+      this.cloneServers = cloneServers;
+      return this;
+    }
+
     /**
      * Builds a CoordinatorDynamicConfig using either the configured values, or
      * the default value if not configured.
@@ -625,7 +650,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(useRoundRobinSegmentAssignment, Defaults.USE_ROUND_ROBIN_ASSIGNMENT),
           valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
           debugDimensions,
-          turboLoadingNodes
+          turboLoadingNodes,
+          cloneServers
       );
     }
 
@@ -656,7 +682,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(useRoundRobinSegmentAssignment, defaults.isUseRoundRobinSegmentAssignment()),
           valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()),
           valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
-          valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes())
+          valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
+          valueOrDefault(cloneServers, defaults.getCloneServers())
       );
     }
   }
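The valueOrDefault calls above suggest that a field omitted from a submitted config falls back to the existing value. A toy sketch of that merge rule (a simplified stand-in for the real valueOrDefault helper):

    import java.util.Map;

    public class ValueOrDefaultSketch
    {
      static <T> T valueOrDefault(T value, T defaultValue)
      {
        return value != null ? value : defaultValue;
      }

      public static void main(String[] args)
      {
        // Hypothetical: only cloneServers was supplied; other fields stay null.
        final Map<String, String> posted = Map.of("histClone:8083", "histSource:8083");
        System.out.println(valueOrDefault(posted, Map.of()));       // uses the posted value
        System.out.println(valueOrDefault(null, "keep-current"));   // falls back to the default
      }
    }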
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index da0b8e8b2e7..5e3bcfd31de 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Contains a representation of the current state of the cluster by tier.
@@ -44,8 +45,9 @@ public class DruidCluster
 
   private final Set<ServerHolder> realtimes;
   private final Map<String, NavigableSet<ServerHolder>> historicals;
+  private final Map<String, NavigableSet<ServerHolder>> managedHistoricals;
   private final Set<ServerHolder> brokers;
-  private final List<ServerHolder> allServers;
+  private final List<ServerHolder> allManagedServers;
 
   private DruidCluster(
       Set<ServerHolder> realtimes,
@@ -58,8 +60,18 @@ public class DruidCluster
         historicals,
         holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders)
     );
+    this.managedHistoricals = CollectionUtils.mapValues(
+        historicals,
+        holders -> {
+          List<ServerHolder> managedServers = holders.stream()
+                                                     .filter(serverHolder -> !serverHolder.isUnmanaged())
+                                                     .collect(Collectors.toList());
+
+          return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers);
+        }
+    );
     this.brokers = Collections.unmodifiableSet(brokers);
-    this.allServers = initAllServers();
+    this.allManagedServers = initAllManagedServers();
   }
 
   public Set<ServerHolder> getRealtimes()
@@ -67,11 +79,23 @@ public class DruidCluster
     return realtimes;
   }
 
+  /**
+   * Returns all historicals.
+   */
   public Map<String, NavigableSet<ServerHolder>> getHistoricals()
   {
     return historicals;
   }
 
+  /**
+   * Returns all managed historicals. Managed historicals are historicals which can participate in segment assignment,
+   * drop or balancing.
+   */
+  public Map<String, NavigableSet<ServerHolder>> getManagedHistoricals()
+  {
+    return managedHistoricals;
+  }
+
   public Set<ServerHolder> getBrokers()
   {
     return brokers;
@@ -82,26 +106,26 @@ public class DruidCluster
     return historicals.keySet();
   }
 
-  public NavigableSet<ServerHolder> getHistoricalsByTier(String tier)
+  public NavigableSet<ServerHolder> getManagedHistoricalsByTier(String tier)
   {
-    return historicals.get(tier);
+    return managedHistoricals.get(tier);
   }
 
-  public List<ServerHolder> getAllServers()
+  public List<ServerHolder> getAllManagedServers()
   {
-    return allServers;
+    return allManagedServers;
   }
 
-  private List<ServerHolder> initAllServers()
+  private List<ServerHolder> initAllManagedServers()
   {
-    final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum();
+    final int historicalSize = managedHistoricals.values().stream().mapToInt(Collection::size).sum();
     final int realtimeSize = realtimes.size();
-    final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize);
+    final List<ServerHolder> allManagedServers = new ArrayList<>(historicalSize + realtimeSize);
 
-    historicals.values().forEach(allServers::addAll);
-    allServers.addAll(brokers);
-    allServers.addAll(realtimes);
-    return allServers;
+    managedHistoricals.values().forEach(allManagedServers::addAll);
+    allManagedServers.addAll(brokers);
+    allManagedServers.addAll(realtimes);
+    return allManagedServers;
   }
 
   public boolean isEmpty()
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 61b56e58229..8174d829df1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -63,6 +63,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
 import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
+import org.apache.druid.server.coordinator.duty.CloneHistoricals;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@@ -558,6 +559,7 @@ public class DruidCoordinator
         new MarkOvershadowedSegmentsAsUnused(deleteSegments),
         new MarkEternityTombstonesAsUnused(deleteSegments),
         new BalanceSegments(config.getCoordinatorPeriod()),
+        new CloneHistoricals(loadQueueManager),
         new CollectLoadQueueStats()
     );
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 5de1bd5ee06..b4a924dbf2c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -32,9 +32,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -57,6 +59,7 @@ public class ServerHolder implements Comparable<ServerHolder>
   private final ImmutableDruidServer server;
   private final LoadQueuePeon peon;
   private final boolean isDecommissioning;
+  private final boolean isUnmanaged;
   private final int maxAssignmentsInRun;
   private final int maxLifetimeInQueue;
 
@@ -73,7 +76,7 @@ public class ServerHolder implements Comparable<ServerHolder>
    */
   private final Map<DataSegment, SegmentAction> queuedSegments = new HashMap<>();
 
-  private final SegmentCountsPerInterval projectedSegments = new SegmentCountsPerInterval();
+  private final SegmentCountsPerInterval projectedSegmentCounts = new SegmentCountsPerInterval();
 
   public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
   {
@@ -85,12 +88,25 @@ public class ServerHolder implements Comparable<ServerHolder>
     this(server, peon, isDecommissioning, 0, 1);
   }
 
+  public ServerHolder(
+      ImmutableDruidServer server,
+      LoadQueuePeon peon,
+      boolean isDecommissioning,
+      int maxSegmentsInLoadQueue,
+      int maxLifetimeInQueue
+  )
+  {
+    this(server, peon, isDecommissioning, false, maxSegmentsInLoadQueue, maxLifetimeInQueue);
+  }
+
   /**
    * Creates a new ServerHolder valid for a single coordinator run.
    *
    * @param server                 Underlying Druid server
    * @param peon                   Load queue peon for this server
    * @param isDecommissioning      Whether the server is decommissioning
+   * @param isUnmanaged            Whether this server is unmanaged and should not participate in segment assignment,
+   *                               drop or balancing.
    * @param maxSegmentsInLoadQueue Max number of segments that can be present in
    *                               the load queue at any point. If this is 0, the
    *                               load queue can have an unlimited number of segments.
@@ -101,6 +117,7 @@ public class ServerHolder implements Comparable<ServerHolder>
       ImmutableDruidServer server,
       LoadQueuePeon peon,
       boolean isDecommissioning,
+      boolean isUnmanaged,
       int maxSegmentsInLoadQueue,
       int maxLifetimeInQueue
   )
@@ -108,6 +125,7 @@ public class ServerHolder implements Comparable<ServerHolder>
     this.server = server;
     this.peon = peon;
     this.isDecommissioning = isDecommissioning;
+    this.isUnmanaged = isUnmanaged;
 
     this.maxAssignmentsInRun = maxSegmentsInLoadQueue == 0
                                ? Integer.MAX_VALUE
@@ -128,7 +146,7 @@ public class ServerHolder implements Comparable<ServerHolder>
   )
   {
     for (DataSegment segment : server.iterateAllSegments()) {
-      projectedSegments.addSegment(segment);
+      projectedSegmentCounts.addSegment(segment);
     }
 
     final List<SegmentHolder> expiredSegments = new ArrayList<>();
@@ -213,6 +231,14 @@ public class ServerHolder implements Comparable<ServerHolder>
     return isDecommissioning;
   }
 
+  /**
+   * Returns true if this server is unmanaged and should not participate in segment assignment, drop or balancing.
+   */
+  public boolean isUnmanaged()
+  {
+    return isUnmanaged;
+  }
+
   public boolean isLoadQueueFull()
   {
     return totalAssignmentsInRun >= maxAssignmentsInRun;
@@ -264,11 +290,27 @@ public class ServerHolder implements Comparable<ServerHolder>
   }
 
   /**
-   * Segments that are expected to be loaded on this server once all the
+   * Counts for segments that are expected to be loaded on this server once all the
    * operations in progress have completed.
    */
-  public SegmentCountsPerInterval getProjectedSegments()
+  public SegmentCountsPerInterval getProjectedSegmentCounts()
   {
+    return projectedSegmentCounts;
+  }
+
+  /**
+   * Segments that are expected to be loaded on this server once all the operations in progress have completed.
+   */
+  public Set<DataSegment> getProjectedSegments()
+  {
+    final Set<DataSegment> projectedSegments = new HashSet<>(getServedSegments());
+    queuedSegments.forEach((segment, action) -> {
+      if (action.isLoad()) {
+        projectedSegments.add(segment);
+      } else {
+        projectedSegments.remove(segment);
+      }
+    });
     return projectedSegments;
   }
 
@@ -393,10 +435,10 @@ public class ServerHolder implements Comparable<ServerHolder>
 
     // Add to projected if load is started, remove from projected if drop has started
     if (action.isLoad()) {
-      projectedSegments.addSegment(segment);
+      projectedSegmentCounts.addSegment(segment);
       sizeOfLoadingSegments += segment.getSize();
     } else {
-      projectedSegments.removeSegment(segment);
+      projectedSegmentCounts.removeSegment(segment);
       if (action == SegmentAction.DROP) {
         sizeOfDroppingSegments += segment.getSize();
       }
@@ -410,10 +452,10 @@ public class ServerHolder implements Comparable<ServerHolder>
     queuedSegments.remove(segment);
 
     if (action.isLoad()) {
-      projectedSegments.removeSegment(segment);
+      projectedSegmentCounts.removeSegment(segment);
       sizeOfLoadingSegments -= segment.getSize();
     } else {
-      projectedSegments.addSegment(segment);
+      projectedSegmentCounts.addSegment(segment);
       if (action == SegmentAction.DROP) {
         sizeOfDroppingSegments -= segment.getSize();
       }
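The two projected views above differ in shape: getProjectedSegmentCounts() maintains per-interval counters incrementally, while the new getProjectedSegments() materializes the expected final set of segments. A minimal sketch of the set semantics, using hypothetical segment IDs in place of DataSegment:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ProjectedSegmentsSketch
    {
      public static void main(String[] args)
      {
        // Hypothetical stand-ins: currently served segments and queued actions (true = load).
        final Set<String> served = Set.of("segA", "segB");
        final Map<String, Boolean> queued = Map.of("segC", true, "segB", false);

        final Set<String> projected = new HashSet<>(served);
        queued.forEach((segment, isLoad) -> {
          if (isLoad) {
            projected.add(segment);    // queued loads count toward the final state
          } else {
            projected.remove(segment); // queued drops are excluded from it
          }
        });
        System.out.println(projected); // e.g. [segA, segC]
      }
    }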
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index 96a6ccccf5c..d9c883bf6c5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -284,7 +284,7 @@ public class CostBalancerStrategy implements BalancerStrategy
     // Compute number of segments in each interval
     final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>();
 
-    final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments();
+    final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts();
     projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> {
       final Interval interval = entry.getKey();
       if (costComputeInterval.overlaps(interval)) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
index 6a1c6199911..23db0ac2295 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
@@ -71,7 +71,7 @@ public class SegmentToMoveCalculator
   )
   {
     final int totalSegments = historicals.stream().mapToInt(
-        server -> server.getProjectedSegments().getTotalSegmentCount()
+        server -> server.getProjectedSegmentCounts().getTotalSegmentCount()
     ).sum();
 
     // Move at least some segments to ensure that the cluster is always balancing itself
@@ -187,8 +187,8 @@ public class SegmentToMoveCalculator
     int totalSegmentCount = 0;
     long totalUsageBytes = 0;
     for (ServerHolder server : servers) {
-      totalSegmentCount += server.getProjectedSegments().getTotalSegmentCount();
-      totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes();
+      totalSegmentCount += server.getProjectedSegmentCounts().getTotalSegmentCount();
+      totalUsageBytes += server.getProjectedSegmentCounts().getTotalSegmentBytes();
     }
 
     if (totalSegmentCount <= 0 || totalUsageBytes <= 0) {
@@ -209,7 +209,7 @@ public class SegmentToMoveCalculator
   {
     // Find all the datasources
     final Set<String> datasources = servers.stream().flatMap(
-        s -> s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream()
+        s -> s.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount().keySet().stream()
     ).collect(Collectors.toSet());
     if (datasources.isEmpty()) {
       return 0;
@@ -220,7 +220,7 @@ public class SegmentToMoveCalculator
     final Object2IntMap<String> datasourceToMinSegments = new Object2IntOpenHashMap<>();
     for (ServerHolder server : servers) {
       final Object2IntMap<String> datasourceToSegmentCount
-          = server.getProjectedSegments().getDatasourceToTotalSegmentCount();
+          = server.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount();
       for (String datasource : datasources) {
         int count = datasourceToSegmentCount.getInt(datasource);
         datasourceToMaxSegments.mergeInt(datasource, count, Math::max);
@@ -243,7 +243,7 @@ public class SegmentToMoveCalculator
     int minNumSegments = Integer.MAX_VALUE;
     int maxNumSegments = 0;
     for (ServerHolder server : servers) {
-      int countForSkewedDatasource = server.getProjectedSegments()
+      int countForSkewedDatasource = server.getProjectedSegmentCounts()
                                            .getDatasourceToTotalSegmentCount()
                                            .getInt(mostUnbalancedDatasource);
 
@@ -276,7 +276,7 @@ public class SegmentToMoveCalculator
     long maxUsageBytes = 0;
     long minUsageBytes = Long.MAX_VALUE;
     for (ServerHolder server : servers) {
-      final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments();
+      final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts();
 
       // Track the maximum and minimum values
       long serverUsageBytes = projectedSegments.getTotalSegmentBytes();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
index 36be8a61de9..9643a547c99 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
@@ -205,7 +205,7 @@ public class TierSegmentBalancer
       return 0;
     } else {
       final int decommSegmentsToMove = decommissioningServers.stream().mapToInt(
-          server -> server.getProjectedSegments().getTotalSegmentCount()
+          server -> server.getProjectedSegmentCounts().getTotalSegmentCount()
       ).sum();
       return Math.min(decommSegmentsToMove, maxSegmentsToMove);
     }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index c27f95a002d..e3791c9dc23 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -60,7 +60,7 @@ public class BalanceSegments implements CoordinatorDuty
       return params;
     }
 
-    params.getDruidCluster().getHistoricals().forEach(
+    params.getDruidCluster().getManagedHistoricals().forEach(
         (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
     );
 
@@ -113,7 +113,7 @@ public class BalanceSegments implements CoordinatorDuty
     int numHistoricals = 0;
     int numSegments = 0;
 
-    for (Set<ServerHolder> historicals : cluster.getHistoricals().values()) {
+    for (Set<ServerHolder> historicals : cluster.getManagedHistoricals().values()) {
       for (ServerHolder historical : historicals) {
         ++numHistoricals;
         numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
new file mode 100644
index 00000000000..25534f1ddfb
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handles cloning of historicals. Given the historical-to-historical clone mappings from
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source
+ * historical to the target historical.
+ */
+public class CloneHistoricals implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(CloneHistoricals.class);
+  private final SegmentLoadQueueManager loadQueueManager;
+
+  public CloneHistoricals(SegmentLoadQueueManager loadQueueManager)
+  {
+    this.loadQueueManager = loadQueueManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+
+    if (cloneServers.isEmpty()) {
+      // No servers to be cloned.
+      return params;
+    }
+
+    // Create a map of host to historical.
+    final Map<String, ServerHolder> historicalMap = params.getDruidCluster()
+                                                          .getHistoricals()
+                                                          .values()
+                                                          .stream()
+                                                          .flatMap(Collection::stream)
+                                                          .collect(Collectors.toMap(
+                                                              serverHolder -> serverHolder.getServer().getHost(),
+                                                              serverHolder -> serverHolder
+                                                          ));
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      final String targetHistoricalName = entry.getKey();
+      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);
+
+      final String sourceHistoricalName = entry.getValue();
+      final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName);
+
+      if (sourceServer == null || targetServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as historical[%s] does not exist.",
+            entry,
+            (sourceServer == null ? sourceHistoricalName : targetHistoricalName)
+        );
+        continue;
+      }
+
+      final Set<DataSegment> sourceProjectedSegments = sourceServer.getProjectedSegments();
+      final Set<DataSegment> targetProjectedSegments = targetServer.getProjectedSegments();
+      // Load any segments missing in the clone target.
+      for (DataSegment segment : sourceProjectedSegments) {
+        if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
+          stats.add(
+              Stats.Segments.ASSIGNED_TO_CLONE,
+              RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
+              1L
+          );
+        }
+      }
+
+      // Drop any segments missing from the clone source.
+      for (DataSegment segment : targetProjectedSegments) {
+        if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) {
+          stats.add(
+              Stats.Segments.DROPPED_FROM_CLONE,
+              RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
+              1L
+          );
+        }
+      }
+    }
+
+    return params;
+  }
+}
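In effect, each run of the new duty reduces to two set differences between the projected segments of the source and the target. A minimal sketch with hypothetical segment IDs:

    import java.util.Set;
    import java.util.stream.Collectors;

    public class CloneSyncSketch
    {
      public static void main(String[] args)
      {
        // Hypothetical projected states of a clone source and its target.
        final Set<String> source = Set.of("segA", "segB", "segC");
        final Set<String> target = Set.of("segB", "segD");

        // Segments the duty would queue for load on the target ...
        final Set<String> toLoad =
            source.stream().filter(s -> !target.contains(s)).collect(Collectors.toSet());
        // ... and segments it would queue for drop from the target.
        final Set<String> toDrop =
            target.stream().filter(s -> !source.contains(s)).collect(Collectors.toSet());

        System.out.println("load: " + toLoad + ", drop: " + toDrop); // e.g. load: [segA, segC], drop: [segD]
      }
    }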
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
index 596f9df7017..dba11d2c290 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
@@ -83,7 +83,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
     final DruidCluster cluster = params.getDruidCluster();
     final Map<String, SegmentTimeline> timelines = new HashMap<>();
 
-    cluster.getHistoricals().values().forEach(
+    cluster.getManagedHistoricals().values().forEach(
         historicals -> historicals.forEach(
             historical -> addSegmentsFromServer(historical, timelines)
         )
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
index a9e926ea4f7..8e99d15c07e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -119,7 +120,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
   {
     final AtomicInteger cancelledCount = new AtomicInteger(0);
     final List<ServerHolder> decommissioningServers
-        = cluster.getAllServers().stream()
+        = cluster.getAllManagedServers().stream()
                  .filter(ServerHolder::isDecommissioning)
                  .collect(Collectors.toList());
 
@@ -152,6 +153,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
   )
   {
     final Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
+    final Set<String> unmanagedServers = new HashSet<>(dynamicConfig.getCloneServers().keySet());
     final DruidCluster.Builder cluster = DruidCluster.builder();
     for (ImmutableDruidServer server : currentServers) {
       cluster.add(
@@ -159,6 +161,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
               server,
               taskMaster.getPeonForServer(server),
               decommissioningServers.contains(server.getHost()),
+              unmanagedServers.contains(server.getHost()),
               segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
               segmentLoadingConfig.getMaxLifetimeInLoadQueue()
           )
@@ -173,7 +176,16 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
       RowKey rowKey = RowKey.of(Dimension.TIER, tier);
       stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
 
-      long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum();
+      long totalCapacity = 0;
+      long cloneCount = 0;
+      for (ServerHolder holder : historicals) {
+        if (holder.isUnmanaged()) {
+          cloneCount += 1;
+        } else {
+          totalCapacity += holder.getMaxSize();
+        }
+      }
+      stats.add(Stats.Tier.CLONE_COUNT, rowKey, cloneCount);
       stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
     });
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index 2bd9fd29548..761e3383ede 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -62,7 +62,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty
       broadcastStatusByDatasource.put(broadcastDatasource, true);
     }
 
-    final List<ServerHolder> allServers = params.getDruidCluster().getAllServers();
+    final List<ServerHolder> allServers = params.getDruidCluster().getAllManagedServers();
     int numCancelledLoads = allServers.stream().mapToInt(
         server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params)
     ).sum();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 38eeb42413e..e4fa849beb1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -150,8 +150,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     this.serverCapabilities = fetchSegmentLoadingCapabilities();
   }
 
-  @VisibleForTesting
-  SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+  private SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
   {
     try {
       final URL segmentLoadingCapabilitiesURL = new URL(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java
index 1f9307cae56..cc007449b57 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java
@@ -51,7 +51,7 @@ public class RoundRobinServerSelector
 
   public RoundRobinServerSelector(DruidCluster cluster)
   {
-    cluster.getHistoricals().forEach(
+    cluster.getManagedHistoricals().forEach(
         (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers))
     );
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java
index 3d3e34072fb..241759f8b95 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java
@@ -46,7 +46,7 @@ public class SegmentReplicaCountMap
 
   private void initReplicaCounts(DruidCluster cluster)
   {
-    cluster.getHistoricals().forEach(
+    cluster.getManagedHistoricals().forEach(
         (tier, historicals) -> historicals.forEach(
             serverHolder -> {
               // Add segments already loaded on this server
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index a5e38eee7d4..654fe42b220 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -88,7 +88,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
     this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment();
     this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster) : null;
 
-    cluster.getHistoricals().forEach(
+    cluster.getManagedHistoricals().forEach(
         (tier, historicals) -> tierToHistoricalCount.put(tier, historicals.size())
     );
   }
@@ -275,7 +275,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
     }
 
     final SegmentStatusInTier segmentStatus =
-        new SegmentStatusInTier(segment, cluster.getHistoricalsByTier(tier));
+        new SegmentStatusInTier(segment, cluster.getManagedHistoricalsByTier(tier));
 
     // Cancel all moves in this tier if it does not need to have replicas
     if (shouldCancelMoves) {
@@ -326,7 +326,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
   public void broadcastSegment(DataSegment segment)
   {
     final Object2IntOpenHashMap<String> tierToRequiredReplicas = new Object2IntOpenHashMap<>();
-    for (ServerHolder server : cluster.getAllServers()) {
+    for (ServerHolder server : cluster.getAllManagedServers()) {
       // Ignore servers which are not broadcast targets
       if (!server.getServer().getType().isSegmentBroadcastTarget()) {
         continue;
@@ -577,7 +577,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
   {
     final Map<String, Integer> tierToLoadingReplicaCount = new HashMap<>();
 
-    cluster.getHistoricals().forEach(
+    cluster.getManagedHistoricals().forEach(
         (tier, historicals) -> {
           int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum();
           tierToLoadingReplicaCount.put(tier, numLoadingReplicas);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 0bc3b609bd3..0571245c985 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -65,6 +65,12 @@ public class Stats
     // Values computed in a run
     public static final CoordinatorStat REPLICATION_THROTTLE_LIMIT
         = CoordinatorStat.toDebugOnly("replicationThrottleLimit");
+
+    // Cloned segments in a run
+    public static final CoordinatorStat ASSIGNED_TO_CLONE
+        = CoordinatorStat.toDebugAndEmit("cloneLoad", "segment/clone/assigned/count");
+    public static final CoordinatorStat DROPPED_FROM_CLONE
+        = CoordinatorStat.toDebugAndEmit("cloneDrop", "segment/clone/dropped/count");
   }
 
   public static class SegmentQueue
@@ -98,6 +104,8 @@ public class Stats
         = CoordinatorStat.toDebugAndEmit("maxRepFactor", 
"tier/replication/factor");
     public static final CoordinatorStat HISTORICAL_COUNT
         = CoordinatorStat.toDebugAndEmit("numHistorical", 
"tier/historical/count");
+    public static final CoordinatorStat CLONE_COUNT
+        = CoordinatorStat.toDebugAndEmit("numClones", 
"tier/historical/clone/count");
   }
 
   public static class Compaction
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
index 17a4de1d73f..d47cf2fb741 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
@@ -98,7 +98,7 @@ public class DruidClusterTest
   }
 
   @Test
-  public void testGetAllServers()
+  public void testGetAllManagedServers()
   {
     clusterBuilder.add(NEW_REALTIME);
     clusterBuilder.add(NEW_HISTORICAL);
@@ -107,7 +107,7 @@ public class DruidClusterTest
     final Set<ServerHolder> expectedRealtimes = cluster.getRealtimes();
     final Map<String, NavigableSet<ServerHolder>> expectedHistoricals = cluster.getHistoricals();
 
-    final Collection<ServerHolder> allServers = cluster.getAllServers();
+    final Collection<ServerHolder> allServers = cluster.getAllManagedServers();
     Assert.assertEquals(4, allServers.size());
     Assert.assertTrue(allServers.containsAll(cluster.getRealtimes()));
     Assert.assertTrue(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
index fba98bff3ae..cb4a8f1d7f3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
@@ -73,10 +73,12 @@ public class HttpLoadQueuePeonTest
 
   private TestHttpClient httpClient;
   private HttpLoadQueuePeon httpLoadQueuePeon;
+  private SegmentLoadingCapabilities segmentLoadingCapabilities;
 
   @Before
   public void setUp()
   {
+    segmentLoadingCapabilities = new SegmentLoadingCapabilities(1, 3);
     httpClient = new TestHttpClient();
     httpLoadQueuePeon = new HttpLoadQueuePeon(
         "http://dummy:4000";,
@@ -90,14 +92,7 @@ public class HttpLoadQueuePeonTest
             true
         ),
         httpClient.callbackExecutor
-    )
-    {
-      @Override
-      SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
-      {
-        return new SegmentLoadingCapabilities(1, 3);
-      }
-    };
+    );
     httpLoadQueuePeon.start();
   }
 
@@ -344,14 +339,7 @@ public class HttpLoadQueuePeonTest
             true
         ),
         httpClient.callbackExecutor
-    )
-    {
-      @Override
-      SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
-      {
-        return new SegmentLoadingCapabilities(1, 3);
-      }
-    };
+    );
 
     Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
     Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO));
@@ -362,7 +350,7 @@ public class HttpLoadQueuePeonTest
     return success -> httpClient.processedSegments.add(segment);
   }
 
-  private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler
+  private class TestHttpClient implements HttpClient, DataSegmentChangeHandler
   {
     final BlockingExecutorService processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s");
     final BlockingExecutorService callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb");
@@ -379,6 +367,7 @@ public class HttpLoadQueuePeonTest
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <Intermediate, Final> ListenableFuture<Final> go(
         Request request,
         HttpResponseHandler<Intermediate, Final> httpResponseHandler,
@@ -388,7 +377,17 @@ public class HttpLoadQueuePeonTest
       HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
       httpResponse.setContent(ChannelBuffers.buffer(0));
       httpResponseHandler.handleResponse(httpResponse, null);
+
       try {
+        if (request.getUrl().toString().contains("/loadCapabilities")) {
+          return (ListenableFuture<Final>) Futures.immediateFuture(
+              new ByteArrayInputStream(
+                  MAPPER.writerFor(SegmentLoadingCapabilities.class)
+                        .writeValueAsBytes(segmentLoadingCapabilities)
+              )
+          );
+        }
+
         List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue(
             request.getContent().array(),
             HttpLoadQueuePeon.REQUEST_ENTITY_TYPE_REF
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
new file mode 100644
index 00000000000..60be0e1496d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class HistoricalCloningTest extends CoordinatorSimulationBaseTest
+{
+  private static final long SIZE_1TB = 1_000_000;
+
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+  private DruidServer historicalT13;
+
+  private final String datasource = TestDataSource.WIKI;
+
+  @Override
+  public void setUp()
+  {
+    // Setup historicals for 1 tier, size 1 TB each
+    historicalT11 = createHistorical(1, Tier.T1, SIZE_1TB);
+    historicalT12 = createHistorical(2, Tier.T1, SIZE_1TB);
+    historicalT13 = createHistorical(3, Tier.T1, SIZE_1TB);
+  }
+
+  @Test
+  public void testSimpleCloning()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    verifyValue(Metric.ASSIGNED_COUNT, 10L);
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        10L
+    );
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT11.getName(), "description", "LOAD: NORMAL"),
+        10L
+    );
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"),
+        10L
+    );
+
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments());
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments());
+    Segments.WIKI_10X1D.forEach(segment -> {
+      Assert.assertEquals(segment, historicalT11.getSegment(segment.getId()));
+      Assert.assertEquals(segment, historicalT12.getSegment(segment.getId()));
+    });
+  }
+
+  @Test
+  public void testAddingNewHistorical()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    // Run 1: Current state is a historical and clone already in sync.
+    Segments.WIKI_10X1D.forEach(segment -> {
+      historicalT11.addDataSegment(segment);
+      historicalT12.addDataSegment(segment);
+    });
+
+    startSimulation(sim);
+
+    runCoordinatorCycle();
+
+    // Confirm number of segments.
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+
+    // Add a new historical.
+    final DruidServer newHistorical = createHistorical(3, Tier.T1, 10_000);
+    addServer(newHistorical);
+
+    // Run 2: Let the coordinator balance segments.
+    runCoordinatorCycle();
+
+    // Check that segments have been distributed to the new historical and have also been dropped by the clone.
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    Assert.assertEquals(5, newHistorical.getTotalSegments());
+    verifyValue(
+        Stats.Segments.DROPPED_FROM_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        5L
+    );
+  }
+
+  @Test
+  public void testCloningServerDisappearsAndRelaunched()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    startSimulation(sim);
+
+    // Run 1: All segments are loaded.
+    runCoordinatorCycle();
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+
+    // Target server disappears, loses loaded segments.
+    removeServer(historicalT12);
+    Segments.WIKI_10X1D.forEach(segment -> historicalT12.removeDataSegment(segment.getId()));
+
+    // Run 2: No change in source historical.
+    runCoordinatorCycle();
+
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(0, historicalT12.getTotalSegments());
+
+    // Server is re-added.
+    addServer(historicalT12);
+
+    // Run 3: Segments recloned.
+    runCoordinatorCycle();
+
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        10L
+    );
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT12.getName(), "description", "LOAD: 
NORMAL"),
+        10L
+    );
+
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments());
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments());
+    Segments.WIKI_10X1D.forEach(segment -> {
+      Assert.assertEquals(segment, historicalT11.getSegment(segment.getId()));
+      Assert.assertEquals(segment, historicalT12.getSegment(segment.getId()));
+    });
+  }
+
+  @Test
+  public void testClonedServerDoesNotFollowReplicationLimit()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X100D)
+                             .withServers(historicalT11)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .withReplicationThrottleLimit(2)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    Segments.WIKI_10X100D.forEach(segment -> historicalT11.addDataSegment(segment));
+    startSimulation(sim);
+
+    // Run 1: All segments are loaded on the source historical
+    runCoordinatorCycle();
+    Assert.assertEquals(1000, historicalT11.getTotalSegments());
+    Assert.assertEquals(0, historicalT12.getTotalSegments());
+
+    // Clone server now added.
+    addServer(historicalT12);
+
+    // Run 2: Assigns all segments to the cloned historical
+    runCoordinatorCycle();
+
+    Assert.assertEquals(1000, historicalT11.getTotalSegments());
+    Assert.assertEquals(1000, historicalT12.getTotalSegments());
+
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        1000L
+    );
+
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT12.getName(), "description", "LOAD: 
NORMAL"),
+        1000L
+    );
+  }
+
+  @Test
+  public void testCloningHistoricalWithReplicationLimit()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12, historicalT13)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(false)
+                                                         .withReplicationThrottleLimit(2)
+                                                         .withMaxSegmentsToMove(0)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+    Segments.WIKI_10X1D.forEach(historicalT13::addDataSegment);
+    startSimulation(sim);
+
+    // Check that only replicationThrottleLimit segments are assigned each run and that the cloning server copies them.
+    while (historicalT11.getTotalSegments() < Segments.WIKI_10X1D.size()) {
+      runCoordinatorCycle();
+
+      // Check that all segments are cloned.
+      Assert.assertEquals(historicalT11.getTotalSegments(), historicalT12.getTotalSegments());
+
+      // Check that the replication throttling is respected.
+      verifyValue(Metric.ASSIGNED_COUNT, 2L);
+      verifyValue(
+          Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+          Map.of("server", historicalT12.getName()),
+          2L
+      );
+    }
+
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments());
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments());
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT13.getTotalSegments());
+    Segments.WIKI_10X1D.forEach(segment -> {
+      Assert.assertEquals(segment, historicalT11.getSegment(segment.getId()));
+      Assert.assertEquals(segment, historicalT12.getSegment(segment.getId()));
+      Assert.assertEquals(segment, historicalT13.getSegment(segment.getId()));
+    });
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 1be987ddc27..98324792ef7 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -244,7 +244,8 @@ public class CoordinatorDynamicConfigTest
         false,
         false,
         null,
-        ImmutableSet.of("host1")
+        ImmutableSet.of("host1"),
+        null
     );
     Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
   }
@@ -269,7 +270,8 @@ public class CoordinatorDynamicConfigTest
         false,
         false,
         null,
-        ImmutableSet.of("host1")
+        ImmutableSet.of("host1"),
+        null
     );
     Assert.assertEquals(ImmutableSet.of("test1"), 
config.getSpecificDataSourcesToKillUnusedSegmentsIn());
   }
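
For anyone who wants to try the new mapping outside the simulation framework, below is a minimal sketch of the builder call exercised by the tests above. The host strings are hypothetical placeholders; as in the tests, the map is keyed on the clone target's host, with the clone source's host as the value.

    import java.util.Map;
    import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

    // Minimal sketch, not part of this patch: marks the hypothetical host
    // "historical-clone:8083" as a clone of "historical-source:8083".
    CoordinatorDynamicConfig dynamicConfig =
        CoordinatorDynamicConfig.builder()
                                .withCloneServers(Map.of("historical-clone:8083", "historical-source:8083"))
                                .build();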


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
