gaoyunhaii commented on a change in pull request #13357:
URL: https://github.com/apache/flink/pull/13357#discussion_r492242431



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CircularQueues.java
##########
@@ -72,16 +76,23 @@ public void sendResult(MutableObjectIterator<E> result) {
        }
 
        @Override
-       public CircularElement<E> take(StageRunner.SortStage stage) {
-               try {
-                       return getQueue(stage).take();
-               } catch (InterruptedException e) {
-                       throw new WrappingRuntimeException(e);
+       public CircularElement<E> take(StageRunner.SortStage stage) throws InterruptedException {
+               while (!isFinished) {
+                       CircularElement<E> value = getQueue(stage).poll(1, TimeUnit.SECONDS);
+                       if (value != null) {
+                               return value;
+                       }
                }
+               throw new FlinkRuntimeException("The sorter is closed already");
        }
 
        @Override
        public CircularElement<E> poll(StageRunner.SortStage stage) {
                return getQueue(stage).poll();
        }
+
+       @Override
+       public void close() throws Exception {

Review comment:
       Sorry, but I could not find where the `close` method is called.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SpillingThread.java
##########
@@ -586,4 +594,8 @@ private void getSegmentsForReaders(
                        }
                }
        }
+
+       private IOException wrapWithIOException(InterruptedException e) throws IOException {
+               return new IOException("The sortes has been interrupted", e);

Review comment:
       Typo: `sortes` -> `sorter`?
   
   I'm also wondering whether we could throw `InterruptedException` directly 
(we might need to add it to some method declarations)?
   
   If there are blockers for throwing `InterruptedException` directly, I think 
we could move this method to the base class, since the other threads also need 
to wrap the exception in the same way.
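   
   To make the two options concrete, here is a rough sketch. The class names 
`SorterThreadBase` and `SpillingThreadSketch` and the `String` queue are made 
up for illustration; they are not the actual classes in this PR, and the real 
base class may look different.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical base class (name is an assumption, not from the PR). */
abstract class SorterThreadBase {

    /** Shared helper so that every subclass wraps interruptions the same way. */
    static IOException wrapWithIOException(InterruptedException e) {
        return new IOException("The sorter has been interrupted", e);
    }
}

/** Hypothetical subclass showing both options side by side. */
final class SpillingThreadSketch extends SorterThreadBase {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Option 1: declare InterruptedException and let it propagate. */
    String takeDirect() throws InterruptedException {
        return queue.take();
    }

    /** Option 2: keep the IOException signature and wrap via the base-class helper. */
    String takeWrapped() throws IOException {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw wrapWithIOException(e);
        }
    }
}
```

   With option 2 the original `InterruptedException` stays available through 
`getCause()`, so callers can still tell an interruption apart from a real I/O 
failure.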




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

