Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174639182
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    I was thinking this does not have to be async. Create the ZK node and wait on the future. If it succeeds, we are done. If not, delete the ZK node and wait on that future. If the delete fails, throw; otherwise try the create again. But maybe that would be equally complex?
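
To make the comparison concrete, here is a rough sketch of the blocking variant described above. It is not written against Twill's `ZKClient`: `NodeClient`, `NodeExistsException`, and `InMemoryClient` are hypothetical stand-ins, and the `maxAttempts` bound is an assumption added so the retry loop terminates.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SyncLiveNodeSketch {

  /** Hypothetical stand-in for the ZooKeeper client (not Twill's ZKClient API). */
  interface NodeClient {
    Future<String> create(String path, byte[] data);
    Future<String> delete(String path);
  }

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static final class NodeExistsException extends Exception {
    NodeExistsException(String path) {
      super("Node already exists: " + path);
    }
  }

  /**
   * Blocking create: try to create; on "node exists", delete and try again.
   * Any other create failure, or a delete failure, is thrown to the caller.
   * maxAttempts is an assumed bound, not something the review comment specifies.
   */
  static String createLiveNode(NodeClient client, String path, byte[] data, int maxAttempts)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception createFailure = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return client.create(path, data).get();   // success: done
      } catch (ExecutionException e) {
        if (!(e.getCause() instanceof NodeExistsException)) {
          throw e;                                 // unrelated failure: give up
        }
        createFailure = (Exception) e.getCause();
      }
      try {
        client.delete(path).get();                 // remove the stale node
      } catch (ExecutionException e) {
        createFailure.addSuppressed(e.getCause());
        throw createFailure;                       // delete failed: throw
      }
    }
    throw createFailure;                           // attempts exhausted
  }

  /** Tiny in-memory fake, used only to exercise the sketch. */
  static final class InMemoryClient implements NodeClient {
    final Set<String> nodes = new HashSet<>();

    @Override
    public Future<String> create(String path, byte[] data) {
      CompletableFuture<String> future = new CompletableFuture<>();
      if (nodes.add(path)) {
        future.complete(path);
      } else {
        future.completeExceptionally(new NodeExistsException(path));
      }
      return future;
    }

    @Override
    public Future<String> delete(String path) {
      CompletableFuture<String> future = new CompletableFuture<>();
      if (nodes.remove(path)) {
        future.complete(path);
      } else {
        future.completeExceptionally(new IllegalStateException("No such node: " + path));
      }
      return future;
    }
  }
}
```

The control flow is flatter than the nested callbacks, but the method now blocks the calling thread on each ZK operation, and it changes the return type from a future to a plain value, so callers would need to change as well.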

