svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2878497196
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -148,9 +152,99 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* ensure the pool terminates when the JVM exits. This is suitable for
long-lived thread pools
* that should be automatically cleaned up on JVM shutdown.
*/
- public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ public static synchronized ExecutorService newExitingWorkerPool(String
namePrefix, int poolSize) {
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(new ExecutorServiceWithTimeout(service,
DEFAULT_SHUTDOWN_TIMEOUT));
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ *
+ * <p>This method allows: (1) to stop thread pools manually, to avoid leaks
in hot-reload
+ * environments; (2) opt-out of the standard shutdown mechanism to manage
graceful service stops
+ * (and commit the last pending files, if the client application needs to
react to shutdown hooks
+ * on its own).
+ *
+ * <p>Please only call this method at the end of the intended usage of the
library, and NEVER
+ * before, as this method will stop thread pools required for normal library
workflows.
+ */
+ public static synchronized void shutdownThreadPools() {
+ removeShutdownHook();
+ long startTime = System.nanoTime();
+ List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+ for (ExecutorServiceWithTimeout item : THREAD_POOLS_TO_SHUTDOWN) {
+ item.getService().shutdown();
+ pendingShutdown.add(item);
+ }
+ THREAD_POOLS_TO_SHUTDOWN.clear();
+ for (ExecutorServiceWithTimeout item : pendingShutdown) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+ if (remainingTime > 0) {
+ try {
+ if (!item.service.awaitTermination(remainingTime,
TimeUnit.NANOSECONDS)) {
+ item.getService().shutdownNow();
+ }
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ } else {
+ item.getService().shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Initialize a shutdown hook to stop the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ */
+ @SuppressWarnings("ShutdownHook")
+ private static synchronized void initShutdownHook() {
+ if (shutdownHook == null) {
+ shutdownHook =
+ Executors.defaultThreadFactory()
+ .newThread(
+ new Runnable() {
+ @Override
+ public void run() {
+ shutdownHook = null;
+ shutdownThreadPools();
+ }
+ });
+
+ try {
+ shutdownHook.setName("DelayedShutdownHook-iceberg");
+ } catch (SecurityException e) {
+ LOG.warn("Cannot set thread name for the shutdown hook", e);
+ }
+
+ try {
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ } catch (SecurityException e) {
+ LOG.warn("Cannot install a shutdown hook for thread pools clean up",
e);
+ }
+ }
+ }
+
+ /**
+ * Stop the shutdown hook for the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ *
+ * <p>Thread pools can still be stopped manually via the {@link
#shutdownThreadPools()} method.
+ */
+ @SuppressWarnings("ShutdownHook")
+ public static synchronized void removeShutdownHook() {
Review Comment:
@mxm, could you please confirm that you're OK with the above?
The ability to keep the Iceberg library working (threads running) while our
application is already running its own shutdown hook handler is crucial for us,
and it is the main reason why I am working on this PR (which, at the same time,
solves the other use case that asks to terminate threads without a JVM exit).
Of course, I would also be happy if you want to propose an alternative way to
solve this problem, although I think having this method public solves it in an
easy way and is compatible with the overall improvements.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]