XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308278198

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        running = false;
         this.executorService.shutdownNow();
+        ongoingRequests.forEach(ft -> ft.cancel(true));

Review Comment:
   Yeah, you're right. I don't know why, but I remember assuming in the previous PR that shutdown would trigger a cancellation of the queued tasks. But that's not the case.

   The guarding of the `SocketConnection` and the `running` flag are obsolete: the running tasks are handled by `shutdownNow`, and the queued tasks are cancelled before they are executed. Therefore, no concurrent access happens when resetting the `SocketConnection`.

   That also explains the more frequent test failures: the previous `shutdown` call tried to handle the ongoing requests sequentially. `shutdownNow` seems to have cancelled the ongoing task, but queued tasks were executed afterwards, which led to the re-instantiation of the `SocketConnection`.
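
For reference, the point about queued tasks can be reproduced outside Flink with plain `java.util.concurrent`. This is only a minimal sketch, not the coordinator's code: `ShutdownNowSketch`, `Result`, and `demo` are hypothetical names, and a single-threaded executor is assumed so that exactly one task is running and one is queued. It shows that `shutdownNow()` interrupts the running task and hands back the queued one, but leaves the queued task's `Future` neither done nor cancelled until `cancel` is called explicitly.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; none of these names exist in Flink.
class ShutdownNowSketch {

    static final class Result {
        int drainedCount;              // tasks returned by shutdownNow()
        boolean doneAfterShutdownNow;  // queued future state right after shutdownNow()
        boolean cancelledExplicitly;   // queued future state after cancel(true)
        boolean queuedTaskRan;         // whether the queued task body ever executed
        boolean terminated;            // whether the executor terminated in time
    }

    static Result demo() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean(false);

        // Occupies the only worker thread until it gets interrupted.
        executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Has to wait in the executor's queue behind the task above.
        Future<?> queued = executor.submit(() -> queuedRan.set(true));

        started.await();

        Result result = new Result();
        // Interrupts the running task and drains the queue without running it.
        List<Runnable> neverStarted = executor.shutdownNow();
        result.drainedCount = neverStarted.size();
        // The drained task's future is still pending: nobody completed or cancelled it.
        result.doneAfterShutdownNow = queued.isDone();

        // Explicit cancellation, analogous to ongoingRequests.forEach(ft -> ft.cancel(true)).
        queued.cancel(true);
        result.cancelledExplicitly = queued.isCancelled();

        result.terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        result.queuedTaskRan = queuedRan.get();
        return result;
    }

    public static void main(String[] args) throws Exception {
        Result r = demo();
        System.out.println("drained=" + r.drainedCount
                + " doneAfterShutdownNow=" + r.doneAfterShutdownNow
                + " cancelledExplicitly=" + r.cancelledExplicitly
                + " queuedTaskRan=" + r.queuedTaskRan);
    }
}
```

Without the explicit `cancel`, anything waiting on the queued future would block indefinitely, which is presumably why the diff cancels `ongoingRequests` after calling `shutdownNow()`.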