+1 for Steven's suggestion. `newFixedThreadPool` is slightly better
IMHO, but I would be happy with either solution.
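
For reference, here is a minimal sketch of what the suggested `newFixedThreadPool(int poolSize, String prefix)` could look like. It is only an illustration, not the final API: the class name `FixedPoolSketch` is hypothetical, and I used a plain JDK thread factory instead of Guava's `ThreadFactoryBuilder` so that the snippet has no dependencies. The important part is that no shutdown hook is registered, so the caller is responsible for shutting the pool down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder class, used only for this sketch.
final class FixedPoolSketch {
  private FixedPoolSketch() {}

  // Assumption: stands in for a public ThreadPools.newDaemonThreadFactory.
  // Creates daemon threads named "<prefix>-0", "<prefix>-1", ...
  static ThreadFactory newDaemonThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger(0);
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Fixed-size pool of daemon threads WITHOUT a shutdown hook.
  // The caller owns the pool and must call shutdown() when done.
  static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(prefix));
  }
}
```

A short-lived caller would then create the pool, submit its work, and call `shutdown()` in a `finally` block, instead of relying on a hook that stays registered until the JVM exits.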

On Thu, Sep 26, 2024, 02:06 Steven Wu <stevenz...@gmail.com> wrote:

>
> First, we should definitely add Javadoc to `ThreadPools.newWorkerPool` on
> its behavior with a shutdown hook. It is not obvious from the method name.
> I would actually go further and deprecate `newWorkerPool` in favor of
> `newExitingWorkerPool`. The `newWorkerPool` method name invites misuse,
> as the intention is not obvious from the name.
>
> `newNonExitingWorkerPool` is a little awkward to me. `NonExiting`
> should be the default behavior. Maybe we can call this new method
> `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
> just make `ThreadPools.newDaemonThreadFactory` public, as the proposed
> `newNonExitingWorkerPool` really just saves one line on the thread
> factory construction.
>
>
> On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Here are the places where we call `newWorkerPool` in our code:
>>
>>    - Correctly:
>>       - S3FileIO.executorService
>>       - HadoopFileIO.executorService
>>    - Incorrectly:
>>       - CountersBenchmark.defaultCounterMultipleThreads (core module)
>>       - BaseDistributedDataScan.newMonitorPool (core module)
>>       - FlinkInputFormat.createInputSplits (flink module)
>>       - IcebergInputFormat.getSplits (flink module)
>>    - Incorrectly, but typically called only once in the JVM lifecycle:
>>       - TableMigrationUtil.migrationService - the pool management is
>>       abandoned, and nothing prevents multiple pool creations (data module)
>>       - IcebergCommitter<init> (flink module)
>>       - IcebergFilesCommitter.open (flink module)
>>       - IcebergSource.planSplitsForBatch (flink module)
>>       - StreamingMonitorFunction.open (flink module)
>>       - ContinuousSplitPlannerImpl<init> (flink module)
>>       - Coordinator<init> - Kafka coordinator - I'm not sure that this
>>       belongs to here (kafka-connect)
>>
>> The code we would need to duplicate in the core/data/flink/kafka modules is:
>>
>>   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>>     return Executors.newFixedThreadPool(
>>         poolSize,
>>         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>>   }
>>
>>
>> Maybe adding another utility method to the `ThreadPools` would help
>> future contributors to think twice about the need for using the `Exiting`
>> solution, so I would prefer to add this method to the core `ThreadPools`
>> with enough javadoc to highlight the intended usage.
>>
>> Thanks,
>> Peter
>>
>> rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024,
>> 23:26):
>>
>>> I think this is the intended behavior. The code calls
>>> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
>>> exits. I think the right fix is for callers to create their own
>>> `ExecutorService` rather than using `newWorkerPool`. That allows for
>>> customization without making Iceberg more complicated. `ThreadPools` isn't
>>> doing anything special here. It's just a convenience method to create an
>>> exiting, fixed-size thread pool that runs daemon threads. If that's not
>>> what you're looking for then isn't it reasonable to make your own
>>> convenience method?
>>>
>>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> This is not just a Flink issue; the calls are spread out across multiple
>>>> packages. We checked the code, and in many of the current use cases in the
>>>> Iceberg repo the pool is not used in a static environment and is closed
>>>> manually. In these cases we should switch to a thread pool without a
>>>> shutdown hook. So I think at a minimum we need a utility method to
>>>> create such a pool.
>>>>
>>>> The main question is:
>>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>>
>>>> If this is a bug, then we create a "newExitingWorkerPool", and change
>>>> the callers to use the correct one.
>>>> If this is a feature, then we create a "newNotExitingWorkerPool" (which
>>>> is gross IMHO, but we should consider API compatibility), and change the
>>>> callers to use the correct one.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>>
>>>>> Since we're using standard interfaces, maybe we should just document
>>>>> this behavior and you can control it by creating your own worker pool
>>>>> instead?
>>>>>
>>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Bumping this thread a bit.
>>>>>>
>>>>>> Cleaning up the pool in non-static cases should be the responsibility
>>>>>> of the user. If they want a pool which is closed by a hook when the JVM
>>>>>> exits, they should explicitly "say" so, for example by calling
>>>>>> "newExitingWorkerPool".
>>>>>>
>>>>>> This is a behaviour change in the API, so I think we need feedback
>>>>>> from the community before proceeding with it.
>>>>>> What are your thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024,
>>>>>> 17:16):
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> During the investigation of a metaspace memory leak issue in Flink
>>>>>>> IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a
>>>>>>> discussion with @pvary revealed that *ThreadPools.newWorkerPool*
>>>>>>> currently registers a Shutdown Hook via ExitingExecutorService for all
>>>>>>> created thread pools. While this ensures graceful shutdown of the pools
>>>>>>> when the JVM exits, it might lead to unnecessary Shutdown Hook
>>>>>>> accumulation, especially when the pool is explicitly closed within the
>>>>>>> application's lifecycle.
>>>>>>>
>>>>>>> I propose to *modify ThreadPools.newWorkerPool to not register a
>>>>>>> Shutdown Hook by default*. This would prevent potential issues
>>>>>>> where developers might unintentionally register numerous Shutdown Hooks
>>>>>>> when using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>>> To retain the existing functionality for long-lived thread pools
>>>>>>> that require a Shutdown Hook, I suggest introducing a new, more
>>>>>>> descriptive function, such as *newExitingWorkerPool*. This function would
>>>>>>> explicitly create thread pools that are registered with a Shutdown Hook.
>>>>>>>
>>>>>>> *This change might potentially impact users who rely on the implicit
>>>>>>> Shutdown Hook registration provided by the current
>>>>>>> ThreadPools.newWorkerPool implementation.*
>>>>>>> I would like to gather feedback from the community regarding this
>>>>>>> proposed change, especially regarding potential compatibility concerns.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Feng Jiajie
>>>>>>>
>>>>>>>
