liming30 commented on code in PR #21503:
URL: https://github.com/apache/flink/pull/21503#discussion_r1102335317
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ##########
@@ -177,6 +182,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.checkpoints = new HashMap<>();
         this.lock = new Object();
         this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
+        this.asyncDisposeThreadPool =
+                new ThreadPoolExecutor(
+                        0,
+                        4,
+                        60L,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory("AsyncDispose"));

Review Comment:
In @gaoyunhaii's comment in [FLINK-30251](https://issues.apache.org/jira/browse/FLINK-30251):

> There is also one concern:
>
> 1. The current `asyncOperationsThreadPool` is a cached thread pool, which does not have an upper limit on the number of threads, and it will create a new thread whenever there is no free thread when submitting tasks. Then if we have a large number of files to close, we might end up with a lot of threads, which might further cause a large amount of memory consumption (1MB for each thread's RSS region).
>
> 2. Thus we might change it to a thread pool with a limited maximum number of threads and one unbounded blocking queue. Also, since the threads in this pool might be blocked, we might need to use a separate thread pool.

The main concern is that too many threads may be created if asynchronous cleanup blocks. From your point of view, which is better: reusing `asyncOperationsThreadPool` or creating a new pool?
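One detail is worth checking against the "limited maximum number of threads and one unbounded blocking queue" idea. The `ThreadPoolExecutor` Javadoc states that with an unbounded queue such as `LinkedBlockingQueue`, no more than `corePoolSize` threads are ever created and `maximumPoolSize` has no effect. With `corePoolSize = 0`, `execute()` still starts one worker when the pool is empty, so a `(0, 4, ..., new LinkedBlockingQueue<>())` pool appears to run at most one thread, not four. The sketch below compares that shape with a core-equals-max pool that uses `allowCoreThreadTimeOut(true)`. It is a minimal illustration using only JDK classes: the class and method names are hypothetical, Flink's `ExecutorThreadFactory` is omitted, and the blocking tasks only stand in for slow `close()` calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink: compares two pool configurations.
final class DisposePoolSizing {

    /** Same shape as the diff: core size 0, max size N, unbounded queue. */
    static ThreadPoolExecutor coreZeroPool(int maxThreads) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** Core size == max size; idle core threads may time out, like a cached pool. */
    static ThreadPoolExecutor boundedPool(int maxThreads) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(
                        maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // Requires a keep-alive time > 0; lets the pool shrink to zero threads when idle.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Submits {@code tasks} tasks that block until released (stand-ins for slow cleanup),
     * records the largest number of threads the pool has held at once, then drains the pool.
     */
    static int peakThreads(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(
                    () -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
        }
        int peak = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown(); // queued tasks still run before termination
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("core=0, max=4: " + peakThreads(coreZeroPool(4), 8));
        System.out.println("core=4, max=4: " + peakThreads(boundedPool(4), 8));
    }
}
```

Running `main` prints `core=0, max=4: 1` followed by `core=4, max=4: 4`. The second configuration keeps the thread count capped at 4, which bounds memory, while `allowCoreThreadTimeOut(true)` still lets idle threads be reclaimed after 60 seconds.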
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ##########
@@ -529,15 +542,19 @@ public void close() throws IOException {
     }

     public void cancel() throws IOException {
-        List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
         synchronized (lock) {
             if (!closed) {
                 closed = true;
-                asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+                final List<AsyncCheckpointRunnable> asyncCheckpointRunnables =
+                        new ArrayList<>(checkpoints.values());
                 checkpoints.clear();
+                if (!asyncCheckpointRunnables.isEmpty()) {
+                    asyncDisposeThreadPool.execute(
+                            () -> IOUtils.closeAllQuietly(asyncCheckpointRunnables));
+                }
+                asyncDisposeThreadPool.shutdown();

Review Comment:
Yes, you are right. I originally hoped that no async operation would block the process, but to prevent resource leaks, `cancel` should not be asynchronous. I'll change this part of the logic back.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org