mxm commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2841704066
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,98 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ *
+ * <p>This method allows: (1) to stop thread pools manually, to avoid leaks
in hot-reload
+ * environments; (2) opt-out of the standard shutdown mechanism to manage
graceful service stops
+ * (and commit the last pending files, if the client application needs to
react to shutdown hooks
+ * on its own).
+ *
+ * <p>Please only call this method at the end of the intended usage of the
library, and NEVER
+ * before, as this method will stop thread pools required for normal library
workflows.
+ */
+ public static void shutdownThreadPools() {
+ removeShutdownHook();
+ long startTime = System.nanoTime();
+ ExecutorService item;
+ Queue<ExecutorService> pendingShutdown = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ pendingShutdown.add(item);
+ }
+ while ((item = pendingShutdown.poll()) != null) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = SHUTDOWN_TIMEOUT.toNanos() - timeElapsed;
+ if (remainingTime > 0) {
+ try {
+ if (!item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
+ item.shutdownNow();
+ }
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ } else {
+ item.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Initialize a shutdown hook to stop the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ */
+ @SuppressWarnings("ShutdownHook")
+ private static void initShutdownHook() {
+ if (shutdownHook == null) {
+ shutdownHook =
+ Executors.defaultThreadFactory()
+ .newThread(
+ new Runnable() {
+ @Override
+ public void run() {
+ shutdownHook = null;
+ shutdownThreadPools();
+ }
+ });
+
+ try {
+ shutdownHook.setName("DelayedShutdownHook-iceberg");
+ } catch (SecurityException e) {
+ LOG.warn("Cannot set thread name for the shutdown hook", e);
+ }
+
+ try {
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ } catch (SecurityException e) {
+ LOG.warn("Cannot install a shutdown hook for thread pools clean up",
e);
+ }
+ }
+ }
+
+ /**
+ * Stop the shutdown hook for the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ *
+ * <p>Thread pools can still be stopped manually via the {@link
#shutdownThreadPools()} method.
+ */
+ @SuppressWarnings("ShutdownHook")
+ public static void removeShutdownHook() {
+ if (shutdownHook != null) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ } catch (SecurityException e) {
+ LOG.warn("Cannot remove the shutdown hook for thread pools clean up",
e);
+ }
+ shutdownHook = null;
Review Comment:
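This is not safe when called concurrently: `shutdownHook` is read twice (once
for the null check and again for the `removeShutdownHook` call), so if two
threads execute this at the same time, one can set the field to `null` in
between and the other can hit a `NullPointerException` when removing the hook.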
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]