zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r299556207
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 ##########
 @@ -118,75 +120,61 @@ public void shutdown() {
                // Remove shutdown hook to prevent resource leaks
                ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 
-               try {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Shutting down I/O manager.");
-                       }
-
-                       // close writing and reading threads with best effort and log problems
-                       // first notify all to close, then wait until all are closed
 
-                       for (WriterThread wt : writers) {
-                               try {
-                                       wt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO Manager writer thread.", t);
-                               }
-                       }
-                       for (ReaderThread rt : readers) {
-                               try {
-                                       rt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO Manager reader thread.", t);
-                               }
-                       }
-                       try {
-                               for (WriterThread wt : writers) {
-                                       wt.join();
-                               }
-                               for (ReaderThread rt : readers) {
-                                       rt.join();
-                               }
-                       }
-                       catch (InterruptedException iex) {
-                               // ignore this on shutdown
-                       }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Shutting down I/O manager.");
                }
-               finally {
-                       // make sure we call the super implementation in any case and at the last point,
-                       // because this will clean up the I/O directories
-                       super.shutdown();
+
+               // close writing and reading threads with best effort and log problems
+               // first notify all to close, then wait until all are closed
+
+               List<AutoCloseable> closeables = new ArrayList<>(2 * writers.length + 2 * readers.length + 1);
+
+               for (WriterThread wt : writers) {
+                       closeables.add(getWriterThreadCloser(wt));
+                       closeables.add(wt::join);
                }
-       }
-       
-       /**
-        * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-        * to be properly shut down when it is closed and its threads have ceased operation.
-        * 
-        * @return True, if the IO manager has properly shut down, false otherwise.
-        */
-       @Override
-       public boolean isProperlyShutDown() {
-               boolean readersShutDown = true;
+
                for (ReaderThread rt : readers) {
-                       readersShutDown &= rt.getState() == Thread.State.TERMINATED;
+                       closeables.add(getReaderThreadCloser(rt));
+                       closeables.add(rt::join);
 
 Review comment:
   I think you are right. I had not looked through the implementation of `shutdown` before; I just kept the previous behavior, which also called `join` after `shutdown`.
   I can fix this pre-existing issue in a later hotfix commit.
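
   For readers following the hunk above, here is a minimal sketch of the pattern the new `shutdown` code appears to use: each shutdown step is wrapped as an `AutoCloseable`, and all steps are then closed with best effort. The class `BestEffortShutdown` and the helpers `closeAll` and `shutdownSteps` are hypothetical names for illustration, not Flink's actual utilities, and plain `Thread.interrupt()` stands in for the thread-specific `shutdown()` call, whose implementation is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Flink code base.
final class BestEffortShutdown {

    /**
     * Closes every element even if an earlier one fails. The first failure is
     * rethrown at the end, with later failures attached as suppressed exceptions.
     */
    static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
        Exception first = null;
        for (AutoCloseable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /**
     * Builds two steps per worker, in the same interleaved order as the diff:
     * a stop request (assumed here to be interrupt()) followed by join().
     */
    static List<AutoCloseable> shutdownSteps(Thread[] workers) {
        List<AutoCloseable> steps = new ArrayList<>(2 * workers.length);
        for (Thread t : workers) {
            steps.add(t::interrupt); // stand-in for the thread's own shutdown()
            steps.add(t::join);      // waits until the thread has terminated
        }
        return steps;
    }
}
```

   If the stop request already waits for the thread to finish, the extra `join` step in this sketch becomes redundant, which is the kind of duplication this thread is about.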

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
