Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174573795
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    While this code appears correct (after addressing my comment), the three
    nested levels of callbacks make it almost impossible to read. Is there some
    way to unwind this?
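
One possible way to flatten the create / delete / re-create chain, offered only as a rough sketch. It uses `java.util.concurrent.CompletableFuture` instead of the Guava callbacks and Twill's `OperationFuture` from the diff, so it shows the shape of the idea rather than a drop-in change. `LiveNodeSketch`, `FakeZk`, and the local `NodeExistsException` are hypothetical stand-ins invented for this example; they are not Twill or ZooKeeper classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class LiveNodeSketch {

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  /** Hypothetical in-memory stand-in for the ZK client (assumption, not a real API). */
  static class FakeZk {
    final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, byte[] data) {
      CompletableFuture<String> future = new CompletableFuture<>();
      if (nodes.putIfAbsent(path, data) == null) {
        future.complete(path);
      } else {
        future.completeExceptionally(new NodeExistsException(path));
      }
      return future;
    }

    CompletableFuture<String> delete(String path) {
      nodes.remove(path);
      return CompletableFuture.completedFuture(path);
    }
  }

  /** Dependent stages may wrap failures in CompletionException; peel that off. */
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  /** Creates the node; on "node exists", deletes it and tries the create again. */
  static CompletableFuture<String> createLiveNode(FakeZk zk, String path, byte[] data) {
    return zk.create(path, data)
        .handle((result, failure) -> recover(zk, path, data, result, failure))
        .thenCompose(next -> next); // flatten the future-of-future
  }

  /** Single decision point: pass success through, fail fast, or delete and retry. */
  static CompletableFuture<String> recover(FakeZk zk, String path, byte[] data,
                                           String result, Throwable failure) {
    if (failure == null) {
      return CompletableFuture.completedFuture(result);
    }
    Throwable cause = unwrap(failure);
    if (!(cause instanceof NodeExistsException)) {
      // Any other failure is reported as-is, with no delete attempt.
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(cause);
      return failed;
    }
    // Stale node from an earlier run: remove it, then retry the create.
    return zk.delete(path).thenCompose(deleted -> createLiveNode(zk, path, data));
  }
}
```

As in the diff, the retry repeats for as long as the create keeps failing with "node exists", so a real change might want a bound on attempts. The sketch also does not attach the delete failure to the original create failure the way `addSuppressed` does in the diff.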

