kfaraz commented on code in PR #17535:
URL: https://github.com/apache/druid/pull/17535#discussion_r1882116827
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -212,19 +224,33 @@ public void start()
public void stop()
{
Preconditions.checkState(started, "SupervisorManager not started");
-
+ List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
for (String id : supervisors.keySet()) {
- try {
- supervisors.get(id).lhs.stop(false);
- SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
- if (autoscaler != null) {
- autoscaler.stop();
+ stopFutures.add(shutdownExec.submit(() -> {
+ try {
+ supervisors.get(id).lhs.stop(false);
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.stop();
+ }
}
- }
- catch (Exception e) {
- log.warn(e, "Caught exception while stopping supervisor [%s]", id);
- }
+ catch (Exception e) {
+ log.warn(e, "Caught exception while stopping supervisor [%s]", id);
+ }
+ return null;
+ }));
+ }
+ log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
+ try {
+ FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ log.warn(
Review Comment:
Maybe log the exception too, in case it is something other than a
timeout.
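A minimal sketch of that suggestion, using only the JDK so it runs on its own. `java.util.logging` stands in for Druid's logger, and the class name, `awaitStop`, and the `Outcome` enum are hypothetical names that are not part of the PR. The point is only that a timeout and any other failure get separate messages, with the exception attached in both cases.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not code from the PR.
class StopWaitLoggingSketch
{
  private static final Logger LOG = Logger.getLogger("StopWaitLoggingSketch");

  enum Outcome { STOPPED, TIMED_OUT, FAILED }

  // Waits for the combined stop future and logs the exception in every failure case.
  static Outcome awaitStop(Future<?> allStops, long timeout, TimeUnit unit)
  {
    try {
      allStops.get(timeout, unit);
      return Outcome.STOPPED;
    }
    catch (TimeoutException e) {
      LOG.log(Level.WARNING, "Timed out waiting for supervisors to stop", e);
      return Outcome.TIMED_OUT;
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      LOG.log(Level.WARNING, "Interrupted while waiting for supervisors to stop", e);
      return Outcome.FAILED;
    }
    catch (ExecutionException e) {
      LOG.log(Level.WARNING, "Stopping supervisors failed", e);
      return Outcome.FAILED;
    }
  }
}
```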
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -56,13 +64,17 @@ public class SupervisorManager
// SupervisorTaskAutoScaler could be null
private final ConcurrentHashMap<String, SupervisorTaskAutoScaler>
autoscalers = new ConcurrentHashMap<>();
private final Object lock = new Object();
+ private final ListeningExecutorService shutdownExec;
private volatile boolean started = false;
@Inject
public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
{
this.metadataSupervisorManager = metadataSupervisorManager;
+ this.shutdownExec = MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(25, "supervisor-manager-shutdown-%d")
Review Comment:
25 may be excessive in some cases and inadequate in others. Maybe initialize
the executor lazily inside the `stop()` method, then the number of required
threads can be computed at run time. The `shutdownExec` need not be a
class-level field either.
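A rough, JDK-only sketch of the lazy variant: the pool is created inside the stop path, sized from the number of supervisors, and discarded afterwards. The class name, `poolSize`, `stopAll`, and the upper bound `MAX_SHUTDOWN_THREADS` are all assumptions made for illustration; whether a cap is wanted at all, and what value it should have, is left open here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not code from the PR.
class LazyShutdownExecSketch
{
  // Assumed upper bound; the right value (if any) is a judgment call.
  static final int MAX_SHUTDOWN_THREADS = 25;

  // One thread per supervisor, at least 1 and at most the assumed cap.
  static int poolSize(int numSupervisors)
  {
    return Math.max(1, Math.min(numSupervisors, MAX_SHUTDOWN_THREADS));
  }

  // Runs every stop action in parallel and returns how many finished without throwing.
  static int stopAll(Collection<Runnable> stopActions)
  {
    // The executor is a local variable, created only when stopping.
    ExecutorService exec = Executors.newFixedThreadPool(poolSize(stopActions.size()));
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (Runnable action : stopActions) {
        futures.add(exec.submit(action));
      }
      int completed = 0;
      for (Future<?> future : futures) {
        try {
          future.get();
          completed++;
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
        catch (ExecutionException e) {
          // This stop action threw; keep waiting for the remaining ones.
        }
      }
      return completed;
    }
    finally {
      exec.shutdown();
    }
  }
}
```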
---
Alternatively, instead of using a completely new executor, you could
consider using the `scheduledExec` inside each supervisor. That executor
basically just sits idle most of the time and is responsible only for
submitting `RunNotice` to the notice queue.
You could add a `stopAsync` method to `SeekableStreamSupervisor` that does
the following:
- returns a future that we coalesce and wait upon
- internally submits a runnable to the `scheduledExec` to perform the actual
`stop`
The only thing we would miss out on is parallelizing
`autoscaler.stop()`, which should not be a concern, I guess?
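A hedged sketch of the `stopAsync` idea, written against the JDK only. `CompletableFuture` is swapped in for Guava's `ListenableFuture`, and the class below is a hypothetical stand-in for the supervisor, not `SeekableStreamSupervisor` itself. It does not check whether the real `stop` can safely run on the supervisor's own `scheduledExec`; that would need to be verified separately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical stand-in for a supervisor, not code from the PR.
class StopAsyncSketch
{
  // Stand-in for the mostly idle per-supervisor executor.
  private final ScheduledExecutorService scheduledExec =
      Executors.newSingleThreadScheduledExecutor();

  volatile boolean stopped = false;

  // Placeholder for the real, potentially slow stop logic.
  void stop(boolean stopGracefully)
  {
    stopped = true;
  }

  // Submits the actual stop to the supervisor's own executor and returns a future for it.
  CompletableFuture<Void> stopAsync()
  {
    CompletableFuture<Void> result = new CompletableFuture<>();
    scheduledExec.execute(() -> {
      try {
        stop(false);
        result.complete(null);
      }
      catch (Exception e) {
        result.completeExceptionally(e);
      }
      finally {
        // Lets the executor thread exit once the stop has run.
        scheduledExec.shutdown();
      }
    });
    return result;
  }
}
```

On the manager side, the futures returned by each `stopAsync()` could then be combined and waited on, for example with `CompletableFuture.allOf(...)` in this JDK-only version.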
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -212,19 +224,33 @@ public void start()
public void stop()
{
Preconditions.checkState(started, "SupervisorManager not started");
-
+ List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
for (String id : supervisors.keySet()) {
- try {
- supervisors.get(id).lhs.stop(false);
- SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
- if (autoscaler != null) {
- autoscaler.stop();
+ stopFutures.add(shutdownExec.submit(() -> {
+ try {
+ supervisors.get(id).lhs.stop(false);
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.stop();
+ }
}
- }
- catch (Exception e) {
- log.warn(e, "Caught exception while stopping supervisor [%s]", id);
- }
+ catch (Exception e) {
+ log.warn(e, "Caught exception while stopping supervisor [%s]", id);
+ }
+ return null;
+ }));
+ }
+ log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
+ try {
+ FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS);
Review Comment:
I don't think we should use a timeout of 80s here since each supervisor
could have a different value of shutdown timeout. We could either just do
`get()` (which would be no worse than what we are already doing) or do a
`get()` with a longer timeout.
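One possible way to pick the longer timeout, sketched with the JDK only: take the largest per-supervisor shutdown timeout and add some headroom. Deriving the value this way is an assumption of this sketch, not something the PR does, and `waitTimeout` and `BUFFER` are hypothetical names. The bound also only holds if every supervisor gets its own thread; with fewer threads than supervisors the stops queue up and can take longer.

```java
import java.time.Duration;
import java.util.Collection;

// Hypothetical sketch, not code from the PR.
class StopTimeoutSketch
{
  // Assumed headroom on top of the slowest supervisor's shutdown timeout.
  static final Duration BUFFER = Duration.ofSeconds(10);

  // Returns the longest configured shutdown timeout plus the assumed buffer.
  static Duration waitTimeout(Collection<Duration> shutdownTimeouts)
  {
    Duration longest = Duration.ZERO;
    for (Duration timeout : shutdownTimeouts) {
      if (timeout.compareTo(longest) > 0) {
        longest = timeout;
      }
    }
    return longest.plus(BUFFER);
  }
}
```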
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1076,6 +1076,10 @@ public void stop(boolean stopGracefully)
long shutdownTimeoutMillis =
tuningConfig.getShutdownTimeout().getMillis();
long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
while (!stopped) {
+ if (!stopGracefully) {
+ stopped = true;
+ break;
+ }
Review Comment:
If we have already parallelized the `stop` of supervisors, is this still
needed?