[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278265#comment-16278265 ]

ASF GitHub Bot commented on FLINK-7880:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r154886964
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,28 +167,57 @@ public String getClientName() {
         * Shuts down the client and closes all connections.
         *
         * <p>After a call to this method, all returned futures will be failed.
    +    *
    +    * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
         */
    -   public void shutdown() {
    -           if (shutDown.compareAndSet(false, true)) {
    +   public CompletableFuture<Void> shutdown() {
    +           final CompletableFuture<Void> newShutdownFuture = new CompletableFuture<>();
    +           if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
    +
    +                   final List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();
    +
                        for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
                                if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
    -                                   conn.getValue().close();
    +                                   connectionFutures.add(conn.getValue().close());
                                }
                        }
     
                        for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
                                if (pendingConnections.remove(conn.getKey()) != null) {
    -                                   conn.getValue().close();
    +                                   connectionFutures.add(conn.getValue().close());
                                }
                        }
     
    -                   if (bootstrap != null) {
    -                           EventLoopGroup group = bootstrap.group();
    -                           if (group != null) {
    -                                   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +                   CompletableFuture.allOf(
    +                                   connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
    +                   ).whenComplete((result, throwable) -> {
    +                           if (throwable != null) {
    +                                   newShutdownFuture.completeExceptionally(throwable);
    +                           } else if (bootstrap != null) {
    +                                   EventLoopGroup group = bootstrap.group();
    +                                   if (group != null && !group.isShutdown()) {
    +                                           group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
    +                                                           .addListener(finished -> {
    +                                                                   if (finished.isSuccess()) {
    +                                                                           newShutdownFuture.complete(null);
    +                                                                   } else {
    +                                                                           newShutdownFuture.completeExceptionally(finished.cause());
    +                                                                   }
    +                                                           });
    +                                   } else {
    +                                           newShutdownFuture.complete(null);
    +                                   }
    +                           } else {
    +                                   newShutdownFuture.complete(null);
                                }
    +                   });
    +
    +                   // check again if in the meantime another thread completed the future
    +                   if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
    --- End diff --
    
    You are right!
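
For reference, here is a minimal sketch of the idempotent-shutdown pattern the hunk above is built around: a single `compareAndSet` on an `AtomicReference<CompletableFuture<Void>>` picks the one caller that performs the shutdown, and every other caller gets the same future back. This is not the code from the pull request. The class name `ShutdownSketch` and the plain list of close futures are hypothetical stand-ins for `Client` and its connection maps, and the Netty event-loop shutdown is left out. Within the hunk as shown, the reference is already non-null once the first `compareAndSet` succeeds, so the sketch assumes that one check is enough.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the client; not the actual Flink class.
class ShutdownSketch {

    // Null until the first shutdown() call; afterwards holds the shared shutdown future.
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture =
            new AtomicReference<>(null);

    // Hypothetical: one "close" future per open connection.
    private final List<CompletableFuture<Void>> connectionCloseFutures;

    ShutdownSketch(List<CompletableFuture<Void>> connectionCloseFutures) {
        this.connectionCloseFutures = new ArrayList<>(connectionCloseFutures);
    }

    CompletableFuture<Void> shutdown() {
        final CompletableFuture<Void> newFuture = new CompletableFuture<>();

        // Only the first caller wins this race and drives the shutdown.
        if (shutdownFuture.compareAndSet(null, newFuture)) {
            CompletableFuture
                    .allOf(connectionCloseFutures.toArray(new CompletableFuture<?>[0]))
                    .whenComplete((ignored, throwable) -> {
                        if (throwable != null) {
                            newFuture.completeExceptionally(throwable);
                        } else {
                            newFuture.complete(null);
                        }
                    });
            return newFuture;
        }

        // Every later caller observes the future installed by the winner.
        return shutdownFuture.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> connection = new CompletableFuture<>();
        ShutdownSketch client = new ShutdownSketch(Collections.singletonList(connection));

        CompletableFuture<Void> first = client.shutdown();
        CompletableFuture<Void> second = client.shutdown();
        System.out.println("same future: " + (first == second));

        connection.complete(null); // simulate the connection finishing its close
        System.out.println("shutdown done: " + first.isDone());
    }
}
```

Returning the stored future on the losing path keeps repeated `shutdown()` calls cheap and lets all callers wait on the same completion.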


> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
>                 Key: FLINK-7880
>                 URL: https://issues.apache.org/jira/browse/FLINK-7880
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State, Tests
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Kostas Kloudas
>            Priority: Critical
>              Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



