capistrant commented on a change in pull request #12013:
URL: https://github.com/apache/druid/pull/12013#discussion_r762099478



##########
File path: docs/configuration/index.md
##########
@@ -781,7 +781,10 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 
 |Property|Description|Default|
 |--------|-----------|-------|
+|`druid.coordinator.dutiesRunnableExecutor.threadPoolSize`|The number of threads used by the `ScheduledExecutorService` that executes `DutiesRunnable` objects|1|
 |`druid.coordinator.period`|The run period for the Coordinator. The Coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of "used" segments and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
+|`druid.coordinator.loadPrimaryReplicantSeparately`|Flag that indicates if the Coordinator should put primary replicant loading for used segments on its own event loop. This will require more memory and CPU to be used by the coordinator, but will ensure that the loading of primary replicas for used segments is not blocked by other coordinator jobs. The default of false is likely fine for most clusters. Enabling it could make sense if you regularly have to wait longer than you'd like for coordination cycles to complete before primary replicants are loaded for used segments that are not being served by the cluster.|false|

Review comment:
       Let's include a note that if you set this to `true`, you will also want to increase `druid.coordinator.dutiesRunnableExecutor.threadPoolSize` for it to be worthwhile; with the default pool size of 1, all duty groups still share a single thread. A more involved question: do we want to add a precondition at Coordinator startup that requires `threadPoolSize > 1` when this flag is `true`? That would prevent operators from accidentally misconfiguring their cluster.
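A minimal sketch of the startup precondition suggested above. The class and method names (`CoordinatorStartupChecks`, `validateDutiesExecutorConfig`) are hypothetical and not part of Druid; in the real Coordinator the two values would presumably come from the Coordinator config object rather than from method parameters.

```java
// Hypothetical startup check; names are illustrative only, not Druid's API.
final class CoordinatorStartupChecks
{
  private CoordinatorStartupChecks() {}

  /**
   * Fails fast when primary replicant loading is given its own duties runnable but the
   * executor has only one thread, so that runnable could never run concurrently with the others.
   */
  static void validateDutiesExecutorConfig(boolean loadPrimaryReplicantSeparately, int threadPoolSize)
  {
    if (loadPrimaryReplicantSeparately && threadPoolSize <= 1) {
      throw new IllegalArgumentException(
          "druid.coordinator.loadPrimaryReplicantSeparately=true requires "
          + "druid.coordinator.dutiesRunnableExecutor.threadPoolSize > 1, but it is " + threadPoolSize
      );
    }
  }
}
```

Throwing at startup, rather than logging a warning, makes the misconfiguration impossible to miss, at the cost of refusing to start a Coordinator that would otherwise still work (just without the intended isolation).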

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -1017,9 +1137,11 @@ void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers)
         disappeared.remove(server.getName());
       }
       for (String name : disappeared) {
-        log.debug("Removing listener for server[%s] which is no longer there.", name);
         LoadQueuePeon peon = loadManagementPeons.remove(name);
-        peon.stop();
+        if (null != peon) {

Review comment:
       nit: a comment about the race condition we found might be worthwhile here, to fill future readers in on why this null check was added.
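A sketch of what such a comment could say. It assumes (a guess, not the race actually found in this PR) that another thread can remove and stop the same peon between the snapshot of server names and the `remove()` call, and that `loadManagementPeons` is a concurrent map. `PeonRegistry` and its nested `Peon` interface are hypothetical stand-ins for the Coordinator and `LoadQueuePeon`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PeonRegistry
{
  /** Hypothetical stand-in for LoadQueuePeon. */
  interface Peon
  {
    void stop();
  }

  // Assumed to be a concurrent map, so remove() is atomic across threads.
  private final Map<String, Peon> loadManagementPeons = new ConcurrentHashMap<>();

  void register(String serverName, Peon peon)
  {
    loadManagementPeons.put(serverName, peon);
  }

  /** Stops peons for servers absent from {@code currentServers}; returns how many this call stopped. */
  int stopPeonsForDisappearedServers(Set<String> currentServers)
  {
    List<String> disappeared = new ArrayList<>(loadManagementPeons.keySet());
    disappeared.removeAll(currentServers);
    int stopped = 0;
    for (String name : disappeared) {
      // The name list above is only a snapshot. If another thread removes (and stops) the
      // same peon after the snapshot is taken, remove() returns null here, so the null check
      // avoids a NullPointerException instead of assuming the entry still exists.
      Peon peon = loadManagementPeons.remove(name);
      if (peon != null) {
        peon.stop();
        stopped++;
      }
    }
    return stopped;
  }
}
```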

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
##########
@@ -130,6 +130,12 @@ private int getLoadingReplicants(SegmentId segmentId)
     return retVal;
   }
 
+  public Map<String, Integer> getLoadingTiers(SegmentId segmentId)

Review comment:
       A Javadoc could be helpful here.
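One possible Javadoc, assuming the returned map is keyed by tier name and counts the replicas of the segment currently in a load queue in that tier. That reading is inferred from the `Map<String, Integer>` return type and the neighboring `getLoadingReplicants`, not confirmed by the diff. The surrounding class is a hypothetical, self-contained stand-in that uses `String` segment IDs instead of `SegmentId`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class LoadingReplicaLookup
{
  // Hypothetical backing store: segment ID -> (tier name -> replicas currently loading there).
  private final Map<String, Map<String, Integer>> loadingReplicas = new HashMap<>();

  /** Records one more replica of {@code segmentId} being loaded in {@code tier}. */
  void markLoading(String segmentId, String tier)
  {
    loadingReplicas.computeIfAbsent(segmentId, id -> new HashMap<>()).merge(tier, 1, Integer::sum);
  }

  /**
   * Returns the tiers in which replicas of the given segment are currently being loaded.
   *
   * @param segmentId the segment to look up
   * @return an unmodifiable map from tier name to the number of replicas of {@code segmentId}
   *         currently queued for loading in that tier; empty if no replica is loading
   */
  Map<String, Integer> getLoadingTiers(String segmentId)
  {
    Map<String, Integer> tiers = loadingReplicas.get(segmentId);
    return tiers == null ? Collections.emptyMap() : Collections.unmodifiableMap(tiers);
  }
}
```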

##########
File path: 
server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
##########
@@ -272,7 +273,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio
         0,
         false,
         false,
-        Integer.MAX_VALUE
+        0

Review comment:
       Any specific reasoning behind changing this value from `Integer.MAX_VALUE` to `0` in some of the tests?

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -900,6 +1012,7 @@ public void run()
           );
         }
 
+        log.info("Duties group %s starting", dutiesRunnableAlias);

Review comment:
       This could be a candidate for debug level.

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
       final int startingLeaderCounter = coordLeaderSelector.localTerm();
 
       final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = new ArrayList<>();
+      LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode = LoadRule.LoadRuleMode.ALL;
+      if (config.isLoadPrimaryReplicantSeparately()) {
+        dutiesRunnables.add(
+            Pair.of(
+                new DutiesRunnable(
+                    makePrimaryReplicantManagementDuties(),
+                    startingLeaderCounter,
+                    PRIMARY_REPLICANT_LOADING_DUTIES_GROUP,
+                    RunRules.RunRulesMode.LOAD_RULE_ONLY,
+                    LoadRule.LoadRuleMode.PRIMARY_ONLY
+                ),
+                config.getCoordinatorPrimaryReplicantLoaderPeriod()
+            )
+        );
+        historicalManagementDutiesLoadRuleMode = LoadRule.LoadRuleMode.REPLICA_ONLY;
+      }
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
+              new DutiesRunnable(
+                  makeHistoricalManagementDuties(),
+                  startingLeaderCounter,
+                  HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP,
+                  historicalManagementDutiesLoadRuleMode
+              ),
               config.getCoordinatorPeriod()
           )
       );
+      //noinspection VariableNotUsedInsideIf
       if (indexingServiceClient != null) {
         dutiesRunnables.add(
             Pair.of(
-                new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
+                new DutiesRunnable(
+                    makeIndexingServiceDuties(),
+                    startingLeaderCounter,
+                    INDEXING_SERVICE_DUTIES_DUTY_GROUP
+                ),
                 config.getCoordinatorIndexingPeriod()
             )
         );
       }
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+              new DutiesRunnable(
+                  makeMetadataStoreManagementDuties(),
+                  startingLeaderCounter,
+                  METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP
+              ),
               config.getCoordinatorMetadataStoreManagementPeriod()
           )
       );
 
       for (CoordinatorCustomDutyGroup customDutyGroup : customDutyGroups.getCoordinatorCustomDutyGroups()) {
         dutiesRunnables.add(
             Pair.of(
-                new DutiesRunnable(customDutyGroup.getCustomDutyList(), startingLeaderCounter, customDutyGroup.getName()),
+                new DutiesRunnable(
+                    customDutyGroup.getCustomDutyList(),
+                    startingLeaderCounter,
+                    customDutyGroup.getName()
+                ),
                 customDutyGroup.getPeriod()
             )
         );
         log.info(
             "Done making custom coordinator duties %s for group %s",
-            customDutyGroup.getCustomDutyList().stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()),
+            customDutyGroup.getCustomDutyList()
+                           .stream()
+                           .map(duty -> duty.getClass().getName())
+                           .collect(Collectors.toList()),
             customDutyGroup.getName()
         );
       }
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
-        // CompactSegmentsDuty can takes a non trival amount of time to complete.
-        // Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every
-        // config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken
-        // if setting config.getCoordinatorIndexingPeriod() lower than the default value.
-        ScheduledExecutors.scheduleAtFixedRate(
-            exec,
-            config.getCoordinatorStartDelay(),
-            dutiesRunnable.rhs,
-            new Callable<ScheduledExecutors.Signal>()
-            {
-              private final DutiesRunnable theRunnable = dutiesRunnable.lhs;
-
-              @Override
-              public ScheduledExecutors.Signal call()
-              {
-                if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
-                  theRunnable.run();
-                }
-                if (coordLeaderSelector.isLeader()
-                    && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
-                  return ScheduledExecutors.Signal.REPEAT;
-                } else {
-                  return ScheduledExecutors.Signal.STOP;
-                }
+        // Note that caution should be taken if setting config.getCoordinatorIndexingPeriod() lower than the default

Review comment:
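       Retaining the comments from master may be preferable here; they explain why the duties are scheduled at a fixed rate (`CompactSegmentsDuty` can take a non-trivial amount of time to complete).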

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
-        // CompactSegmentsDuty can takes a non trival amount of time to complete.
-        // Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every
-        // config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken
-        // if setting config.getCoordinatorIndexingPeriod() lower than the default value.
-        ScheduledExecutors.scheduleAtFixedRate(
-            exec,
-            config.getCoordinatorStartDelay(),
-            dutiesRunnable.rhs,
-            new Callable<ScheduledExecutors.Signal>()
-            {
-              private final DutiesRunnable theRunnable = dutiesRunnable.lhs;
-
-              @Override
-              public ScheduledExecutors.Signal call()
-              {
-                if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
-                  theRunnable.run();
-                }
-                if (coordLeaderSelector.isLeader()
-                    && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
-                  return ScheduledExecutors.Signal.REPEAT;
-                } else {
-                  return ScheduledExecutors.Signal.STOP;
-                }
+        // Note that caution should be taken if setting config.getCoordinatorIndexingPeriod() lower than the default
+        // value.
+        exec.scheduleAtFixedRate(

Review comment:
       This block of code may need a closer look. The previous code explicitly returned `ScheduledExecutors.Signal.STOP` once this node was no longer the leader (or its leader term had changed). I want to make sure that is actually no longer needed with this change, rather than left out by accident.
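For comparison, a sketch of how the old `Signal.STOP` behavior could be kept with a plain `ScheduledExecutorService`: the task re-checks leadership after each run and cancels its own `ScheduledFuture` once the check fails. `LeaderScopedScheduler` and `scheduleWhileLeader` are hypothetical names, and the `BooleanSupplier` stands in for `coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()`.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

final class LeaderScopedScheduler
{
  private LeaderScopedScheduler() {}

  /**
   * Runs {@code duty} at a fixed rate while {@code stillLeaderForTerm} holds, and cancels
   * itself (the equivalent of returning Signal.STOP) once it no longer does.
   */
  static ScheduledFuture<?> scheduleWhileLeader(
      ScheduledExecutorService exec,
      long initialDelayMillis,
      long periodMillis,
      BooleanSupplier stillLeaderForTerm,
      Runnable duty
  )
  {
    AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
    Runnable wrapped = () -> {
      if (stillLeaderForTerm.getAsBoolean()) {
        // If duty.run() throws, ScheduledExecutorService suppresses all later runs,
        // so production code would likely want a try/catch around this call.
        duty.run();
      }
      // Re-check after running (we might no longer be leader), like the old STOP branch.
      if (!stillLeaderForTerm.getAsBoolean()) {
        ScheduledFuture<?> current = self.get();
        // current is null only if the very first run races with self.set() below;
        // the next run then performs the cancellation instead.
        if (current != null) {
          current.cancel(false); // stop later runs without interrupting this one
        }
      }
    };
    ScheduledFuture<?> future =
        exec.scheduleAtFixedRate(wrapped, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    self.set(future);
    return future;
  }
}
```

Without some equivalent of this check, a fixed-rate task scheduled during one leadership term keeps firing until the executor itself is shut down, which is the behavior the STOP branch guarded against.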

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -756,6 +795,15 @@ private void stopBeingLeader()
     }
   }
 
+  private List<CoordinatorDuty> makePrimaryReplicantManagementDuties()

Review comment:
       A quick Javadoc calling out what these duties are for could be helpful.

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -924,6 +1038,7 @@ public void run()
                 .setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias)
                 .build("coordinator/global/time", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart))
         );
+        log.info("Duties group %s complete", dutiesRunnableAlias);

Review comment:
       Same debug-level comment as above.

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
       final int startingLeaderCounter = coordLeaderSelector.localTerm();
 
       final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = new ArrayList<>();
+      LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode = LoadRule.LoadRuleMode.ALL;

Review comment:
       ```suggestion
         // By default, the historical management duties runnable handles all segment loading, regardless of
         // primary/non-primary replicant status. However, if the operator enables separate primary replicant
         // loading, this flips to loading only non-primary replicants, since all primary replicants will be
         // loaded by the dedicated duties runnable instantiated below.
         LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode = LoadRule.LoadRuleMode.ALL;
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


