+1 for Steven's suggestion. The `newFixedThreadPool` name is slightly better IMHO, but I would be happy with either solution.
On Thu, Sep 26, 2024, 02:06 Steven Wu <stevenz...@gmail.com> wrote:

> First, we should definitely add Javadoc to `ThreadPools.newWorkerPool` on
> its behavior with a shutdown hook. It is not obvious from the method name.
> I would actually go further and deprecate `newWorkerPool` in favor of
> `newExitingWorkerPool`. The `newWorkerPool` method name invites misuse,
> as the intention is not obvious from the name.
>
> `newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should
> be the default behavior. Maybe we can call this new method
> `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
> just make `ThreadPools.newDaemonThreadFactory` public, as the proposed
> `newNonExitingWorkerPool` really just saves one line on the thread
> factory construction.
>
>
> On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Here are the cases where we call `newWorkerPool` in our code:
>>
>> - Correctly:
>>   - S3FileIO.executorService
>>   - HadoopFileIO.executorService
>> - Incorrectly:
>>   - CountersBenchmark.defaultCounterMultipleThreads (core module)
>>   - BaseDistributedDataScan.newMonitorPool (core module)
>>   - FlinkInputFormat.createInputSplits (flink module)
>>   - IcebergInputFormat.getSplits (flink module)
>> - Incorrectly, but typically called only once in the JVM lifecycle:
>>   - TableMigrationUtil.migrationService - the pool management is
>>     abandoned, and nothing prevents multiple pool creations (data module)
>>   - IcebergCommitter<init> (flink module)
>>   - IcebergFilesCommitter.open (flink module)
>>   - IcebergSource.planSplitsForBatch (flink module)
>>   - StreamingMonitorFunction.open (flink module)
>>   - ContinuousSplitPlannerImpl<init> (flink module)
>>   - Coordinator<init> - Kafka coordinator - I'm not sure that this
>>     belongs here (kafka-connect)
>>
>> The code we would need to duplicate in the core/data/flink/kafka modules is:
>>
>>   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>>     return Executors.newFixedThreadPool(
>>         poolSize,
>>         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>>   }
>>
>> Maybe adding another utility method to `ThreadPools` would help future
>> contributors think twice about the need for the `Exiting` solution, so I
>> would prefer to add this method to the core `ThreadPools` with enough
>> Javadoc to highlight the intended usage.
>>
>> Thanks,
>> Peter
>>
>> rdb...@gmail.com <rdb...@gmail.com> wrote on Wed, Sep 18, 2024, at 23:26:
>>
>>> I think this is the intended behavior. The code calls
>>> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
>>> exits. I think the right fix is for callers to create their own
>>> `ExecutorService` rather than using `newWorkerPool`. That allows for
>>> customization without making Iceberg more complicated. `ThreadPools`
>>> isn't doing anything special here. It's just a convenience method to
>>> create an exiting, fixed-size thread pool that runs daemon threads. If
>>> that's not what you're looking for, then isn't it reasonable to make
>>> your own convenience method?
>>>
>>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> This is not just a Flink issue; the calls are spread out across
>>>> multiple packages. We checked the code, and in many of the current
>>>> use cases in the Iceberg repo the pool is not used in a static
>>>> environment and is closed manually. In these cases we should switch
>>>> to a thread pool without a shutdown hook. So I think, at a minimum,
>>>> we need to create a utility method to create such a pool.
>>>>
>>>> The main question is:
>>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>>
>>>> If this is a bug, then we create a "newExitingWorkerPool", and change
>>>> the callers to use the correct one.
>>>> If this is a feature, then we create a "newNotExitingWorkerPool"
>>>> (which is gross IMHO, but we should consider API compatibility), and
>>>> change the callers to use the correct one.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>>
>>>>> Since we're using standard interfaces, maybe we should just document
>>>>> this behavior, and you can control it by creating your own worker
>>>>> pool instead?
>>>>>
>>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry
>>>>> <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Bumping this thread a bit.
>>>>>>
>>>>>> Cleaning up the pool in non-static cases should be the
>>>>>> responsibility of the user. If they want a pool that is closed by a
>>>>>> hook when the JVM exits, they should explicitly "say" so, for
>>>>>> example by calling "newExitingWorkerPool".
>>>>>>
>>>>>> This is a behaviour change in the API, so I think we need feedback
>>>>>> from the community before proceeding with it.
>>>>>> What are your thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> 冯佳捷 <laputafa...@gmail.com> wrote on Fri, Sep 13, 2024, at 17:16:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> During the investigation of a metaspace memory leak issue in Flink
>>>>>>> IcebergSource (https://github.com/apache/iceberg/pull/11073), a
>>>>>>> discussion with @pvary revealed that *ThreadPools.newWorkerPool*
>>>>>>> currently registers a Shutdown Hook via ExitingExecutorService for
>>>>>>> all created thread pools. While this ensures graceful shutdown of
>>>>>>> the pools when the JVM exits, it might lead to unnecessary Shutdown
>>>>>>> Hook accumulation, especially when the pool is explicitly closed
>>>>>>> within the application's lifecycle.
>>>>>>>
>>>>>>> I propose to *modify ThreadPools.newWorkerPool to not register a
>>>>>>> Shutdown Hook by default*.
>>>>>>> This would prevent potential issues where developers might
>>>>>>> unintentionally register numerous Shutdown Hooks when using
>>>>>>> ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>>>
>>>>>>> To retain the existing functionality for long-lived thread pools
>>>>>>> that require a Shutdown Hook, I suggest introducing a new, more
>>>>>>> descriptive function, such as *newExitingWorkerPool*. This function
>>>>>>> would explicitly create thread pools that are registered with a
>>>>>>> Shutdown Hook.
>>>>>>>
>>>>>>> *This change might impact users who rely on the implicit Shutdown
>>>>>>> Hook registration provided by the current ThreadPools.newWorkerPool
>>>>>>> implementation.*
>>>>>>> I would like to gather feedback from the community regarding this
>>>>>>> proposed change, especially regarding potential compatibility
>>>>>>> concerns.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Feng Jiajie
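[Editor's note] To make the trade-off discussed in this thread concrete, here is a minimal, self-contained sketch of the two variants using only JDK classes. The class name `WorkerPools`, the helper method names, and the hook's 10-second termination timeout are illustrative only; the real Iceberg `ThreadPools` builds on Guava's `ThreadFactoryBuilder` and `MoreExecutors.getExitingExecutorService` (as noted above), whose behavior may differ in detail.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerPools {

  // Daemon thread factory with a "prefix-N" naming scheme, similar in
  // spirit to what ThreadPools.newDaemonThreadFactory provides.
  private static ThreadFactory daemonThreadFactory(String prefix) {
    AtomicInteger count = new AtomicInteger(0);
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + count.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  // Non-exiting variant: no JVM shutdown hook is registered. The caller
  // owns the lifecycle and must call shutdown() explicitly.
  public static ExecutorService newFixedThreadPool(String prefix, int poolSize) {
    return Executors.newFixedThreadPool(poolSize, daemonThreadFactory(prefix));
  }

  // Exiting variant: registers a shutdown hook that drains the pool on
  // JVM exit. Note that every call adds one more hook, which is why
  // creating many short-lived pools this way accumulates hooks.
  public static ExecutorService newExitingWorkerPool(String prefix, int poolSize) {
    ExecutorService pool = newFixedThreadPool(prefix, poolSize);
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  pool.shutdown();
                  try {
                    // Timeout chosen for illustration; Guava's exiting
                    // executor uses its own configurable wait.
                    pool.awaitTermination(10, TimeUnit.SECONDS);
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                }));
    return pool;
  }
}
```

The sketch just makes the hook visible: the exiting variant ties pool cleanup to JVM exit, while the non-exiting variant leaves it entirely to the caller, which is the behavior the callers listed earlier in the thread actually need.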