Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174639182 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture<String> createLiveNode() { - String liveNodePath = getLiveNodePath(); + final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); - return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); + final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); + OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); + Futures.addCallback(createFuture, new FutureCallback<String>() { + final FutureCallback<String> thisCallback = this; + + @Override + public void onSuccess(String result) { + LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); + resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { + if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); + } + + // If the node already exists, it is due to previous run attempt that left an ephemeral node. + // Try to delete the node and recreate it + LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); + Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() { + @Override + public void onSuccess(String result) { + Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), + thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { + createFailure.addSuppressed(t); + resultFuture.setException(createFailure); + } + }, Threads.SAME_THREAD_EXECUTOR); + } + }, Threads.SAME_THREAD_EXECUTOR); + + return resultFuture; --- End diff -- I was thinking it does not have to be async. Create the ZK node, get the future. If success, done. If not, delete the ZK node, get the future. If the delete fails, throw. Else try again. But maybe that would be equally complex?
---