liming30 commented on code in PR #21503:
URL: https://github.com/apache/flink/pull/21503#discussion_r1102335317


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -177,6 +182,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.checkpoints = new HashMap<>();
         this.lock = new Object();
         this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
+        this.asyncDisposeThreadPool =
+                new ThreadPoolExecutor(
+                        0,
+                        4,
+                        60L,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory("AsyncDispose"));

Review Comment:
   Quoting @gaoyunhaii's comment on [FLINK-30251](https://issues.apache.org/jira/browse/FLINK-30251):
   
   > There is also one concern:
   >
   > 1. The current `asyncOperationsThreadPool` is a cached thread pool, which has no upper limit on the number of threads and creates a new thread whenever no free thread is available at submission time. So if we have a large number of files to close, we might end up with a lot of threads, which could in turn cause large memory consumption (about 1 MB of RSS per thread).
   >
   > 2. Thus we might change it to a thread pool with a limited maximum number of threads and an unbounded blocking queue. Also, since the threads in this pool might block, we might need to use a separate thread pool.
   
   The main concern is that too many threads will be created when the asynchronous cleanup blocks. From your point of view, which is better: reusing `asyncOperationsThreadPool` or creating a new pool?
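   
   For context, here is a small self-contained JDK sketch (outside Flink; the class name is made up, and only the 0 / 4 / 60s values come from the diff above) contrasting the two pool shapes under discussion:
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   public class DisposePoolShapes {
   
       public static void main(String[] args) {
           // Shape of the existing asyncOperationsThreadPool: a cached pool.
           // A task that finds no idle thread spawns a new one, so N
           // concurrently blocked dispose tasks mean roughly N threads.
           ExecutorService cachedPool = Executors.newCachedThreadPool();
   
           // Shape added in the diff: the thread count is capped at 4, and
           // further tasks wait in the unbounded queue instead of spawning
           // new threads.
           ThreadPoolExecutor boundedPool =
                   new ThreadPoolExecutor(
                           0, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
   
           Runnable blockingDispose =
                   () -> {
                       try {
                           Thread.sleep(1_000); // stand-in for a blocking close()
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                       }
                   };
   
           for (int i = 0; i < 100; i++) {
               cachedPool.execute(blockingDispose);  // may grow to ~100 threads
               boundedPool.execute(blockingDispose); // never exceeds 4 threads
           }
   
           cachedPool.shutdown();
           boundedPool.shutdown();
       }
   }
   ```
   
   With the cached pool, the thread count is bounded only by how many dispose tasks block at the same time; with the bounded pool, the cost is that blocked tasks delay the queued ones behind them, which is part of why a separate pool was suggested.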



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -529,15 +542,19 @@ public void close() throws IOException {
     }
 
     public void cancel() throws IOException {
-        List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
         synchronized (lock) {
             if (!closed) {
                 closed = true;
-                asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+                final List<AsyncCheckpointRunnable> asyncCheckpointRunnables =
+                        new ArrayList<>(checkpoints.values());
                 checkpoints.clear();
+                if (!asyncCheckpointRunnables.isEmpty()) {
+                    asyncDisposeThreadPool.execute(
+                            () -> IOUtils.closeAllQuietly(asyncCheckpointRunnables));
+                }
+                asyncDisposeThreadPool.shutdown();

Review Comment:
   Yes, you are right. I originally just wanted all async operations to avoid blocking the task, but from the perspective of preventing resource leaks, cancel should not be asynchronous. I'll change this part of the logic back.
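   
   For reference, a rough standalone sketch of the synchronous shape being described (plain JDK types stand in for `AsyncCheckpointRunnable` and Flink's `IOUtils`; the class and field names are illustrative, not the actual patch):
   
   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   /** Illustrative stand-in for the coordinator's cancel path. */
   class SynchronousCancelSketch {
   
       private final Object lock = new Object();
       private final Map<Long, Closeable> checkpoints = new HashMap<>();
       private boolean closed;
   
       public void cancel() throws IOException {
           final List<Closeable> toClose;
           synchronized (lock) {
               if (closed) {
                   return;
               }
               closed = true;
               toClose = new ArrayList<>(checkpoints.values());
               checkpoints.clear();
           }
           // Close on the calling thread: cancel() only returns once every
           // pending async checkpoint resource has been released, so nothing
           // outlives the task.
           for (Closeable c : toClose) {
               try {
                   c.close();
               } catch (IOException e) {
                   // swallow, mirroring IOUtils.closeAllQuietly in the diff above
               }
           }
       }
   }
   ```
   
   The trade-off is a potentially slower cancel(), in exchange for the guarantee that no dispose work leaks past task teardown.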
   
   


