Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618395 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture<String> createLiveNode() { - String liveNodePath = getLiveNodePath(); + final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); - return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); + final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); + OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); + Futures.addCallback(createFuture, new FutureCallback<String>() { + final FutureCallback<String> thisCallback = this; + + @Override + public void onSuccess(String result) { + LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); + resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { + if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); + } + + // If the node already exists, it is due to previous run attempt that left an ephemeral node. + // Try to delete the node and recreate it + LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); + Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() { + @Override + public void onSuccess(String result) { + Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), + thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { + createFailure.addSuppressed(t); + resultFuture.setException(createFailure); + } + }, Threads.SAME_THREAD_EXECUTOR); + } + }, Threads.SAME_THREAD_EXECUTOR); + + return resultFuture; --- End diff -- I can move the code shared by this method and the ApplicationMasterMain class into a utility function. Even so, the utility function would still need three callbacks.
---