svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2878232437


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -148,9 +152,99 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * ensure the pool terminates when the JVM exits. This is suitable for long-lived thread pools
    * that should be automatically cleaned up on JVM shutdown.
    */
-  public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+  public static synchronized ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(new ExecutorServiceWithTimeout(service, DEFAULT_SHUTDOWN_TIMEOUT));
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   *
+   * <p>This method allows: (1) to stop thread pools manually, to avoid leaks in hot-reload
+   * environments; (2) opt-out of the standard shutdown mechanism to manage graceful service stops
+   * (and commit the last pending files, if the client application needs to react to shutdown hooks
+   * on its own).
+   *
+   * <p>Please only call this method at the end of the intended usage of the library, and NEVER
+   * before, as this method will stop thread pools required for normal library workflows.
+   */
+  public static synchronized void shutdownThreadPools() {
+    removeShutdownHook();
+    long startTime = System.nanoTime();
+    List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+    for (ExecutorServiceWithTimeout item : THREAD_POOLS_TO_SHUTDOWN) {
+      item.getService().shutdown();
+      pendingShutdown.add(item);
+    }
+    THREAD_POOLS_TO_SHUTDOWN.clear();
+    for (ExecutorServiceWithTimeout item : pendingShutdown) {
+      long timeElapsed = System.nanoTime() - startTime;
+      long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+      if (remainingTime > 0) {
+        try {
+          if (!item.service.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
+            item.getService().shutdownNow();
+          }
+        } catch (InterruptedException ignored) {
+          // We're shutting down anyway, so just ignore.
+        }
+      } else {
+        item.getService().shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Initialize a shutdown hook to stop the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   */
+  @SuppressWarnings("ShutdownHook")
+  private static synchronized void initShutdownHook() {
+    if (shutdownHook == null) {
+      shutdownHook =
+          Executors.defaultThreadFactory()
+              .newThread(
+                  new Runnable() {
+                    @Override
+                    public void run() {
+                      shutdownHook = null;
+                      shutdownThreadPools();
+                    }
+                  });
+
+      try {
+        shutdownHook.setName("DelayedShutdownHook-iceberg");
+      } catch (SecurityException e) {
+        LOG.warn("Cannot set thread name for the shutdown hook", e);
+      }
+
+      try {
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+      } catch (SecurityException e) {
+        LOG.warn("Cannot install a shutdown hook for thread pools clean up", e);
+      }
+    }
+  }
+
+  /**
+   * Stop the shutdown hook for the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   *
+   * <p>Thread pools can still be stopped manually via the {@link #shutdownThreadPools()} method.
+   */
+  @SuppressWarnings("ShutdownHook")
+  public static synchronized void removeShutdownHook() {

Review Comment:
   To explain better: we periodically write a new Iceberg file and open a
   new one for the current window. If our application gets restarted, we want
   to write (commit / flush) the currently open window.

   So we have our own shutdown-hook handler that completes the writes through
   the Iceberg library (and adds the new file to the AWS Glue catalog). But if
   the JVM has already killed all the thread pools by then, we cannot.

   So the idea is that, in our use case, we DO NOT want the Iceberg library to
   use any shutdown hook. Instead, we will unregister the default behaviour
   (opt out) and invoke the shutdown only when our application is ready for it.
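
   A rough sketch of the ordering we have in mind, using only JDK classes so
   that it runs on its own. The class `GracefulStopSketch`, its fields and
   `flushOpenWindow()` are made up for illustration. The only Iceberg names
   are the two methods proposed in this PR, `ThreadPools.removeShutdownHook()`
   and `ThreadPools.shutdownThreadPools()`, and they appear in comments only,
   marking where they would be called.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical application class; not part of Iceberg.
final class GracefulStopSketch {
  // Stand-in for a pool the library owns (assumption: in the real application
  // this would be a pool created via ThreadPools.newExitingWorkerPool).
  private final ExecutorService workerPool = Executors.newFixedThreadPool(2);
  private final AtomicBoolean windowCommitted = new AtomicBoolean(false);

  // Hypothetical flush: the commit needs the pool to still be alive.
  void flushOpenWindow() throws Exception {
    workerPool.submit(() -> windowCommitted.set(true)).get(10, TimeUnit.SECONDS);
  }

  // Application-controlled stop: flush first, stop the pools last.
  boolean stop() {
    try {
      flushOpenWindow();
      return windowCommitted.get();
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      return false;
    } finally {
      // With this PR, ThreadPools.shutdownThreadPools() would be called here.
      workerPool.shutdown();
    }
  }

  public static void main(String[] args) {
    GracefulStopSketch app = new GracefulStopSketch();
    // With this PR, ThreadPools.removeShutdownHook() would be called here to
    // opt out of the library's own hook before registering ours.
    Runtime.getRuntime().addShutdownHook(new Thread(app::stop, "app-graceful-stop"));
  }
}
```

   The point is only the order inside `stop()`: if the pool were shut down by
   another hook first, the `submit` call in `flushOpenWindow()` would be
   rejected and the open window would be lost.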



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

