XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308278198


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        running = false;
         this.executorService.shutdownNow();
+        ongoingRequests.forEach(ft -> ft.cancel(true));

Review Comment:
   Yeah, you're right. I don't know why, but I remember assuming in the 
previous PR that shutdown would trigger a cancellation of the queued tasks. 
That's not the case. Guarding the `SocketConnection` and checking the `running` 
flag are both obsolete: the running tasks are handled by `shutdownNow`, and the 
queued tasks are cancelled before they are executed. Therefore, no concurrent 
access happens when resetting the `SocketConnection`.
   
   That also explains the more frequent test failures: the previous `shutdown` 
call tried to handle the ongoing requests sequentially. `shutdownNow` seems 
to have cancelled the ongoing task, but the queued tasks were executed afterwards, 
which led to the re-instantiation of the `SocketConnection`.
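
To illustrate the plain JDK behaviour this relies on, here is a minimal sketch. It is not the coordinator's code: the class name `ShutdownNowSketch`, the single-threaded executor and the three dummy tasks are assumptions made for the example. It shows that `shutdownNow` interrupts the running task and hands back the queued ones, but leaves the queued tasks' futures pending until they are cancelled explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Flink.
class ShutdownNowSketch {

    /**
     * Returns {tasks handed back by shutdownNow,
     *          futures still pending after shutdownNow,
     *          futures cancelled by the explicit cancel call}.
     */
    static int[] run() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        List<Future<?>> ongoingRequests = new ArrayList<>();

        // The first task occupies the only worker thread until it is interrupted.
        ongoingRequests.add(executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Two more tasks have to wait in the executor's queue.
        for (int i = 0; i < 2; i++) {
            ongoingRequests.add(executor.submit(() -> { }));
        }

        started.await();

        // Interrupts the running task and drains the queue without running it.
        List<Runnable> neverStarted = executor.shutdownNow();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        // The drained tasks' futures are neither completed nor cancelled yet.
        long pending = ongoingRequests.stream().filter(f -> !f.isDone()).count();

        // Same idea as `ongoingRequests.forEach(ft -> ft.cancel(true))` in the diff.
        ongoingRequests.forEach(f -> f.cancel(true));
        long cancelled = ongoingRequests.stream().filter(Future::isCancelled).count();

        return new int[] {neverStarted.size(), (int) pending, (int) cancelled};
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.println("returned=" + r[0] + " pending=" + r[1] + " cancelled=" + r[2]);
    }
}
```

Anything still waiting on one of those queued futures would block indefinitely without the explicit `cancel`, which is why the diff keeps track of `ongoingRequests` in addition to shutting down the executor.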



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
