This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e51181957c Use num cores to determine balancerComputeThreads (#14902)
e51181957c is described below

commit e51181957c596aac55c63c20e5966eba9c86d2ff
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Aug 25 08:15:27 2023 +0530

    Use num cores to determine balancerComputeThreads (#14902)
    
    Changes:
    - Determine the default value of balancerComputeThreads based on number of
    coordinator cpus rather than number of segments. Even if the number of 
segments
    is low and we create more balancer threads, it doesn't hurt the system as 
threads
    would mostly be idle.
    - Remove unused field from SegmentLoadQueueManager
    
    Expected values:
    - Clusters with ~1M segments typically work with Coordinators having 16 
cores or more.
    This would give us 8 balancer threads, which is the same as the current 
maximum.
    - On small clusters, even a single thread is enough to do the required 
balancing work.
---
 docs/configuration/index.md                        | 43 +++++++++---------
 .../coordinator/CoordinatorDynamicConfig.java      | 14 ++++--
 .../loading/SegmentLoadQueueManager.java           | 12 -----
 .../coordinator/loading/SegmentLoadingConfig.java  | 36 ++-------------
 .../coordinator/BalanceSegmentsProfiler.java       |  2 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |  9 +---
 .../server/coordinator/DruidCoordinatorTest.java   |  4 +-
 .../balancer/SegmentToMoveCalculatorTest.java      | 25 -----------
 .../coordinator/duty/BalanceSegmentsTest.java      |  2 +-
 .../duty/CollectSegmentAndServerStatsTest.java     |  2 +-
 .../duty/MarkOvershadowedSegmentsAsUnusedTest.java |  2 +-
 .../server/coordinator/duty/RunRulesTest.java      | 18 ++------
 .../coordinator/duty/UnloadUnusedSegmentsTest.java |  2 +-
 .../loading/SegmentLoadingConfigTest.java          | 51 ----------------------
 .../rules/BroadcastDistributionRuleTest.java       |  2 +-
 .../server/coordinator/rules/LoadRuleTest.java     |  2 +-
 .../simulate/CoordinatorSimulationBuilder.java     |  2 +-
 .../server/http/CoordinatorDynamicConfigTest.java  | 11 +++--
 18 files changed, 60 insertions(+), 179 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 36023ecf27..b16e676913 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -949,27 +949,27 @@ A sample Coordinator dynamic config JSON object is shown 
below:
 
 Issuing a GET request at the same URL will return the spec that is currently 
in place. A description of the config setup spec is shown below.
 
-|Property| Description                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|--------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-|`millisToWaitBeforeDeleting`| How long does the Coordinator need to be a 
leader before it can start marking overshadowed segments as unused in metadata 
storage.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
-|`mergeBytesLimit`| The maximum total uncompressed size in bytes of segments 
to merge.                                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-|`mergeSegmentsLimit`| The maximum number of segments that can be in a single 
[append task](../ingestion/tasks.md).                                           
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
-|`smartSegmentLoading`| Enables ["smart" segment loading 
mode](#smart-segment-loading) which dynamically computes the optimal values of 
several properties that maximize Coordinator performance.                       
                                                                                
                                                                                
                                                                                
                                     [...]
-|`maxSegmentsToMove`| The maximum number of segments that can be moved at any 
given time.                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
-|`replicantLifetime`| The maximum number of Coordinator runs for which a 
segment can wait in the load queue of a Historical before Druid raises an 
alert.                                                                          
                                                                                
                                                                                
                                                                                
                          [...]
-|`replicationThrottleLimit`| The maximum number of segment replicas that can 
be assigned to a historical tier in a single Coordinator run. This property 
prevents historicals from becoming overwhelmed when loading extra replicas of 
segments that are already available in the cluster.                             
                                                                                
                                                                                
                      [...]
-|`balancerComputeThreads`| Thread pool size for computing moving cost of 
segments during segment balancing. Consider increasing this if you have a lot 
of segments and moving segments begins to stall.                                
                                                                                
                                                                                
                                                                                
                      [...]
-|`killDataSourceWhitelist`| List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.                              
                                                                                
                                                                                
                                                                                
              [...]
-|`killTaskSlotRatio`| Ratio of total available task slots, including 
autoscaling if applicable that will be allowed for kill tasks. This limit only 
applies for kill tasks that are spawned automatically by the coordinator's auto 
kill duty, which is enabled when `druid.coordinator.kill.on` is true.           
                                                                                
                                                                                
                         [...]
-|`maxKillTaskSlots`| Maximum number of tasks that will be allowed for kill 
tasks. This limit only applies for kill tasks that are spawned automatically by 
the coordinator's auto kill duty, which is enabled when 
`druid.coordinator.kill.on` is true.                                            
                                                                                
                                                                                
                                          [...]
-|`killPendingSegmentsSkipList`| List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.       
                                                                                
                                                                                
                                                                                
              [...]
-|`maxSegmentsInNodeLoadingQueue`| The maximum number of segments allowed in 
the load queue of any given server. Use this parameter to load segments faster 
if, for example, the cluster contains slow-loading nodes or if there are too 
many segments to be replicated to a particular node (when faster loading is 
preferred to better segments distribution). The optimal value depends on the 
loading speed of segments, acceptable replication time and number of nodes.     
                            [...]
-|`useRoundRobinSegmentAssignment`| Boolean flag for whether segments should be 
assigned to historicals in a round robin fashion. When disabled, segment 
assignment is done using the chosen balancer strategy. When enabled, this can 
speed up segment assignments leaving balancing to move the segments to their 
optimal locations (based on the balancer strategy) lazily.                      
                                                                                
                          [...]
-|`decommissioningNodes`| List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.                                 
                                                                                
                                                                       [...]
-|`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the 
Coordinator can move from decommissioning servers to active non-decommissioning 
servers during a single run. This value is relative to the total maximum number 
of segments that can be moved at any given time based upon the value of 
`maxSegmentsToMove`.<br /><br />If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not 
move segments to decommissioning servers, effectively putting them in  [...]
-|`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, 
Submitting kill tasks for unused segments (if enabled), Logging of used 
segments in the cluster, Marking of n [...]
-|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow historicals 
in the cluster. However, the slow historical may still load the segment later 
and the coordinator may iss [...]
-|`maxNonPrimaryReplicantsToLoad`| The maximum number of replicas that can be 
assigned across all tiers in a single Coordinator run. This parameter serves 
the same purpose as `replicationThrottleLimit` except this limit applies at the 
cluster-level instead of per tier. The default value does not apply a limit to 
the number of replicas assigned per coordination cycle. If you want to use a 
non-default value for this property, you may want to start with `~20%` of the 
number of segments found [...]
+|Property|Description|Default|
+|--------|-----------|-------|
+|`millisToWaitBeforeDeleting`|How long does the Coordinator need to be a 
leader before it can start marking overshadowed segments as unused in metadata 
storage.| 900000 (15 mins)|
+|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to 
merge.|524288000L|
+|`mergeSegmentsLimit`|The maximum number of segments that can be in a single 
[append task](../ingestion/tasks.md).|100|
+|`smartSegmentLoading`|Enables ["smart" segment loading 
mode](#smart-segment-loading) which dynamically computes the optimal values of 
several properties that maximize Coordinator performance.|true|
+|`maxSegmentsToMove`|The maximum number of segments that can be moved in a 
Historical tier at any given time.|100|
+|`replicantLifetime`|The maximum number of Coordinator runs for which a 
segment can wait in the load queue of a Historical before Druid raises an 
alert.|15|
+|`replicationThrottleLimit`|The maximum number of segment replicas that can be 
assigned to a historical tier in a single Coordinator run. This property 
prevents historicals from becoming overwhelmed when loading extra replicas of 
segments that are already available in the cluster.|500|
+|`balancerComputeThreads`|Thread pool size for computing moving cost of 
segments during segment balancing. Consider increasing this if you have a lot 
of segments and moving segments begins to stall.|`num_cores` / 2|
+|`killDataSourceWhitelist`|List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.|none|
+|`killTaskSlotRatio`|Ratio of total available task slots, including 
autoscaling if applicable that will be allowed for kill tasks. This limit only 
applies for kill tasks that are spawned automatically by the coordinator's auto 
kill duty, which is enabled when `druid.coordinator.kill.on` is true.| 1 - all 
task slots can be used|
+|`maxKillTaskSlots`|Maximum number of tasks that will be allowed for kill 
tasks. This limit only applies for kill tasks that are spawned automatically by 
the coordinator's auto kill duty, which is enabled when 
`druid.coordinator.kill.on` is true.|`Integer.MAX_VALUE` - no limit|
+|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.|none|
+|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the 
load queue of any given server. Use this parameter to load segments faster if, 
for example, the cluster contains slow-loading nodes or if there are too many 
segments to be replicated to a particular node (when faster loading is 
preferred to better segments distribution). The optimal value depends on the 
loading speed of segments, acceptable replication time and number of nodes.|500|
+|`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be 
assigned to historicals in a round robin fashion. When disabled, segment 
assignment is done using the chosen balancer strategy. When enabled, this can 
speed up segment assignments leaving balancing to move the segments to their 
optimal locations (based on the balancer strategy) lazily.|true|
+|`decommissioningNodes`|List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
+|`decommissioningMaxPercentOfMaxSegmentsToMove`|Upper limit of segments the 
Coordinator can move from decommissioning servers to active non-decommissioning 
servers during a single run. This value is relative to the total maximum number 
of segments that can be moved at any given time based upon the value of 
`maxSegmentsToMove`.<br /><br />If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not 
move segments to decommissioning servers, effectively putting them in a [...]
+|`pauseCoordination`|Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, 
Submitting kill tasks for unused segments (if enabled), Logging of used 
segments in the cluster, Marking of ne [...]
+|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow historicals 
in the cluster. However, the slow historical may still load the segment later 
and the coordinator may issu [...]
+|`maxNonPrimaryReplicantsToLoad`|The maximum number of replicas that can be 
assigned across all tiers in a single Coordinator run. This parameter serves 
the same purpose as `replicationThrottleLimit` except this limit applies at the 
cluster-level instead of per tier. The default value does not apply a limit to 
the number of replicas assigned per coordination cycle. If you want to use a 
non-default value for this property, you may want to start with `~20%` of the 
number of segments found  [...]
 
 ##### Smart segment loading
 
@@ -989,6 +989,7 @@ Druid computes the values to optimize Coordinator 
performance, based on the curr
 |`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This 
throttling is already handled by `replicationThrottleLimit`.|
 |`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value 
1000|Ensures that some segments are always moving in the cluster to keep it 
well balanced. The maximum value keeps the Coordinator run times bounded.|
 |`decommissioningMaxPercentOfMaxSegmentsToMove`|100|Prioritizes the move of 
segments from decommissioning servers so that they can be terminated quickly.|
+|`balancerComputeThreads`|`num_cores` / 2|Ensures that there are enough 
threads to perform balancing computations without hogging all Coordinator 
resources.|
 
 When `smartSegmentLoading` is disabled, Druid uses the configured values of 
these properties.
 Disable `smartSegmentLoading` only if you want to explicitly set the values of 
any of the above properties.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 8135974368..c9811fa318 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -516,6 +517,15 @@ public class CoordinatorDynamicConfig
     return new Builder();
   }
 
+  /**
+   * Returns a value of {@code (num processors / 2)} to ensure that balancing
+   * computations do not hog all Coordinator resources.
+   */
+  public static int getDefaultBalancerComputeThreads()
+  {
+    return Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2);
+  }
+
   private static class Defaults
   {
     static final long LEADING_MILLIS_BEFORE_MARK_UNUSED = 
TimeUnit.MINUTES.toMillis(15);
@@ -524,8 +534,6 @@ public class CoordinatorDynamicConfig
     static final int MAX_SEGMENTS_TO_MOVE = 100;
     static final int REPLICANT_LIFETIME = 15;
     static final int REPLICATION_THROTTLE_LIMIT = 500;
-    static final int BALANCER_COMPUTE_THREADS = 1;
-    static final boolean EMIT_BALANCING_STATS = false;
     static final int MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 500;
     static final int DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     static final boolean PAUSE_COORDINATION = false;
@@ -746,7 +754,7 @@ public class CoordinatorDynamicConfig
           valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
           valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
           valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
-          valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
+          valueOrDefault(balancerComputeThreads, 
getDefaultBalancerComputeThreads()),
           specificDataSourcesToKillUnusedSegmentsIn,
           valueOrDefault(killTaskSlotRatio, Defaults.KILL_TASK_SLOT_RATIO),
           valueOrDefault(maxKillTaskSlots, Defaults.MAX_KILL_TASK_SLOTS),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadQueueManager.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadQueueManager.java
index dcaf87fb53..26226a7c0b 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadQueueManager.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadQueueManager.java
@@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.loading;
 import com.google.inject.Inject;
 import org.apache.druid.client.ServerInventoryView;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 
@@ -36,17 +35,14 @@ public class SegmentLoadQueueManager
 
   private final LoadQueueTaskMaster taskMaster;
   private final ServerInventoryView serverInventoryView;
-  private final SegmentsMetadataManager segmentsMetadataManager;
 
   @Inject
   public SegmentLoadQueueManager(
       ServerInventoryView serverInventoryView,
-      SegmentsMetadataManager segmentsMetadataManager,
       LoadQueueTaskMaster taskMaster
   )
   {
     this.serverInventoryView = serverInventoryView;
-    this.segmentsMetadataManager = segmentsMetadataManager;
     this.taskMaster = taskMaster;
   }
 
@@ -148,12 +144,4 @@ public class SegmentLoadQueueManager
     return true;
   }
 
-  /**
-   * Marks the given segment as unused.
-   */
-  public boolean deleteSegment(DataSegment segment)
-  {
-    return segmentsMetadataManager.markSegmentAsUnused(segment.getId());
-  }
-
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index f08b7ed5ca..25159cc2eb 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -50,12 +50,9 @@ public class SegmentLoadingConfig
       // Compute replicationThrottleLimit with a lower bound of 100
       final int throttlePercentage = 2;
       final int replicationThrottleLimit = Math.max(100, numUsedSegments * 
throttlePercentage / 100);
-      final int balancerComputeThreads = 
computeNumBalancerThreads(numUsedSegments);
-
       log.info(
-          "Smart segment loading is enabled. Calculated 
balancerComputeThreads[%d]"
-          + " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
-          balancerComputeThreads, replicationThrottleLimit, 
throttlePercentage, numUsedSegments
+          "Smart segment loading is enabled. Calculated 
replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
+          replicationThrottleLimit, throttlePercentage, numUsedSegments
       );
 
       return new SegmentLoadingConfig(
@@ -64,7 +61,7 @@ public class SegmentLoadingConfig
           Integer.MAX_VALUE,
           60,
           true,
-          balancerComputeThreads
+          CoordinatorDynamicConfig.getDefaultBalancerComputeThreads()
       );
     } else {
       // Use the configured values
@@ -125,31 +122,4 @@ public class SegmentLoadingConfig
   {
     return balancerComputeThreads;
   }
-
-  /**
-   * Computes the number of threads to be used in the balancing executor.
-   * The number of used segments in a cluster is generally a good indicator of
-   * the cluster size and has been used here as a proxy for the actual number 
of
-   * segments that would be involved in cost computations.
-   * <p>
-   * The number of threads increases by 1 first for every 50k segments, then 
for
-   * every 75k segments and so on.
-   *
-   * @return Number of {@code balancerComputeThreads} in the range [1, 8].
-   */
-  public static int computeNumBalancerThreads(int numUsedSegments)
-  {
-    // Add an extra thread when numUsedSegments increases by a step
-    final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150};
-
-    int remainder = numUsedSegments / 1000;
-    for (int step = 0; step < stepValues.length; ++step) {
-      remainder -= stepValues[step];
-      if (remainder < 0) {
-        return step + 1;
-      }
-    }
-
-    return stepValues.length;
-  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index 63a0944ed7..ba69b1b1ee 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -67,7 +67,7 @@ public class BalanceSegmentsProfiler
   @Before
   public void setUp()
   {
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+    loadQueueManager = new SegmentLoadQueueManager(null, null);
     druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
     druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
     emitter = EasyMock.createMock(ServiceEmitter.class);
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index b9b6fc8516..e8f9f1e9fe 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -39,7 +39,6 @@ import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
@@ -81,7 +80,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class CuratorDruidCoordinatorTest extends CuratorTestBase
 {
-  private SegmentsMetadataManager segmentsMetadataManager;
   private DataSourcesSnapshot dataSourcesSnapshot;
   private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
 
@@ -121,7 +119,6 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
   @Before
   public void setUp() throws Exception
   {
-    segmentsMetadataManager = 
EasyMock.createNiceMock(SegmentsMetadataManager.class);
     dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     coordinatorRuntimeParams = 
EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
 
@@ -294,8 +291,6 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
     ImmutableDruidDataSource druidDataSource = 
EasyMock.createNiceMock(ImmutableDruidDataSource.class);
     
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(sourceSegments.get(2));
     EasyMock.replay(druidDataSource);
-    
EasyMock.expect(segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(EasyMock.anyString()))
-            .andReturn(druidDataSource);
     EasyMock.expect(coordinatorRuntimeParams.getDataSourcesSnapshot())
             .andReturn(dataSourcesSnapshot).anyTimes();
     final CoordinatorDynamicConfig dynamicConfig =
@@ -322,7 +317,7 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
     EasyMock.expect(coordinatorRuntimeParams.getBalancerStrategy())
             .andReturn(balancerStrategy).anyTimes();
     
EasyMock.expect(coordinatorRuntimeParams.getDruidCluster()).andReturn(cluster).anyTimes();
-    EasyMock.replay(segmentsMetadataManager, coordinatorRuntimeParams, 
balancerStrategy);
+    EasyMock.replay(coordinatorRuntimeParams, balancerStrategy);
 
     EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString()))
             .andReturn(druidDataSource).anyTimes();
@@ -334,7 +329,7 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
 
     // Move the segment from source to dest
     SegmentLoadQueueManager loadQueueManager =
-        new SegmentLoadQueueManager(baseView, segmentsMetadataManager, 
taskMaster);
+        new SegmentLoadQueueManager(baseView, taskMaster);
     StrategicSegmentAssigner segmentAssigner = 
createSegmentAssigner(loadQueueManager, coordinatorRuntimeParams);
     segmentAssigner.moveSegment(
         segmentToMove,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index d1e5b7c5b2..380650f27a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -178,7 +178,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         scheduledExecutorFactory,
         null,
         loadQueueTaskMaster,
-        new SegmentLoadQueueManager(serverInventoryView, 
segmentsMetadataManager, loadQueueTaskMaster),
+        new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
         new LatchableServiceAnnouncer(leaderAnnouncerLatch, 
leaderUnannouncerLatch),
         druidNode,
         new HashSet<>(),
@@ -789,7 +789,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         scheduledExecutorFactory,
         null,
         loadQueueTaskMaster,
-        new SegmentLoadQueueManager(serverInventoryView, 
segmentsMetadataManager, loadQueueTaskMaster),
+        new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
         new LatchableServiceAnnouncer(leaderAnnouncerLatch, 
leaderUnannouncerLatch),
         druidNode,
         new HashSet<>(),
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
index 58c1cde409..56fb316aeb 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
@@ -24,7 +24,6 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
 import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Duration;
@@ -112,21 +111,6 @@ public class SegmentToMoveCalculatorTest
     Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
   }
 
-  @Test
-  public void testMaxSegmentsToMoveWithComputedNumThreads()
-  {
-    Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
-    Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
-
-    Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
-    Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
-    Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
-
-    Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
-    Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
-    Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
-  }
-
   @Test
   public void testMinSegmentsToMove()
   {
@@ -237,15 +221,6 @@ public class SegmentToMoveCalculatorTest
     return 
SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1, 
coordinatorPeriod);
   }
 
-  private static int computeNumThreadsAndMaxToMove(int totalSegments)
-  {
-    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
-        totalSegments,
-        SegmentLoadingConfig.computeNumBalancerThreads(totalSegments),
-        DEFAULT_COORDINATOR_PERIOD
-    );
-  }
-
   private static int computeMinSegmentsToMove(int totalSegmentsInTier)
   {
     return 
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index 1f51565bcd..db2bcda006 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -72,7 +72,7 @@ public class BalanceSegmentsTest
   @Before
   public void setUp()
   {
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+    loadQueueManager = new SegmentLoadQueueManager(null, null);
 
     // Create test segments for multiple datasources
     final DateTime start1 = DateTimes.of("2012-01-01");
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
index 9921281fde..f1e46f70b4 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
@@ -50,7 +50,7 @@ public class CollectSegmentAndServerStatsTest
                                      .withDruidCluster(DruidCluster.EMPTY)
                                      .withUsedSegmentsInTest()
                                      .withBalancerStrategy(new 
RandomBalancerStrategy())
-                                     .withSegmentAssignerUsing(new 
SegmentLoadQueueManager(null, null, null))
+                                     .withSegmentAssignerUsing(new 
SegmentLoadQueueManager(null, null))
                                      .build();
 
     Mockito.when(mockTaskMaster.getAllPeons())
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
index acbf89e322..d7cbf7773c 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
@@ -99,7 +99,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
             
CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build()
         )
         .withBalancerStrategy(new RandomBalancerStrategy())
-        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, 
null))
+        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
         .build();
 
     SegmentTimeline timeline = 
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
index 37292aefe8..32cd1f66ef 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
@@ -33,7 +33,6 @@ import org.apache.druid.java.util.emitter.core.EventMap;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -83,7 +82,6 @@ public class RunRulesTest
   private RunRules ruleRunner;
   private StubServiceEmitter emitter;
   private MetadataRuleManager databaseRuleManager;
-  private SegmentsMetadataManager segmentsMetadataManager;
   private SegmentLoadQueueManager loadQueueManager;
   private final List<DataSegment> usedSegments =
       CreateDataSegments.ofDatasource(DATASOURCE)
@@ -101,9 +99,8 @@ public class RunRulesTest
     emitter = new StubServiceEmitter("coordinator", "host");
     EmittingLogger.registerEmitter(emitter);
     databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
-    segmentsMetadataManager = 
EasyMock.createNiceMock(SegmentsMetadataManager.class);
     ruleRunner = new RunRules(Set::size);
-    loadQueueManager = new SegmentLoadQueueManager(null, 
segmentsMetadataManager, null);
+    loadQueueManager = new SegmentLoadQueueManager(null, null);
     balancerExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"RunRulesTest-%d"));
   }
 
@@ -535,10 +532,6 @@ public class RunRulesTest
     EasyMock.expectLastCall().atLeastOnce();
     mockEmptyPeon();
 
-    
EasyMock.expect(segmentsMetadataManager.markSegmentAsUnused(EasyMock.anyObject()))
-            .andReturn(true).anyTimes();
-    EasyMock.replay(segmentsMetadataManager);
-
     
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
         Lists.newArrayList(
             new IntervalLoadRule(
@@ -587,7 +580,7 @@ public class RunRulesTest
             new 
IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
         )
     ).atLeastOnce();
-    EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
+    EasyMock.replay(databaseRuleManager);
 
     DruidServer server1 = createHistorical("serverNorm", "normal");
     server1.addDataSegment(usedSegments.get(0));
@@ -644,7 +637,7 @@ public class RunRulesTest
             new 
IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
         )
     ).atLeastOnce();
-    EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
+    EasyMock.replay(databaseRuleManager);
 
     DruidServer server1 = createHistorical("server1", "hot");
     server1.addDataSegment(usedSegments.get(0));
@@ -688,7 +681,7 @@ public class RunRulesTest
             new 
IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
         )
     ).atLeastOnce();
-    EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
+    EasyMock.replay(databaseRuleManager);
 
     DruidServer server1 = createHistorical("server1", "hot");
     DruidServer server2 = createHistorical("serverNorm2", "normal");
@@ -856,9 +849,6 @@ public class RunRulesTest
   @Test
   public void testReplicantThrottleAcrossTiers()
   {
-    
EasyMock.expect(segmentsMetadataManager.markSegmentAsUnused(EasyMock.anyObject()))
-            .andReturn(true).anyTimes();
-    EasyMock.replay(segmentsMetadataManager);
     mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
     mockEmptyPeon();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
index ba39c5fc43..3199abfc0f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
@@ -84,7 +84,7 @@ public class UnloadUnusedSegmentsTest
     brokerServer = EasyMock.createMock(ImmutableDruidServer.class);
     indexerServer = EasyMock.createMock(ImmutableDruidServer.class);
     databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+    loadQueueManager = new SegmentLoadQueueManager(null, null);
 
     DateTime start1 = DateTimes.of("2012-01-01");
     DateTime start2 = DateTimes.of("2012-02-01");
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
deleted file mode 100644
index 947ac45ca7..0000000000
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SegmentLoadingConfigTest
-{
-
-  @Test
-  public void testComputeNumBalancerThreads()
-  {
-    Assert.assertEquals(1, computeBalancerThreads(0));
-    Assert.assertEquals(1, computeBalancerThreads(30_000));
-    Assert.assertEquals(2, computeBalancerThreads(50_000));
-    Assert.assertEquals(3, computeBalancerThreads(100_000));
-
-    Assert.assertEquals(4, computeBalancerThreads(175_000));
-    Assert.assertEquals(5, computeBalancerThreads(250_000));
-    Assert.assertEquals(6, computeBalancerThreads(350_000));
-    Assert.assertEquals(7, computeBalancerThreads(450_000));
-    Assert.assertEquals(8, computeBalancerThreads(600_000));
-
-    Assert.assertEquals(8, computeBalancerThreads(1_000_000));
-    Assert.assertEquals(8, computeBalancerThreads(10_000_000));
-  }
-
-  private int computeBalancerThreads(int numUsedSegments)
-  {
-    return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
-  }
-
-}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index b57cfc9291..a9f1485e06 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -238,7 +238,7 @@ public class BroadcastDistributionRuleTest
         .withDruidCluster(druidCluster)
         .withUsedSegmentsInTest(usedSegments)
         .withBalancerStrategy(new RandomBalancerStrategy())
-        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, 
null))
+        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
         .build();
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 7ef07719be..0f91d09693 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -91,7 +91,7 @@ public class LoadRuleTest
   {
     exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"LoadRuleTest-%d"));
     balancerStrategy = new CostBalancerStrategy(exec);
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+    loadQueueManager = new SegmentLoadQueueManager(null, null);
   }
 
   @After
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 776c2f836c..4556e20315 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -489,7 +489,7 @@ public class CoordinatorSimulationBuilder
           null
       );
       this.loadQueueManager =
-          new SegmentLoadQueueManager(coordinatorInventoryView, 
segmentManager, loadQueueTaskMaster);
+          new SegmentLoadQueueManager(coordinatorInventoryView, 
loadQueueTaskMaster);
 
       this.jacksonConfigManager = mockConfigManager();
       setDynamicConfig(dynamicConfig);
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index d7bac78e10..895d6b08b4 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.utils.JvmUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
-
 import java.util.Set;
 
 /**
@@ -631,7 +631,7 @@ public class CoordinatorDynamicConfigTest
         100,
         15,
         500,
-        1,
+        getDefaultNumBalancerThreads(),
         emptyList,
         1.0,
         Integer.MAX_VALUE,
@@ -661,7 +661,7 @@ public class CoordinatorDynamicConfigTest
         100,
         15,
         500,
-        1,
+        getDefaultNumBalancerThreads(),
         ImmutableSet.of("DATASOURCE"),
         1.0,
         Integer.MAX_VALUE,
@@ -792,4 +792,9 @@ public class CoordinatorDynamicConfigTest
     Assert.assertEquals(replicateAfterLoadTimeout, 
config.getReplicateAfterLoadTimeout());
     Assert.assertEquals(maxNonPrimaryReplicantsToLoad, 
config.getMaxNonPrimaryReplicantsToLoad());
   }
+
+  private static int getDefaultNumBalancerThreads()
+  {
+    return Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to