GitHub user anew commented on a diff in the pull request:
https://github.com/apache/twill/pull/67#discussion_r174573795
--- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
return zkClient.setData(liveNodePath, serializeLiveNode());
}
+ /**
+ * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+ *
+ * @return A {@link OperationFuture} that will be completed when the creation is done.
+ */
private OperationFuture<String> createLiveNode() {
- String liveNodePath = getLiveNodePath();
+ final String liveNodePath = getLiveNodePath();
LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
- return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
- KeeperException.NodeExistsException.class, liveNodePath);
+ final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
+ Threads.SAME_THREAD_EXECUTOR);
+ OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+ Futures.addCallback(createFuture, new FutureCallback<String>() {
+ final FutureCallback<String> thisCallback = this;
+
+ @Override
+ public void onSuccess(String result) {
+ LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable createFailure) {
+ if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+ resultFuture.setException(createFailure);
+ }
+
+ // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+ // Try to delete the node and recreate it
+ LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+ Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+ thisCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ createFailure.addSuppressed(t);
+ resultFuture.setException(createFailure);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ return resultFuture;
--- End diff --
While this code appears correct (after addressing my comment), the three
nested levels of callbacks make it almost impossible to read. Is there some
way to unwind this?
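
One possible way to unwind it is to give each step its own named method and chain them, so that no callback is defined inside another. The following is only a rough sketch of that shape, not a drop-in patch: it uses `java.util.concurrent.CompletableFuture` as a stand-in for Twill's `OperationFuture` and Guava callbacks, and `FakeZkClient`, `NodeExistsException`, `afterCreate` and `deleteThenRetry` are hypothetical names made up for illustration. It also leaves out the logging and the `addSuppressed` bookkeeping from the diff.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class LiveNodeSketch {

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  /** Hypothetical in-memory stand-in for the asynchronous ZooKeeper client. */
  static class FakeZkClient {
    final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, byte[] data) {
      CompletableFuture<String> future = new CompletableFuture<>();
      if (nodes.putIfAbsent(path, data) == null) {
        future.complete(path);
      } else {
        future.completeExceptionally(new NodeExistsException(path));
      }
      return future;
    }

    CompletableFuture<String> delete(String path) {
      nodes.remove(path);
      return CompletableFuture.completedFuture(path);
    }
  }

  /** Entry point: try to create the node, then decide what to do with the outcome. */
  static CompletableFuture<String> createLiveNode(FakeZkClient zk, String path, byte[] data) {
    return zk.create(path, data)
        .handle((created, failure) -> afterCreate(zk, path, data, created, failure))
        .thenCompose(next -> next); // flatten the future returned by afterCreate
  }

  /** Succeeds, retries after a delete on NodeExistsException, or propagates the failure. */
  static CompletableFuture<String> afterCreate(FakeZkClient zk, String path, byte[] data,
                                               String created, Throwable failure) {
    if (failure == null) {
      return CompletableFuture.completedFuture(created);
    }
    Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
        ? failure.getCause() : failure;
    if (cause instanceof NodeExistsException) {
      return deleteThenRetry(zk, path, data);
    }
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(cause);
    return failed;
  }

  /** Deletes the stale node left by a previous run, then starts over from createLiveNode. */
  static CompletableFuture<String> deleteThenRetry(FakeZkClient zk, String path, byte[] data) {
    return zk.delete(path).thenCompose(deleted -> createLiveNode(zk, path, data));
  }

  public static void main(String[] args) {
    FakeZkClient zk = new FakeZkClient();
    zk.nodes.put("/instances/run-1", new byte[] {1}); // stale node from an earlier attempt
    String path = createLiveNode(zk, "/instances/run-1", new byte[] {2}).join();
    System.out.println("Live node created " + path);
  }
}
```

With Guava futures the same split should be possible by moving the two anonymous `FutureCallback` classes into named private methods or inner classes, which keeps the retry loop but removes the nesting.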
---