capistrant commented on a change in pull request #12013:
URL: https://github.com/apache/druid/pull/12013#discussion_r762099478
##########
File path: docs/configuration/index.md
##########
@@ -781,7 +781,10 @@ These Coordinator static configurations can be defined in
the `coordinator/runti
|Property|Description|Default|
|--------|-----------|-------|
+|`druid.coordinator.dutiesRunnableExecutor.threadPoolSize`|The number of threads used by the `ScheduledExecutorService` that executes `DutiesRunnable` objects|1|
|`druid.coordinator.period`|The run period for the Coordinator. The Coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of "used" segments and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
+|`druid.coordinator.loadPrimaryReplicantSeparately`|Flag that indicates if the Coordinator should put primary replicant loading for used segments on its own event loop. This will require more memory and CPU to be used by the coordinator, but will ensure that the loading of primary replicas for used segments is not blocked by other coordinator jobs. The default of false is likely fine for most clusters. Enabling it could make sense if you regularly have to wait longer than you'd like for coordination cycles to complete before primary replicants are loaded for used segments that are not being served by the cluster.|false|
Review comment:
Let's include a note that if you set this to true, you will also want to
raise `druid.coordinator.dutiesRunnableExecutor.threadPoolSize` above 1 to
make it worthwhile. A more involved question: do we want to add a
precondition to Coordinator startup so that if this is true, threadPoolSize
must be `> 1`? That would prevent operators from misconfiguring their cluster.
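To make the precondition idea concrete, here is a minimal sketch of what such a startup check might look like. It is only an illustration: `CoordinatorConfigCheck` and `validate` are hypothetical names, and it reads plain `java.util.Properties` rather than Druid's actual config classes. The two property names and their defaults are the ones from the docs table above.

```java
import java.util.Properties;

// Hypothetical sketch: class and method names are illustrative, not Druid APIs.
final class CoordinatorConfigCheck
{
  static final String LOAD_PRIMARY_SEPARATELY =
      "druid.coordinator.loadPrimaryReplicantSeparately";
  static final String THREAD_POOL_SIZE =
      "druid.coordinator.dutiesRunnableExecutor.threadPoolSize";

  // Fails fast when separate primary loading is enabled but only one
  // thread is available to run the duty groups.
  static void validate(Properties props)
  {
    // Defaults mirror the documented ones: false and 1.
    boolean separate =
        Boolean.parseBoolean(props.getProperty(LOAD_PRIMARY_SEPARATELY, "false"));
    int poolSize = Integer.parseInt(props.getProperty(THREAD_POOL_SIZE, "1"));
    if (separate && poolSize <= 1) {
      throw new IllegalStateException(
          THREAD_POOL_SIZE + " must be > 1 when " + LOAD_PRIMARY_SEPARATELY + " is true"
      );
    }
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty(LOAD_PRIMARY_SEPARATELY, "true");
    props.setProperty(THREAD_POOL_SIZE, "2");
    validate(props); // passes: two threads available
    System.out.println("config ok");
  }
}
```

Failing at startup is stricter than a docs note alone; whether that strictness is wanted is the open question above.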
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -1017,9 +1137,11 @@ void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers)
disappeared.remove(server.getName());
}
for (String name : disappeared) {
- log.debug("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
- peon.stop();
+ if (null != peon) {
Review comment:
nit: a comment about the race condition we found might be worthwhile here
to fill future readers in on why this null check was added
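As a rough illustration of the kind of comment meant here: the sketch below is generic and does not describe the specific race found in this PR. `PeonRegistrySketch` and `Peon` are hypothetical stand-ins, and the assumed scenario (two threads both trying to remove the same entry) is only an example of why `remove` can return null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the peon bookkeeping; not the Druid classes.
final class PeonRegistrySketch
{
  interface Peon
  {
    void stop();
  }

  private final Map<String, Peon> peons = new ConcurrentHashMap<>();

  void register(String name, Peon peon)
  {
    peons.put(name, peon);
  }

  // Returns true only if this call removed and stopped the peon.
  boolean stopPeon(String name)
  {
    Peon peon = peons.remove(name);
    // remove() returns null if another thread already removed this entry
    // (assumed scenario: two duty groups running concurrently both notice
    // the server is gone). Only the caller that wins the removal stops it.
    if (peon != null) {
      peon.stop();
      return true;
    }
    return false;
  }
}
```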
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
##########
@@ -130,6 +130,12 @@ private int getLoadingReplicants(SegmentId segmentId)
return retVal;
}
+ public Map<String, Integer> getLoadingTiers(SegmentId segmentId)
Review comment:
javadoc could be helpful
##########
File path:
server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
##########
@@ -272,7 +273,7 @@ public void
testDecommissioningParametersBackwardCompatibility() throws Exceptio
0,
false,
false,
- Integer.MAX_VALUE
+ 0
Review comment:
any specific reasoning behind flipping this value around in some of the
tests?
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -900,6 +1012,7 @@ public void run()
);
}
+ log.info("Duties group %s starting", dutiesRunnableAlias);
Review comment:
This could be a candidate for debug level
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
final int startingLeaderCounter = coordLeaderSelector.localTerm();
final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables =
new ArrayList<>();
+ LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.ALL;
+ if (config.isLoadPrimaryReplicantSeparately()) {
+ dutiesRunnables.add(
+ Pair.of(
+ new DutiesRunnable(
+ makePrimaryReplicantManagementDuties(),
+ startingLeaderCounter,
+ PRIMARY_REPLICANT_LOADING_DUTIES_GROUP,
+ RunRules.RunRulesMode.LOAD_RULE_ONLY,
+ LoadRule.LoadRuleMode.PRIMARY_ONLY
+ ),
+ config.getCoordinatorPrimaryReplicantLoaderPeriod()
+ )
+ );
+ historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.REPLICA_ONLY;
+ }
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeHistoricalManagementDuties(),
startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeHistoricalManagementDuties(),
+ startingLeaderCounter,
+ HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP,
+ historicalManagementDutiesLoadRuleMode
+ ),
config.getCoordinatorPeriod()
)
);
+ //noinspection VariableNotUsedInsideIf
if (indexingServiceClient != null) {
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeIndexingServiceDuties(),
startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeIndexingServiceDuties(),
+ startingLeaderCounter,
+ INDEXING_SERVICE_DUTIES_DUTY_GROUP
+ ),
config.getCoordinatorIndexingPeriod()
)
);
}
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeMetadataStoreManagementDuties(),
startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeMetadataStoreManagementDuties(),
+ startingLeaderCounter,
+ METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP
+ ),
config.getCoordinatorMetadataStoreManagementPeriod()
)
);
for (CoordinatorCustomDutyGroup customDutyGroup :
customDutyGroups.getCoordinatorCustomDutyGroups()) {
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(customDutyGroup.getCustomDutyList(),
startingLeaderCounter, customDutyGroup.getName()),
+ new DutiesRunnable(
+ customDutyGroup.getCustomDutyList(),
+ startingLeaderCounter,
+ customDutyGroup.getName()
+ ),
customDutyGroup.getPeriod()
)
);
log.info(
"Done making custom coordinator duties %s for group %s",
- customDutyGroup.getCustomDutyList().stream().map(duty ->
duty.getClass().getName()).collect(Collectors.toList()),
+ customDutyGroup.getCustomDutyList()
+ .stream()
+ .map(duty -> duty.getClass().getName())
+ .collect(Collectors.toList()),
customDutyGroup.getName()
);
}
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable :
dutiesRunnables) {
- // CompactSegmentsDuty can takes a non trival amount of time to
complete.
- // Hence, we schedule at fixed rate to make sure the other tasks still
run at approximately every
- // config.getCoordinatorIndexingPeriod() period. Note that cautious
should be taken
- // if setting config.getCoordinatorIndexingPeriod() lower than the
default value.
- ScheduledExecutors.scheduleAtFixedRate(
- exec,
- config.getCoordinatorStartDelay(),
- dutiesRunnable.rhs,
- new Callable<ScheduledExecutors.Signal>()
- {
- private final DutiesRunnable theRunnable = dutiesRunnable.lhs;
-
- @Override
- public ScheduledExecutors.Signal call()
- {
- if (coordLeaderSelector.isLeader() && startingLeaderCounter ==
coordLeaderSelector.localTerm()) {
- theRunnable.run();
- }
- if (coordLeaderSelector.isLeader()
- && startingLeaderCounter ==
coordLeaderSelector.localTerm()) { // (We might no longer be leader)
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- return ScheduledExecutors.Signal.STOP;
- }
+ // Note that caution should be taken if setting
config.getCoordinatorIndexingPeriod() lower than the default
Review comment:
Retaining the comments from master may be preferred here
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
final int startingLeaderCounter = coordLeaderSelector.localTerm();
final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables =
new ArrayList<>();
+ LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.ALL;
+ if (config.isLoadPrimaryReplicantSeparately()) {
+ dutiesRunnables.add(
+ Pair.of(
+ new DutiesRunnable(
+ makePrimaryReplicantManagementDuties(),
+ startingLeaderCounter,
+ PRIMARY_REPLICANT_LOADING_DUTIES_GROUP,
+ RunRules.RunRulesMode.LOAD_RULE_ONLY,
+ LoadRule.LoadRuleMode.PRIMARY_ONLY
+ ),
+ config.getCoordinatorPrimaryReplicantLoaderPeriod()
+ )
+ );
+ historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.REPLICA_ONLY;
+ }
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeHistoricalManagementDuties(),
startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeHistoricalManagementDuties(),
+ startingLeaderCounter,
+ HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP,
+ historicalManagementDutiesLoadRuleMode
+ ),
config.getCoordinatorPeriod()
)
);
+ //noinspection VariableNotUsedInsideIf
if (indexingServiceClient != null) {
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeIndexingServiceDuties(),
startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeIndexingServiceDuties(),
+ startingLeaderCounter,
+ INDEXING_SERVICE_DUTIES_DUTY_GROUP
+ ),
config.getCoordinatorIndexingPeriod()
)
);
}
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(makeMetadataStoreManagementDuties(),
startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+ new DutiesRunnable(
+ makeMetadataStoreManagementDuties(),
+ startingLeaderCounter,
+ METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP
+ ),
config.getCoordinatorMetadataStoreManagementPeriod()
)
);
for (CoordinatorCustomDutyGroup customDutyGroup :
customDutyGroups.getCoordinatorCustomDutyGroups()) {
dutiesRunnables.add(
Pair.of(
- new DutiesRunnable(customDutyGroup.getCustomDutyList(),
startingLeaderCounter, customDutyGroup.getName()),
+ new DutiesRunnable(
+ customDutyGroup.getCustomDutyList(),
+ startingLeaderCounter,
+ customDutyGroup.getName()
+ ),
customDutyGroup.getPeriod()
)
);
log.info(
"Done making custom coordinator duties %s for group %s",
- customDutyGroup.getCustomDutyList().stream().map(duty ->
duty.getClass().getName()).collect(Collectors.toList()),
+ customDutyGroup.getCustomDutyList()
+ .stream()
+ .map(duty -> duty.getClass().getName())
+ .collect(Collectors.toList()),
customDutyGroup.getName()
);
}
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable :
dutiesRunnables) {
- // CompactSegmentsDuty can takes a non trival amount of time to
complete.
- // Hence, we schedule at fixed rate to make sure the other tasks still
run at approximately every
- // config.getCoordinatorIndexingPeriod() period. Note that cautious
should be taken
- // if setting config.getCoordinatorIndexingPeriod() lower than the
default value.
- ScheduledExecutors.scheduleAtFixedRate(
- exec,
- config.getCoordinatorStartDelay(),
- dutiesRunnable.rhs,
- new Callable<ScheduledExecutors.Signal>()
- {
- private final DutiesRunnable theRunnable = dutiesRunnable.lhs;
-
- @Override
- public ScheduledExecutors.Signal call()
- {
- if (coordLeaderSelector.isLeader() && startingLeaderCounter ==
coordLeaderSelector.localTerm()) {
- theRunnable.run();
- }
- if (coordLeaderSelector.isLeader()
- && startingLeaderCounter ==
coordLeaderSelector.localTerm()) { // (We might no longer be leader)
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- return ScheduledExecutors.Signal.STOP;
- }
+ // Note that caution should be taken if setting
config.getCoordinatorIndexingPeriod() lower than the default
+ // value.
+ exec.scheduleAtFixedRate(
Review comment:
This block of code may need a closer look. The previous code had an
explicit STOP if not leader; we want to make sure that is actually no longer
needed with this change rather than left out by accident.
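For reference, a minimal sketch of one way to keep the stop-when-not-leader behavior on a plain `ScheduledExecutorService`, assuming the new code calls `scheduleAtFixedRate` on the JDK executor directly. `LeaderGatedScheduling`, `scheduleWhileLeader`, and the `stillLeader` supplier are hypothetical; in the coordinator the supplier would correspond to the `isLeader()` plus `localTerm()` comparison from the removed code. This is not what the PR does, just an option to compare against.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical helper; names are illustrative, not Druid APIs.
final class LeaderGatedScheduling
{
  static ScheduledFuture<?> scheduleWhileLeader(
      ScheduledExecutorService exec,
      Runnable duty,
      BooleanSupplier stillLeader,
      long initialDelayMillis,
      long periodMillis
  )
  {
    final AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
    final ScheduledFuture<?> future = exec.scheduleAtFixedRate(
        () -> {
          if (stillLeader.getAsBoolean()) {
            duty.run();
          }
          // Leadership may have been lost while the duty ran, so check again.
          if (!stillLeader.getAsBoolean()) {
            ScheduledFuture<?> me = self.get();
            if (me != null) {
              me.cancel(false); // plays the role of the old Signal.STOP
            }
          }
        },
        initialDelayMillis,
        periodMillis,
        TimeUnit.MILLISECONDS
    );
    self.set(future);
    return future;
  }

  // Demo: "leadership" is lost after three runs; returns the number of runs.
  static int demo()
  {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    try {
      ScheduledFuture<?> future =
          scheduleWhileLeader(exec, runs::incrementAndGet, () -> runs.get() < 3, 0, 5);
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (!future.isCancelled() && System.nanoTime() < deadline) {
        Thread.sleep(5);
      }
      exec.shutdown();
      exec.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    finally {
      exec.shutdownNow();
    }
    return runs.get();
  }

  public static void main(String[] args)
  {
    System.out.println("duty ran " + demo() + " times before stopping");
  }
}
```

Without something like this, a task scheduled with `scheduleAtFixedRate` keeps firing until it is cancelled, it throws, or the executor is shut down, so if leadership loss is instead handled by shutting the executor down in `stopBeingLeader()`, that is worth confirming explicitly.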
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -756,6 +795,15 @@ private void stopBeingLeader()
}
}
+ private List<CoordinatorDuty> makePrimaryReplicantManagementDuties()
Review comment:
quick javadoc calling out what these duties are for could be helpful
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -924,6 +1038,7 @@ public void run()
.setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias)
.build("coordinator/global/time", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart))
);
+ log.info("Duties group %s complete", dutiesRunnableAlias);
Review comment:
same debug level comment as above
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -665,68 +681,91 @@ private void becomeLeader()
final int startingLeaderCounter = coordLeaderSelector.localTerm();
final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables =
new ArrayList<>();
+ LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.ALL;
Review comment:
```suggestion
// By default, the historical management duties runnable will handle all
// segment loading, regardless of primary/non-primary replicant status.
// However, if the operator enables primary replicant loading, this will
// flip to only load non-primary replicants since all primary replicants
// will be loaded by the dedicated duties runnable instantiated below.
LoadRule.LoadRuleMode historicalManagementDutiesLoadRuleMode =
LoadRule.LoadRuleMode.ALL;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]