svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2878160945


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -148,9 +152,99 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * ensure the pool terminates when the JVM exits. This is suitable for long-lived thread pools
    * that should be automatically cleaned up on JVM shutdown.
    */
-  public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+  public static synchronized ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(new ExecutorServiceWithTimeout(service, DEFAULT_SHUTDOWN_TIMEOUT));
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   *
+   * <p>This method allows: (1) to stop thread pools manually, to avoid leaks in hot-reload
+   * environments; (2) opt-out of the standard shutdown mechanism to manage graceful service stops
+   * (and commit the last pending files, if the client application needs to react to shutdown hooks
+   * on its own).
+   *
+   * <p>Please only call this method at the end of the intended usage of the library, and NEVER
+   * before, as this method will stop thread pools required for normal library workflows.
+   */
+  public static synchronized void shutdownThreadPools() {
+    removeShutdownHook();
+    long startTime = System.nanoTime();
+    List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+    for (ExecutorServiceWithTimeout item : THREAD_POOLS_TO_SHUTDOWN) {
+      item.getService().shutdown();
+      pendingShutdown.add(item);
+    }
+    THREAD_POOLS_TO_SHUTDOWN.clear();
+    for (ExecutorServiceWithTimeout item : pendingShutdown) {
+      long timeElapsed = System.nanoTime() - startTime;
+      long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+      if (remainingTime > 0) {
+        try {
+          if (!item.service.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
+            item.getService().shutdownNow();
+          }
+        } catch (InterruptedException ignored) {
+          // We're shutting down anyway, so just ignore.
+        }
+      } else {
+        item.getService().shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Initialize a shutdown hook to stop the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   */
+  @SuppressWarnings("ShutdownHook")
+  private static synchronized void initShutdownHook() {
+    if (shutdownHook == null) {
+      shutdownHook =
+          Executors.defaultThreadFactory()
+              .newThread(
+                  new Runnable() {
+                    @Override
+                    public void run() {
+                      shutdownHook = null;
+                      shutdownThreadPools();
+                    }
+                  });
+
+      try {
+        shutdownHook.setName("DelayedShutdownHook-iceberg");
+      } catch (SecurityException e) {
+        LOG.warn("Cannot set thread name for the shutdown hook", e);
+      }
+
+      try {
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+      } catch (SecurityException e) {
+        LOG.warn("Cannot install a shutdown hook for thread pools clean up", e);
+      }
+    }
+  }
+
+  /**
+   * Stop the shutdown hook for the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   *
+   * <p>Thread pools can still be stopped manually via the {@link #shutdownThreadPools()} method.
+   */
+  @SuppressWarnings("ShutdownHook")
+  public static synchronized void removeShutdownHook() {

Review Comment:
   I.e., the error I am trying to solve for our application is that, on shutdown, 
the Iceberg library starts throwing all sorts of exceptions about all thread pools 
already being shut down. The JVM does not allow controlling the order of shutdown 
hooks, so it's either this change in Iceberg or runtime magic within the JVM...
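
To illustrate the ordering problem, here is a minimal sketch that uses only `java.util.concurrent` and no Iceberg APIs. The class `OrderedShutdownSketch` and the method `flushThenStop` are hypothetical names made up for this example. The idea is that a single application-owned hook first finishes the last pending work on a still-live pool and only then stops the pool, instead of relying on two independent hooks whose relative order the JVM does not define.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not part of Iceberg or of this PR.
public class OrderedShutdownSketch {

  /** Runs one last task on the pool, then stops the pool. Returns the observed event order. */
  static List<String> flushThenStop(ExecutorService pool, long timeoutMillis) throws Exception {
    List<String> events = new ArrayList<>();

    // Step 1: the final application work still needs a live pool.
    Future<String> lastCommit = pool.submit(() -> "committed");
    events.add(lastCommit.get(timeoutMillis, TimeUnit.MILLISECONDS));

    // Step 2: only now stop the pool, forcing termination if the timeout expires.
    pool.shutdown();
    if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
      pool.shutdownNow();
    }
    events.add(pool.isShutdown() ? "pool stopped" : "pool still running");
    return events;
  }

  public static void main(String[] args) {
    // Daemon worker threads, so the idle pool does not keep the JVM alive.
    ExecutorService pool =
        Executors.newFixedThreadPool(
            2,
            runnable -> {
              Thread thread = new Thread(runnable);
              thread.setDaemon(true);
              return thread;
            });

    // One hook owns both steps, so their order is fixed by the code, not by the JVM.
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    System.out.println(flushThenStop(pool, 5_000));
                  } catch (Exception e) {
                    e.printStackTrace();
                  }
                }));
  }
}
```

If the pool were instead stopped by a separate hook, as with the current exiting worker pools, step 1 could run after the pool is already shut down and the submit would be rejected, which matches the kind of exceptions described above.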



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

