divijvaidya commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1182398774


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -670,6 +875,14 @@ public void close() {
                 } catch (InterruptedException e) {
                     // ignore
                 }
+                remoteStorageReaderThreadPool.shutdownNow();
+                //waits for 2 mins to terminate the current tasks
+                try {
+                    remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES);

Review Comment:
   How did we decide on 2 minutes here? I don't think we should block broker
   shutdown on this, because there are other limits associated with clean vs.
   unclean shutdown. If we do plan to block, we should tie the wait to the
   overall shutdown timeout. For example, a clean shutdown is expected to
   complete within 5 minutes; see
   `lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)` in
   BrokerServer.scala.
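
One way to tie the wait to an overall shutdown timeout is to track a single deadline and give each `awaitTermination` call only the time that is left. The sketch below is illustrative only: `ShutdownBudget` and its methods are hypothetical names, not part of Kafka, and the 5-minute value simply mirrors the clean-shutdown limit mentioned above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Kafka class): one deadline shared by all shutdown steps.
public class ShutdownBudget {
    private final long deadlineNanos;

    public ShutdownBudget(long timeout, TimeUnit unit) {
        this.deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    }

    /** Time left in the overall budget, in nanoseconds; never negative. */
    public long remainingNanos() {
        return Math.max(0L, deadlineNanos - System.nanoTime());
    }

    /** Waits for the pool to terminate, but no longer than the remaining budget. */
    public boolean await(ExecutorService pool) throws InterruptedException {
        return pool.awaitTermination(remainingNanos(), TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed overall limit, mirroring the 5-minute clean-shutdown timeout.
        ShutdownBudget budget = new ShutdownBudget(5, TimeUnit.MINUTES);

        ExecutorService readerPool = Executors.newFixedThreadPool(2);
        readerPool.shutdown(); // stop accepting new tasks
        System.out.println("terminated=" + budget.await(readerPool));
    }
}
```

With this shape, a slow reader pool cannot extend broker shutdown past the shared limit, because later steps only ever get whatever time earlier steps left over.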



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -670,6 +875,14 @@ public void close() {
                 } catch (InterruptedException e) {
                     // ignore
                 }
+                remoteStorageReaderThreadPool.shutdownNow();

Review Comment:
   1. Should we try to complete the pending reads first by calling `shutdown()`
   instead of interrupting them with `shutdownNow()`? That would reduce the risk
   of abrupt behaviour if the RSM (RemoteStorageManager) doesn't handle
   interruption correctly.
   
   I suggest the following pattern (see also `shutdownAndAwaitTermination` at
   https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html):
   ```
   // Stop accepting new requests to this pool.
   remoteStorageReaderThreadPool.shutdown();

   try {
       // Wait until the pool has terminated.
       if (!remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES)) {
           LOGGER.warn("Shutdown for remoteStorageReaderThreadPool didn't complete all tasks.");
           remoteStorageReaderThreadPool.shutdownNow();
       }
   } catch (InterruptedException e) {
       remoteStorageReaderThreadPool.shutdownNow();
       LOGGER.warn("Shutdown for remoteStorageReaderThreadPool interrupted");
   }

   LOGGER.info("RemoteStorageReaderThreadPool stopped.");
   ```
   
   2. Please also explain why we aren't re-interrupting the current thread
   using `Thread.currentThread().interrupt()`.
   
   3. Please add warn and info logs that make the order of shutdown steps easy
   to verify when debugging.
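
For reference, the three points above can be combined in one self-contained sketch. It is a hedged illustration rather than the code proposed for the PR: `GracefulPoolShutdown` and `shutdownAndAwait` are hypothetical names, plain `System.out`/`System.err` prints stand in for `LOGGER`, and the interrupt flag is restored in the same way as the `shutdownAndAwaitTermination` example in the `ExecutorService` Javadoc.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Kafka class) illustrating shutdown-then-await.
public class GracefulPoolShutdown {

    /** Returns true only if the pool terminated without needing shutdownNow(). */
    public static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        System.out.println("Stopping pool: no new tasks accepted");
        pool.shutdown(); // lets already-submitted tasks finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                System.out.println("Pool stopped");
                return true;
            }
            System.err.println("Pool did not finish in time; interrupting remaining tasks");
            pool.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            System.err.println("Interrupted while waiting; interrupting remaining tasks");
            pool.shutdownNow();
            // Restore the flag so callers further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("pending read completes"));
        System.out.println("graceful=" + shutdownAndAwait(pool, 2, TimeUnit.SECONDS));
    }
}
```

Returning a boolean lets the caller decide whether a forced stop is worth a warning at its own level. If the interrupt is deliberately swallowed instead of restored, a comment at that catch block is the natural place to say why.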



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
