This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new cefac4765cb Add coordinator dynamic config `cloneServers` to clone an existing historical (#17863)
cefac4765cb is described below

commit cefac4765cb8ab3fda260ac8c433adc62ca639cb
Author: Adarsh Sanjeev <adarshsanj...@gmail.com>
AuthorDate: Wed Apr 9 09:22:07 2025 +0530

    Add coordinator dynamic config `cloneServers` to clone an existing historical (#17863)

    Changes
    ---------
    - Add coordinator dynamic config `cloneServers`, which contains a mapping from each clone target to its clone source.
    - Mark clone targets as "unmanaged" and exclude them from segment assignment, drop and balancing.
    - Add duty `CloneHistoricals`, which ensures that each clone target remains in sync with its clone source.
    - Add simulation tests for cloning.
---
 docs/api-reference/dynamic-configuration-api.md    |  10 +-
 docs/configuration/index.md                        |   1 +
 .../coordinator/CoordinatorDynamicConfig.java      |  35 ++-
 .../druid/server/coordinator/DruidCluster.java     |  50 +++-
 .../druid/server/coordinator/DruidCoordinator.java |   2 +
 .../druid/server/coordinator/ServerHolder.java     |  58 +++-
 .../coordinator/balancer/CostBalancerStrategy.java |   2 +-
 .../balancer/SegmentToMoveCalculator.java          |  14 +-
 .../coordinator/balancer/TierSegmentBalancer.java  |   2 +-
 .../server/coordinator/duty/BalanceSegments.java   |   4 +-
 .../server/coordinator/duty/CloneHistoricals.java  | 119 ++++++++
 .../duty/MarkOvershadowedSegmentsAsUnused.java     |   2 +-
 .../duty/PrepareBalancerAndLoadQueues.java         |  16 +-
 .../coordinator/duty/UnloadUnusedSegments.java     |   2 +-
 .../coordinator/loading/HttpLoadQueuePeon.java     |   3 +-
 .../loading/RoundRobinServerSelector.java          |   2 +-
 .../loading/SegmentReplicaCountMap.java            |   2 +-
 .../loading/StrategicSegmentAssigner.java          |   8 +-
 .../druid/server/coordinator/stats/Stats.java      |   8 +
 .../druid/server/coordinator/DruidClusterTest.java |   4 +-
 .../coordinator/loading/HttpLoadQueuePeonTest.java |  33 ++-
 .../simulate/HistoricalCloningTest.java            | 300 +++++++++++++++++++++
 .../server/http/CoordinatorDynamicConfigTest.java  |   6 +-
 23 files changed, 611 insertions(+), 72 deletions(-)

diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md
index 971aa81d206..cad61e4b88f 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -106,7 +106,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
   "useRoundRobinSegmentAssignment": true,
   "smartSegmentLoading": true,
   "debugDimensions": null,
-  "turboLoadingNodes": []
+  "turboLoadingNodes": [],
+  "cloneServers": {}
+
 }
 ```
@@ -174,7 +176,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
   "useRoundRobinSegmentAssignment": true,
-  "turboLoadingNodes": []
+  "turboLoadingNodes": [],
+  "cloneServers": {}
 }'
 ```
@@ -206,7 +209,8 @@ Content-Length: 683
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
   "useRoundRobinSegmentAssignment": true,
-  "turboLoadingNodes": []
+  "turboLoadingNodes": [],
+  "cloneServers": {}
 }

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 91168d9471a..328ab350825 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinator.
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of ne [...]
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different Historical server. This helps improve segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issu [...]
 |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the se [...]
+|`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, [...]
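As an illustration of the shape of this map, a minimal `cloneServers` payload might look like the sketch below; the hostnames are hypothetical, and each key and value is expected to match the `host:port` string by which the Coordinator identifies the Historical:

```json
{
  "cloneServers": {
    "clone-historical.example.com:8083": "source-historical.example.com:8083"
  }
}
```

Submitted with the rest of the Coordinator dynamic config (for example via the `POST /druid/coordinator/v1/config` call shown above), this would take `clone-historical.example.com:8083` out of normal assignment and balancing and have it mirror every load and drop issued to `source-historical.example.com:8083`.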
 ##### Smart segment loading
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d3ee720f814..8a219484acf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -74,6 +74,7 @@ public class CoordinatorDynamicConfig
   private final Map<Dimension, String> validDebugDimensions;

   private final Set<String> turboLoadingNodes;
+  private final Map<String, String> cloneServers;

   /**
    * Stale pending segments belonging to the data sources in this list are not killed by {@code
@@ -124,7 +125,8 @@ public class CoordinatorDynamicConfig
       @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
       @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
       @JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
-      @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes
+      @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
+      @JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
   )
   {
     this.markSegmentAsUnusedDelayMillis =
@@ -169,6 +171,7 @@
     this.debugDimensions = debugDimensions;
     this.validDebugDimensions = validateDebugDimensions(debugDimensions);
     this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
+    this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
   }

   private Map<Dimension, String> validateDebugDimensions(Map<String, String> debugDimensions)
@@ -322,6 +325,19 @@ public class CoordinatorDynamicConfig
     return replicateAfterLoadTimeout;
   }

+  /**
+   * Map from target Historical server to source Historical server which should be cloned by the target. The target
+   * Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any
+   * segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact
+   * copy of the source. Segments on the target Historical do not count towards replica counts either. If the source
+   * disappears, the target remains in the last known state of the source server until it is removed from the cloneServers config.
+   */
+  @JsonProperty
+  public Map<String, String> getCloneServers()
+  {
+    return cloneServers;
+  }
+
   /**
    * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
    * segments. This decreases the average time taken to load segments. However, this also means fewer resources
@@ -464,6 +480,7 @@ public class CoordinatorDynamicConfig
     private Boolean useRoundRobinSegmentAssignment;
     private Boolean smartSegmentLoading;
     private Set<String> turboLoadingNodes;
+    private Map<String, String> cloneServers;

     public Builder()
     {
@@ -487,7 +504,8 @@ public class CoordinatorDynamicConfig
         @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
         @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
         @JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
-        @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes
+        @JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
+        @JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
     )
     {
       this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -507,6 +525,7 @@
       this.smartSegmentLoading = smartSegmentLoading;
       this.debugDimensions = debugDimensions;
       this.turboLoadingNodes = turboLoadingNodes;
+      this.cloneServers = cloneServers;
     }

     public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -599,6 +618,12 @@
       return this;
     }

+    public Builder withCloneServers(Map<String, String> cloneServers)
+    {
+      this.cloneServers = cloneServers;
+      return this;
+    }
+
     /**
      * Builds a CoordinatorDynamicConfig using either the configured values, or
      * the default value if not configured.
@@ -625,7 +650,8 @@
           valueOrDefault(useRoundRobinSegmentAssignment, Defaults.USE_ROUND_ROBIN_ASSIGNMENT),
           valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
           debugDimensions,
-          turboLoadingNodes
+          turboLoadingNodes,
+          cloneServers
       );
     }

@@ -656,7 +682,8 @@
           valueOrDefault(useRoundRobinSegmentAssignment, defaults.isUseRoundRobinSegmentAssignment()),
           valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()),
           valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
-          valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes())
+          valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
+          valueOrDefault(cloneServers, defaults.getCloneServers())
       );
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index da0b8e8b2e7..5e3bcfd31de 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.stream.Collectors;

 /**
  * Contains a representation of the current state of the cluster by tier.
@@ -44,8 +45,9 @@ public class DruidCluster private final Set<ServerHolder> realtimes; private final Map<String, NavigableSet<ServerHolder>> historicals; + private final Map<String, NavigableSet<ServerHolder>> managedHistoricals; private final Set<ServerHolder> brokers; - private final List<ServerHolder> allServers; + private final List<ServerHolder> allManagedServers; private DruidCluster( Set<ServerHolder> realtimes, @@ -58,8 +60,18 @@ public class DruidCluster historicals, holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders) ); + this.managedHistoricals = CollectionUtils.mapValues( + historicals, + holders -> { + List<ServerHolder> managedServers = holders.stream() + .filter(serverHolder -> !serverHolder.isUnmanaged()) + .collect(Collectors.toList()); + + return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers); + } + ); this.brokers = Collections.unmodifiableSet(brokers); - this.allServers = initAllServers(); + this.allManagedServers = initAllManagedServers(); } public Set<ServerHolder> getRealtimes() @@ -67,11 +79,23 @@ public class DruidCluster return realtimes; } + /** + * Return all historicals. + */ public Map<String, NavigableSet<ServerHolder>> getHistoricals() { return historicals; } + /** + * Returns all managed historicals. Managed historicals are historicals which can participate in segment assignment, + * drop or balancing. + */ + public Map<String, NavigableSet<ServerHolder>> getManagedHistoricals() + { + return managedHistoricals; + } + public Set<ServerHolder> getBrokers() { return brokers; @@ -82,26 +106,26 @@ public class DruidCluster return historicals.keySet(); } - public NavigableSet<ServerHolder> getHistoricalsByTier(String tier) + public NavigableSet<ServerHolder> getManagedHistoricalsByTier(String tier) { - return historicals.get(tier); + return managedHistoricals.get(tier); } - public List<ServerHolder> getAllServers() + public List<ServerHolder> getAllManagedServers() { - return allServers; + return allManagedServers; } - private List<ServerHolder> initAllServers() + private List<ServerHolder> initAllManagedServers() { - final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum(); + final int historicalSize = managedHistoricals.values().stream().mapToInt(Collection::size).sum(); final int realtimeSize = realtimes.size(); - final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize); + final List<ServerHolder> allManagedServers = new ArrayList<>(historicalSize + realtimeSize); - historicals.values().forEach(allServers::addAll); - allServers.addAll(brokers); - allServers.addAll(realtimes); - return allServers; + managedHistoricals.values().forEach(allManagedServers::addAll); + allManagedServers.addAll(brokers); + allManagedServers.addAll(realtimes); + return allManagedServers; } public boolean isEmpty() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 61b56e58229..8174d829df1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -63,6 +63,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig; import 
org.apache.druid.server.coordinator.duty.BalanceSegments; +import org.apache.druid.server.coordinator.duty.CloneHistoricals; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; @@ -558,6 +559,7 @@ public class DruidCoordinator new MarkOvershadowedSegmentsAsUnused(deleteSegments), new MarkEternityTombstonesAsUnused(deleteSegments), new BalanceSegments(config.getCoordinatorPeriod()), + new CloneHistoricals(loadQueueManager), new CollectLoadQueueStats() ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 5de1bd5ee06..b4a924dbf2c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -32,9 +32,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -57,6 +59,7 @@ public class ServerHolder implements Comparable<ServerHolder> private final ImmutableDruidServer server; private final LoadQueuePeon peon; private final boolean isDecommissioning; + private final boolean isUnmanaged; private final int maxAssignmentsInRun; private final int maxLifetimeInQueue; @@ -73,7 +76,7 @@ public class ServerHolder implements Comparable<ServerHolder> */ private final Map<DataSegment, SegmentAction> queuedSegments = new HashMap<>(); - private final SegmentCountsPerInterval projectedSegments = new SegmentCountsPerInterval(); + private final SegmentCountsPerInterval projectedSegmentCounts = new SegmentCountsPerInterval(); public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon) { @@ -85,12 +88,25 @@ public class ServerHolder implements Comparable<ServerHolder> this(server, peon, isDecommissioning, 0, 1); } + public ServerHolder( + ImmutableDruidServer server, + LoadQueuePeon peon, + boolean isDecommissioning, + int maxSegmentsInLoadQueue, + int maxLifetimeInQueue + ) + { + this(server, peon, isDecommissioning, false, maxSegmentsInLoadQueue, maxLifetimeInQueue); + } + /** * Creates a new ServerHolder valid for a single coordinator run. * * @param server Underlying Druid server * @param peon Load queue peon for this server * @param isDecommissioning Whether the server is decommissioning + * @param isUnmanaged Whether this server is unmanaged and should not participate in segment assignment, + * drop or balancing. * @param maxSegmentsInLoadQueue Max number of segments that can be present in * the load queue at any point. If this is 0, the * load queue can have an unlimited number of segments. @@ -101,6 +117,7 @@ public class ServerHolder implements Comparable<ServerHolder> ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioning, + boolean isUnmanaged, int maxSegmentsInLoadQueue, int maxLifetimeInQueue ) @@ -108,6 +125,7 @@ public class ServerHolder implements Comparable<ServerHolder> this.server = server; this.peon = peon; this.isDecommissioning = isDecommissioning; + this.isUnmanaged = isUnmanaged; this.maxAssignmentsInRun = maxSegmentsInLoadQueue == 0 ? 
Integer.MAX_VALUE @@ -128,7 +146,7 @@ public class ServerHolder implements Comparable<ServerHolder> ) { for (DataSegment segment : server.iterateAllSegments()) { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); } final List<SegmentHolder> expiredSegments = new ArrayList<>(); @@ -213,6 +231,14 @@ public class ServerHolder implements Comparable<ServerHolder> return isDecommissioning; } + /** + * Returns true if this server is unmanaged and should not participate in segment assignment, drop or balancing. + */ + public boolean isUnmanaged() + { + return isUnmanaged; + } + public boolean isLoadQueueFull() { return totalAssignmentsInRun >= maxAssignmentsInRun; @@ -264,11 +290,27 @@ public class ServerHolder implements Comparable<ServerHolder> } /** - * Segments that are expected to be loaded on this server once all the + * Counts for segments that are expected to be loaded on this server once all the * operations in progress have completed. */ - public SegmentCountsPerInterval getProjectedSegments() + public SegmentCountsPerInterval getProjectedSegmentCounts() { + return projectedSegmentCounts; + } + + /** + * Segments that are expected to be loaded on this server once all the operations in progress have completed. + */ + public Set<DataSegment> getProjectedSegments() + { + final Set<DataSegment> projectedSegments = new HashSet<>(getServedSegments()); + queuedSegments.forEach((segment, action) -> { + if (action.isLoad()) { + projectedSegments.add(segment); + } else { + projectedSegments.remove(segment); + } + }); return projectedSegments; } @@ -393,10 +435,10 @@ public class ServerHolder implements Comparable<ServerHolder> // Add to projected if load is started, remove from projected if drop has started if (action.isLoad()) { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); sizeOfLoadingSegments += segment.getSize(); } else { - projectedSegments.removeSegment(segment); + projectedSegmentCounts.removeSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments += segment.getSize(); } @@ -410,10 +452,10 @@ public class ServerHolder implements Comparable<ServerHolder> queuedSegments.remove(segment); if (action.isLoad()) { - projectedSegments.removeSegment(segment); + projectedSegmentCounts.removeSegment(segment); sizeOfLoadingSegments -= segment.getSize(); } else { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments -= segment.getSize(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java index 96a6ccccf5c..d9c883bf6c5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java @@ -284,7 +284,7 @@ public class CostBalancerStrategy implements BalancerStrategy // Compute number of segments in each interval final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>(); - final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts(); projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> { final Interval interval = entry.getKey(); if 
(costComputeInterval.overlaps(interval)) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java index 6a1c6199911..23db0ac2295 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java @@ -71,7 +71,7 @@ public class SegmentToMoveCalculator ) { final int totalSegments = historicals.stream().mapToInt( - server -> server.getProjectedSegments().getTotalSegmentCount() + server -> server.getProjectedSegmentCounts().getTotalSegmentCount() ).sum(); // Move at least some segments to ensure that the cluster is always balancing itself @@ -187,8 +187,8 @@ public class SegmentToMoveCalculator int totalSegmentCount = 0; long totalUsageBytes = 0; for (ServerHolder server : servers) { - totalSegmentCount += server.getProjectedSegments().getTotalSegmentCount(); - totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes(); + totalSegmentCount += server.getProjectedSegmentCounts().getTotalSegmentCount(); + totalUsageBytes += server.getProjectedSegmentCounts().getTotalSegmentBytes(); } if (totalSegmentCount <= 0 || totalUsageBytes <= 0) { @@ -209,7 +209,7 @@ public class SegmentToMoveCalculator { // Find all the datasources final Set<String> datasources = servers.stream().flatMap( - s -> s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream() + s -> s.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount().keySet().stream() ).collect(Collectors.toSet()); if (datasources.isEmpty()) { return 0; @@ -220,7 +220,7 @@ public class SegmentToMoveCalculator final Object2IntMap<String> datasourceToMinSegments = new Object2IntOpenHashMap<>(); for (ServerHolder server : servers) { final Object2IntMap<String> datasourceToSegmentCount - = server.getProjectedSegments().getDatasourceToTotalSegmentCount(); + = server.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount(); for (String datasource : datasources) { int count = datasourceToSegmentCount.getInt(datasource); datasourceToMaxSegments.mergeInt(datasource, count, Math::max); @@ -243,7 +243,7 @@ public class SegmentToMoveCalculator int minNumSegments = Integer.MAX_VALUE; int maxNumSegments = 0; for (ServerHolder server : servers) { - int countForSkewedDatasource = server.getProjectedSegments() + int countForSkewedDatasource = server.getProjectedSegmentCounts() .getDatasourceToTotalSegmentCount() .getInt(mostUnbalancedDatasource); @@ -276,7 +276,7 @@ public class SegmentToMoveCalculator long maxUsageBytes = 0; long minUsageBytes = Long.MAX_VALUE; for (ServerHolder server : servers) { - final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts(); // Track the maximum and minimum values long serverUsageBytes = projectedSegments.getTotalSegmentBytes(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java index 36be8a61de9..9643a547c99 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java @@ -205,7 +205,7 @@ public class 
TierSegmentBalancer return 0; } else { final int decommSegmentsToMove = decommissioningServers.stream().mapToInt( - server -> server.getProjectedSegments().getTotalSegmentCount() + server -> server.getProjectedSegmentCounts().getTotalSegmentCount() ).sum(); return Math.min(decommSegmentsToMove, maxSegmentsToMove); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index c27f95a002d..e3791c9dc23 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -60,7 +60,7 @@ public class BalanceSegments implements CoordinatorDuty return params; } - params.getDruidCluster().getHistoricals().forEach( + params.getDruidCluster().getManagedHistoricals().forEach( (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run() ); @@ -113,7 +113,7 @@ public class BalanceSegments implements CoordinatorDuty int numHistoricals = 0; int numSegments = 0; - for (Set<ServerHolder> historicals : cluster.getHistoricals().values()) { + for (Set<ServerHolder> historicals : cluster.getManagedHistoricals().values()) { for (ServerHolder historical : historicals) { ++numHistoricals; numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java new file mode 100644 index 00000000000..25534f1ddfb --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Handles cloning of historicals. 
Given the historical-to-historical clone mappings defined in
+ * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source
+ * historical to the target historical.
+ */
+public class CloneHistoricals implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(CloneHistoricals.class);
+  private final SegmentLoadQueueManager loadQueueManager;
+
+  public CloneHistoricals(SegmentLoadQueueManager loadQueueManager)
+  {
+    this.loadQueueManager = loadQueueManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+
+    if (cloneServers.isEmpty()) {
+      // No servers to be cloned.
+      return params;
+    }
+
+    // Create a map of host to historical.
+    final Map<String, ServerHolder> historicalMap = params.getDruidCluster()
+                                                          .getHistoricals()
+                                                          .values()
+                                                          .stream()
+                                                          .flatMap(Collection::stream)
+                                                          .collect(Collectors.toMap(
+                                                              serverHolder -> serverHolder.getServer().getHost(),
+                                                              serverHolder -> serverHolder
+                                                          ));
+
+    for (Map.Entry<String, String> entry : cloneServers.entrySet()) {
+      final String targetHistoricalName = entry.getKey();
+      final ServerHolder targetServer = historicalMap.get(targetHistoricalName);
+
+      final String sourceHistoricalName = entry.getValue();
+      final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName);
+
+      if (sourceServer == null || targetServer == null) {
+        log.error(
+            "Could not process clone mapping[%s] as historical[%s] does not exist.",
+            entry,
+            (sourceServer == null ? sourceHistoricalName : targetHistoricalName)
+        );
+        continue;
+      }
+
+      final Set<DataSegment> sourceProjectedSegments = sourceServer.getProjectedSegments();
+      final Set<DataSegment> targetProjectedSegments = targetServer.getProjectedSegments();
+
+      // Load any segments missing in the clone target.
+      for (DataSegment segment : sourceProjectedSegments) {
+        if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
+          stats.add(
+              Stats.Segments.ASSIGNED_TO_CLONE,
+              RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
+              1L
+          );
+        }
+      }
+
+      // Drop any segments missing from the clone source.
+ for (DataSegment segment : targetProjectedSegments) { + if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) { + stats.add( + Stats.Segments.DROPPED_FROM_CLONE, + RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()), + 1L + ); + } + } + } + + return params; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index 596f9df7017..dba11d2c290 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -83,7 +83,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty final DruidCluster cluster = params.getDruidCluster(); final Map<String, SegmentTimeline> timelines = new HashMap<>(); - cluster.getHistoricals().values().forEach( + cluster.getManagedHistoricals().values().forEach( historicals -> historicals.forEach( historical -> addSegmentsFromServer(historical, timelines) ) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java index a9e926ea4f7..8e99d15c07e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -119,7 +120,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty { final AtomicInteger cancelledCount = new AtomicInteger(0); final List<ServerHolder> decommissioningServers - = cluster.getAllServers().stream() + = cluster.getAllManagedServers().stream() .filter(ServerHolder::isDecommissioning) .collect(Collectors.toList()); @@ -152,6 +153,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty ) { final Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes(); + final Set<String> unmanagedServers = new HashSet<>(dynamicConfig.getCloneServers().keySet()); final DruidCluster.Builder cluster = DruidCluster.builder(); for (ImmutableDruidServer server : currentServers) { cluster.add( @@ -159,6 +161,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty server, taskMaster.getPeonForServer(server), decommissioningServers.contains(server.getHost()), + unmanagedServers.contains(server.getHost()), segmentLoadingConfig.getMaxSegmentsInLoadQueue(), segmentLoadingConfig.getMaxLifetimeInLoadQueue() ) @@ -173,7 +176,16 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty RowKey rowKey = RowKey.of(Dimension.TIER, tier); stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size()); - long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum(); + long totalCapacity = 0; + long cloneCount = 0; + for (ServerHolder holder : historicals) { + if (holder.isUnmanaged()) { + cloneCount += 1; + } else { + totalCapacity += holder.getMaxSize(); + } + } + 
stats.add(Stats.Tier.CLONE_COUNT, rowKey, cloneCount); stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity); }); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 2bd9fd29548..761e3383ede 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -62,7 +62,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty broadcastStatusByDatasource.put(broadcastDatasource, true); } - final List<ServerHolder> allServers = params.getDruidCluster().getAllServers(); + final List<ServerHolder> allServers = params.getDruidCluster().getAllManagedServers(); int numCancelledLoads = allServers.stream().mapToInt( server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params) ).sum(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 38eeb42413e..e4fa849beb1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -150,8 +150,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon this.serverCapabilities = fetchSegmentLoadingCapabilities(); } - @VisibleForTesting - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() + private SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() { try { final URL segmentLoadingCapabilitiesURL = new URL( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java index 1f9307cae56..cc007449b57 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java @@ -51,7 +51,7 @@ public class RoundRobinServerSelector public RoundRobinServerSelector(DruidCluster cluster) { - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers)) ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java index 3d3e34072fb..241759f8b95 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java @@ -46,7 +46,7 @@ public class SegmentReplicaCountMap private void initReplicaCounts(DruidCluster cluster) { - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> historicals.forEach( serverHolder -> { // Add segments already loaded on this server diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index a5e38eee7d4..654fe42b220 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -88,7 +88,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment(); this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster) : null; - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> tierToHistoricalCount.put(tier, historicals.size()) ); } @@ -275,7 +275,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler } final SegmentStatusInTier segmentStatus = - new SegmentStatusInTier(segment, cluster.getHistoricalsByTier(tier)); + new SegmentStatusInTier(segment, cluster.getManagedHistoricalsByTier(tier)); // Cancel all moves in this tier if it does not need to have replicas if (shouldCancelMoves) { @@ -326,7 +326,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler public void broadcastSegment(DataSegment segment) { final Object2IntOpenHashMap<String> tierToRequiredReplicas = new Object2IntOpenHashMap<>(); - for (ServerHolder server : cluster.getAllServers()) { + for (ServerHolder server : cluster.getAllManagedServers()) { // Ignore servers which are not broadcast targets if (!server.getServer().getType().isSegmentBroadcastTarget()) { continue; @@ -577,7 +577,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler { final Map<String, Integer> tierToLoadingReplicaCount = new HashMap<>(); - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> { int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum(); tierToLoadingReplicaCount.put(tier, numLoadingReplicas); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 0bc3b609bd3..0571245c985 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -65,6 +65,12 @@ public class Stats // Values computed in a run public static final CoordinatorStat REPLICATION_THROTTLE_LIMIT = CoordinatorStat.toDebugOnly("replicationThrottleLimit"); + + // Cloned segments in a run + public static final CoordinatorStat ASSIGNED_TO_CLONE + = CoordinatorStat.toDebugAndEmit("cloneLoad", "segment/clone/assigned/count"); + public static final CoordinatorStat DROPPED_FROM_CLONE + = CoordinatorStat.toDebugAndEmit("cloneDrop", "segment/clone/dropped/count"); } public static class SegmentQueue @@ -98,6 +104,8 @@ public class Stats = CoordinatorStat.toDebugAndEmit("maxRepFactor", "tier/replication/factor"); public static final CoordinatorStat HISTORICAL_COUNT = CoordinatorStat.toDebugAndEmit("numHistorical", "tier/historical/count"); + public static final CoordinatorStat CLONE_COUNT + = CoordinatorStat.toDebugAndEmit("numClones", "tier/historical/clone/count"); } public static class Compaction diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java index 17a4de1d73f..d47cf2fb741 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java +++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java @@ -98,7 +98,7 @@ public class DruidClusterTest } @Test - public void testGetAllServers() + public void testGetAllManagedServers() { clusterBuilder.add(NEW_REALTIME); clusterBuilder.add(NEW_HISTORICAL); @@ -107,7 +107,7 @@ public class DruidClusterTest final Set<ServerHolder> expectedRealtimes = cluster.getRealtimes(); final Map<String, NavigableSet<ServerHolder>> expectedHistoricals = cluster.getHistoricals(); - final Collection<ServerHolder> allServers = cluster.getAllServers(); + final Collection<ServerHolder> allServers = cluster.getAllManagedServers(); Assert.assertEquals(4, allServers.size()); Assert.assertTrue(allServers.containsAll(cluster.getRealtimes())); Assert.assertTrue( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index fba98bff3ae..cb4a8f1d7f3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -73,10 +73,12 @@ public class HttpLoadQueuePeonTest private TestHttpClient httpClient; private HttpLoadQueuePeon httpLoadQueuePeon; + private SegmentLoadingCapabilities segmentLoadingCapabilities; @Before public void setUp() { + segmentLoadingCapabilities = new SegmentLoadingCapabilities(1, 3); httpClient = new TestHttpClient(); httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", @@ -90,14 +92,7 @@ public class HttpLoadQueuePeonTest true ), httpClient.callbackExecutor - ) - { - @Override - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() - { - return new SegmentLoadingCapabilities(1, 3); - } - }; + ); httpLoadQueuePeon.start(); } @@ -344,14 +339,7 @@ public class HttpLoadQueuePeonTest true ), httpClient.callbackExecutor - ) - { - @Override - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() - { - return new SegmentLoadingCapabilities(1, 3); - } - }; + ); Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL)); Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO)); @@ -362,7 +350,7 @@ public class HttpLoadQueuePeonTest return success -> httpClient.processedSegments.add(segment); } - private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler + private class TestHttpClient implements HttpClient, DataSegmentChangeHandler { final BlockingExecutorService processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s"); final BlockingExecutorService callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb"); @@ -379,6 +367,7 @@ public class HttpLoadQueuePeonTest } @Override + @SuppressWarnings("unchecked") public <Intermediate, Final> ListenableFuture<Final> go( Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, @@ -388,7 +377,17 @@ public class HttpLoadQueuePeonTest HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); httpResponse.setContent(ChannelBuffers.buffer(0)); httpResponseHandler.handleResponse(httpResponse, null); + try { + if (request.getUrl().toString().contains("/loadCapabilities")) { + return (ListenableFuture<Final>) Futures.immediateFuture( + new ByteArrayInputStream( + MAPPER.writerFor(SegmentLoadingCapabilities.class) + .writeValueAsBytes(segmentLoadingCapabilities) + ) 
+ ); + } + List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue( request.getContent().array(), HttpLoadQueuePeon.REQUEST_ENTITY_TYPE_REF diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java new file mode 100644 index 00000000000..60be0e1496d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.stats.Stats; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class HistoricalCloningTest extends CoordinatorSimulationBaseTest +{ + private static final long SIZE_1TB = 1_000_000; + + private DruidServer historicalT11; + private DruidServer historicalT12; + private DruidServer historicalT13; + + private final String datasource = TestDataSource.WIKI; + + @Override + public void setUp() + { + // Setup historicals for 1 tier, size 1 TB each + historicalT11 = createHistorical(1, Tier.T1, SIZE_1TB); + historicalT12 = createHistorical(2, Tier.T1, SIZE_1TB); + historicalT13 = createHistorical(3, Tier.T1, SIZE_1TB); + } + + @Test + public void testSimpleCloning() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X1D) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(true) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + + startSimulation(sim); + runCoordinatorCycle(); + + verifyValue(Metric.ASSIGNED_COUNT, 10L); + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 10L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT11.getName(), "description", "LOAD: NORMAL"), + 10L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"), + 10L + ); + + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments()); + Segments.WIKI_10X1D.forEach(segment -> { + Assert.assertEquals(segment, historicalT11.getSegment(segment.getId())); + 
Assert.assertEquals(segment, historicalT12.getSegment(segment.getId()));
+    });
+  }
+
+  @Test
+  public void testAddingNewHistorical()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    // Run 1: Start with a historical and its clone already in sync.
+    Segments.WIKI_10X1D.forEach(segment -> {
+      historicalT11.addDataSegment(segment);
+      historicalT12.addDataSegment(segment);
+    });
+
+    startSimulation(sim);
+
+    runCoordinatorCycle();
+
+    // Confirm number of segments.
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+
+    // Add a new historical.
+    final DruidServer newHistorical = createHistorical(3, Tier.T1, 10_000);
+    addServer(newHistorical);
+
+    // Run 2: Let the coordinator balance segments.
+    runCoordinatorCycle();
+
+    // Check that segments have been distributed to the new historical and have also been dropped by the clone.
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    Assert.assertEquals(5, newHistorical.getTotalSegments());
+    verifyValue(
+        Stats.Segments.DROPPED_FROM_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        5L
+    );
+  }
+
+  @Test
+  public void testCloningServerDisappearsAndRelaunched()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    startSimulation(sim);
+
+    // Run 1: All segments are loaded.
+    runCoordinatorCycle();
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+
+    // Target server disappears, loses loaded segments.
+    removeServer(historicalT12);
+    Segments.WIKI_10X1D.forEach(segment -> historicalT12.removeDataSegment(segment.getId()));
+
+    // Run 2: No change in source historical.
+    runCoordinatorCycle();
+
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(0, historicalT12.getTotalSegments());
+
+    // Server re-added.
+    addServer(historicalT12);
+
+    // Run 3: Segments recloned.
+    runCoordinatorCycle();
+
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        10L
+    );
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"),
+        10L
+    );
+
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments());
+    Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments());
+    Segments.WIKI_10X1D.forEach(segment -> {
+      Assert.assertEquals(segment, historicalT11.getSegment(segment.getId()));
+      Assert.assertEquals(segment, historicalT12.getSegment(segment.getId()));
+    });
+  }
+
+  @Test
+  public void testClonedServerDoesNotFollowReplicationLimit()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X100D)
+                             .withServers(historicalT11)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .withReplicationThrottleLimit(2)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    Segments.WIKI_10X100D.forEach(segment -> historicalT11.addDataSegment(segment));
+    startSimulation(sim);
+
+    // Run 1: All segments are loaded on the source historical.
+    runCoordinatorCycle();
+    Assert.assertEquals(1000, historicalT11.getTotalSegments());
+    Assert.assertEquals(0, historicalT12.getTotalSegments());
+
+    // Clone server now added.
+    addServer(historicalT12);
+
+    // Run 2: Assigns all segments to the cloned historical.
+    runCoordinatorCycle();
+
+    Assert.assertEquals(1000, historicalT11.getTotalSegments());
+    Assert.assertEquals(1000, historicalT12.getTotalSegments());
+
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        1000L
+    );
+
+    verifyValue(
+        Metric.SUCCESS_ACTIONS,
+        Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"),
+        1000L
+    );
+  }
+
+  @Test
+  public void testCloningHistoricalWithReplicationLimit()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12, historicalT13)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(false)
+                                                         .withReplicationThrottleLimit(2)
+                                                         .withMaxSegmentsToMove(0)
+                                                         .build()
+                             )
+                             .withImmediateSegmentLoading(true)
+                             .build();
+    Segments.WIKI_10X1D.forEach(historicalT13::addDataSegment);
+    startSimulation(sim);
+
+    // Check that only replication count segments are loaded each run and that the cloning server copies them.
+    while (historicalT11.getTotalSegments() < Segments.WIKI_10X1D.size()) {
+      runCoordinatorCycle();
+
+      // Check that all segments are cloned.
+      Assert.assertEquals(historicalT11.getTotalSegments(), historicalT12.getTotalSegments());
+
+      // Check that the replication throttling is respected.
+ verifyValue(Metric.ASSIGNED_COUNT, 2L); + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 2L + ); + } + + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT13.getTotalSegments()); + Segments.WIKI_10X1D.forEach(segment -> { + Assert.assertEquals(segment, historicalT11.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT12.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT13.getSegment(segment.getId())); + }); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 1be987ddc27..98324792ef7 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -244,7 +244,8 @@ public class CoordinatorDynamicConfigTest false, false, null, - ImmutableSet.of("host1") + ImmutableSet.of("host1"), + null ); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); } @@ -269,7 +270,8 @@ public class CoordinatorDynamicConfigTest false, false, null, - ImmutableSet.of("host1") + ImmutableSet.of("host1"), + null ); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org