[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174644467
  
--- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---
@@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
     protected void startUp() throws Exception {
       // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
       // no left over content from previous AM attempt.
-      final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath,
-                                                                                Threads.SAME_THREAD_EXECUTOR);
-      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
--- End diff --

oh yeah, accidentally removed.


---


> Second launch attempt of AM always failed
> -
>
> Key: TWILL-61
> URL: https://issues.apache.org/jira/browse/TWILL-61
> Project: Apache Twill
>  Issue Type: Bug
>  Components: yarn
>Reporter: Terence Yim
>Assignee: Terence Yim
>Priority: Major
> Fix For: 0.5.0-incubating
>
>
> YARN makes multiple attempts to launch an application. Currently the second 
> and later attempts always fail because creation of the /runId/state node in 
> ZK fails (node exists): the runId is generated on the client side and doesn't 
> change between attempts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399626#comment-16399626
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174643542
  
--- Diff: twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java ---
@@ -281,6 +286,73 @@ public void onFailure(Throwable t) {
     return resultFuture;
   }
 
+  /**
+   * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen
+   * and the creation will be retried.
+   */
+  public static OperationFuture createDeleteIfExists(final ZKClient zkClient, final String path,
+                                                     @Nullable final byte[] data, final CreateMode createMode,
+                                                     final boolean createParent, final ACL...acls) {
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(path,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    final List createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+    createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback() {
+
+      final FutureCallback createCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        // Create succeeded, just set the result to the resultFuture
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        // If create failed not because of the NodeExistsException, just set the exception to the result future
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+          return;
+        }
+
+        // Try to delete the path
+        LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+        Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            // If delete succeeded, perform the creation again.
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // If deletion failed because of NoNodeException, fail the result operation future
+            if (!(t instanceof KeeperException.NoNodeException)) {
+              createFailure.addSuppressed(t);
+              resultFuture.setException(createFailure);
+              return;
+            }
+
+            // If can't delete because the node no longer exists, just go ahead and recreate the node
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    });
+
+    return resultFuture;
+  }
+
+  /**
+   * Private helper method to create a ZK node based on the parameter. The result of the creation is always
+   * communicate via the provided {@link FutureCallback}.
+   */
+  private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+                                 CreateMode createMode, boolean createParent,
+                                 Iterable acls, FutureCallback callback) {
+    Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
+                        callback, Threads.SAME_THREAD_EXECUTOR);
+  }
--- End diff --

yes, easier to understand and extracted into separate methods. Like it much 
better now 


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399625#comment-16399625
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174643070
  
--- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---
@@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
     protected void startUp() throws Exception {
       // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
       // no left over content from previous AM attempt.
-      final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath,
-                                                                                Threads.SAME_THREAD_EXECUTOR);
-      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
--- End diff --

maybe keep the log message?


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399593#comment-16399593
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174639182
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+        // Try to delete the node and recreate it
+        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            createFailure.addSuppressed(t);
+            resultFuture.setException(createFailure);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
--- End diff --

I was thinking it does not have to be async. Create the ZK node and get the 
future. If it succeeds, done. If not, delete the ZK node and get that future. 
If the delete fails, throw; otherwise try again. But maybe that would be equally complex?
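
The synchronous flow suggested in this comment can be sketched as follows. This is only an illustration, not Twill code: FakeZk, NodeExistsException, NoNodeException and the maxAttempts limit are hypothetical stand-ins so the example runs without ZooKeeper. A real version would block on the futures returned by zkClient.create and zkClient.delete instead. Unlike the comment, the sketch retries the create when the delete fails only because the node is already gone.

```java
import java.util.HashMap;
import java.util.Map;

/** Blocking create-delete-retry sketch against a hypothetical in-memory store. */
class SyncCreateSketch {

  // Unchecked stand-ins for ZooKeeper's KeeperException subclasses (assumption).
  static class NodeExistsException extends RuntimeException { }
  static class NoNodeException extends RuntimeException { }

  /** Hypothetical stand-in for a ZK client; not the Twill ZKClient API. */
  static class FakeZk {
    final Map<String, String> nodes = new HashMap<>();

    void create(String path, String data) {
      if (nodes.containsKey(path)) {
        throw new NodeExistsException();
      }
      nodes.put(path, data);
    }

    void delete(String path) {
      if (!nodes.containsKey(path)) {
        throw new NoNodeException();
      }
      nodes.remove(path);
    }
  }

  /**
   * Creates the node; if it already exists, deletes it and tries again.
   * maxAttempts is an assumption of this sketch, the PR has no such limit.
   */
  static void createDeleteIfExists(FakeZk zk, String path, String data, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    NodeExistsException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        zk.create(path, data);
        return; // success, done
      } catch (NodeExistsException e) {
        last = e; // leftover node from a previous attempt
      }
      try {
        zk.delete(path);
      } catch (NoNodeException e) {
        // The node vanished in the meantime; just retry the create.
      }
    }
    throw last;
  }

  public static void main(String[] args) {
    FakeZk zk = new FakeZk();
    zk.nodes.put("/runId/state", "stale");
    createDeleteIfExists(zk, "/runId/state", "fresh", 3);
    System.out.println(zk.nodes.get("/runId/state")); // prints "fresh"
  }
}
```

The loop is shorter than the nested callbacks, but it blocks the calling thread for each ZK round trip, which is the trade-off the question above is weighing.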


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399591#comment-16399591
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174638923
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.

got it


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399531#comment-16399531
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user chtyim commented on the issue:

https://github.com/apache/twill/pull/67
  
I've addressed the comments and fixed one more issue (on deletion failure, 
if the node does not exist, we can just go ahead and recreate the node instead of 
failing).

 Also refactored the callback code a bit to try to make it cleaner.
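
The behaviour described in this comment (retry the create after a successful delete, and also when the delete fails only because the node is already gone) can be sketched with plain CompletableFutures. This is an illustration under stated assumptions, not the PR's code: FakeZk and both exception classes are hypothetical in-memory stand-ins, whereas the PR uses Twill's OperationFuture with Guava FutureCallbacks.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Async create-delete-retry sketch against a hypothetical in-memory store. */
class AsyncCreateSketch {

  // Unchecked stand-ins for ZooKeeper's KeeperException subclasses (assumption).
  static class NodeExistsException extends RuntimeException { }
  static class NoNodeException extends RuntimeException { }

  /** Hypothetical async client; its futures are already completed when returned. */
  static class FakeZk {
    final Map<String, String> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, String data) {
      CompletableFuture<String> f = new CompletableFuture<>();
      if (nodes.putIfAbsent(path, data) == null) {
        f.complete(path);
      } else {
        f.completeExceptionally(new NodeExistsException());
      }
      return f;
    }

    CompletableFuture<String> delete(String path) {
      CompletableFuture<String> f = new CompletableFuture<>();
      if (nodes.remove(path) != null) {
        f.complete(path);
      } else {
        f.completeExceptionally(new NoNodeException());
      }
      return f;
    }
  }

  static CompletableFuture<String> createDeleteIfExists(FakeZk zk, String path, String data) {
    CompletableFuture<String> result = new CompletableFuture<>();
    attempt(zk, path, data, result);
    return result;
  }

  private static void attempt(FakeZk zk, String path, String data,
                              CompletableFuture<String> result) {
    zk.create(path, data).whenComplete((created, createFailure) -> {
      if (createFailure == null) {
        result.complete(created);
        return;
      }
      // Any failure other than "node exists" is reported as-is.
      if (!(createFailure instanceof NodeExistsException)) {
        result.completeExceptionally(createFailure);
        return;
      }
      zk.delete(path).whenComplete((deleted, deleteFailure) -> {
        // A delete failure other than "no node" fails the whole operation.
        if (deleteFailure != null && !(deleteFailure instanceof NoNodeException)) {
          createFailure.addSuppressed(deleteFailure);
          result.completeExceptionally(createFailure);
          return;
        }
        // Deleted, or already gone: either way, try the create again.
        attempt(zk, path, data, result);
      });
    });
  }

  public static void main(String[] args) {
    FakeZk zk = new FakeZk();
    zk.nodes.put("/runId/state", "stale");
    createDeleteIfExists(zk, "/runId/state", "fresh").join();
    System.out.println(zk.nodes.get("/runId/state")); // prints "fresh"
  }
}
```

As in the quoted diff, there is no retry limit, so a node that keeps reappearing would be retried indefinitely.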


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399527#comment-16399527
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174626043
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java ---
@@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
       }
 
       String newBrokerList = brokerService.getBrokerList();
-      if (newBrokerList.isEmpty()) {
-        LOG.warn("Broker list is empty. No Kafka producer is created.");
-        return;
-      }
 
+      // If there is no change, whether it is empty or not, just return
       if (Objects.equal(brokerList, newBrokerList)) {
         return;
       }
 
-      Properties props = new Properties();
-      props.put("metadata.broker.list", newBrokerList);
-      props.put("serializer.class", ByteBufferEncoder.class.getName());
-      props.put("key.serializer.class", IntegerEncoder.class.getName());
-      props.put("partitioner.class", IntegerPartitioner.class.getName());
-      props.put("request.required.acks", Integer.toString(ack.getAck()));
-      props.put("compression.codec", compression.getCodec());
+      Producer newProducer = null;
+      if (!newBrokerList.isEmpty()) {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", newBrokerList);
+        props.put("serializer.class", ByteBufferEncoder.class.getName());
+        props.put("key.serializer.class", IntegerEncoder.class.getName());
+        props.put("partitioner.class", IntegerPartitioner.class.getName());
+        props.put("request.required.acks", Integer.toString(ack.getAck()));
+        props.put("compression.codec", compression.getCodec());
+
+        ProducerConfig config = new ProducerConfig(props);
+        newProducer = new Producer<>(config);
+      }
 
-      ProducerConfig config = new ProducerConfig(props);
-      Producer oldProducer = producer.getAndSet(new Producer(config));
+      // If the broker list is empty, the producer will be set to null
+      Producer oldProducer = producer.getAndSet(newProducer);
       if (oldProducer != null) {
         oldProducer.close();
       }
 
-      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+      if (newBrokerList.isEmpty()) {
+        LOG.warn("Empty Kafka producer broker list, publish will fail.");
--- End diff --

Yes
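
The swap logic in this hunk can be looked at in isolation. Below is a minimal sketch, assuming a hypothetical FakeProducer in place of the Kafka Producer: the holder is an AtomicReference that is set to null when the broker list is empty, and whichever producer was installed before is closed. Remembering the last broker list inside changed() is an assumption of the sketch, since that part is not visible in the quoted hunk.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Producer swap sketch; FakeProducer is a hypothetical stand-in for a Kafka producer. */
class ProducerSwapSketch {

  static class FakeProducer {
    final String brokers;
    volatile boolean closed;

    FakeProducer(String brokers) {
      this.brokers = brokers;
    }

    void close() {
      closed = true;
    }
  }

  private final AtomicReference<FakeProducer> producer = new AtomicReference<>();
  private String brokerList;

  /** Same shape as changed() in the diff: no-op when unchanged, null producer when empty. */
  synchronized void changed(String newBrokerList) {
    if (Objects.equals(brokerList, newBrokerList)) {
      return;
    }
    FakeProducer newProducer = newBrokerList.isEmpty() ? null : new FakeProducer(newBrokerList);
    // Install the new producer (possibly null) and close whatever was there before.
    FakeProducer oldProducer = producer.getAndSet(newProducer);
    if (oldProducer != null) {
      oldProducer.close();
    }
    brokerList = newBrokerList; // assumption: not shown in the quoted hunk
  }

  FakeProducer current() {
    return producer.get();
  }

  public static void main(String[] args) {
    ProducerSwapSketch sketch = new ProducerSwapSketch();
    sketch.changed("host1:9092");
    FakeProducer first = sketch.current();
    sketch.changed(""); // empty list: producer becomes null, old one is closed
    System.out.println(sketch.current() == null && first.closed); // prints "true"
  }
}
```

Callers that publish through the holder then need to handle a null producer, which is why the new log message warns that publishing will fail.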


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399470#comment-16399470
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618874
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
--- End diff --

That's right. Missed it.


> Second launch attempt of AM always failed
> -
>
> Key: TWILL-61
> URL: https://issues.apache.org/jira/browse/TWILL-61
> Project: Apache Twill
>  Issue Type: Bug
>  Components: yarn
>Reporter: Terence Yim
>Assignee: Terence Yim
>Priority: Major
> Fix For: 0.5.0-incubating
>
>
> YARN makes multiple attempts to launch an application. Currently the second
> and later attempts always fail because creating the /runId/state node in ZK
> fails (node exists): the runId is generated on the client side and doesn't
> change between attempts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399466#comment-16399466
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618395
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+        // Try to delete the node and recreate it
+        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            createFailure.addSuppressed(t);
+            resultFuture.setException(createFailure);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
--- End diff --

I can pull the common code between this and the ApplicationMasterMain class 
into a util function. But still, inside the util function, there would be three 
callbacks.
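
One possible shape for such a util function, as a rough sketch only. It uses plain `java.util.concurrent.CompletableFuture` composition instead of Guava callbacks and Twill's `OperationFuture`, and `FakeZkClient`, `NodeExistsException`, and `createDeleteIfExists` are hypothetical names made up for illustration, not existing Twill or ZooKeeper API. The create -> delete -> create chain is still there, but it reads top to bottom instead of as nested anonymous classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

final class CreateDeleteIfExistsSketch {

  // Hypothetical stand-in for KeeperException.NodeExistsException.
  static final class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  // Hypothetical in-memory stand-in for the asynchronous ZKClient.
  static final class FakeZkClient {
    final Map<String, String> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, String data) {
      if (nodes.putIfAbsent(path, data) != null) {
        return failed(new NodeExistsException(path));
      }
      return CompletableFuture.completedFuture(path);
    }

    CompletableFuture<String> delete(String path) {
      nodes.remove(path);
      return CompletableFuture.completedFuture(path);
    }
  }

  // Small helper so the sketch does not depend on CompletableFuture.failedFuture (Java 9+).
  static <T> CompletableFuture<T> failed(Throwable t) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(t);
    return future;
  }

  // Tries to create the node; if it already exists, deletes it and tries again.
  // Any other failure is propagated to the returned future unchanged.
  static CompletableFuture<String> createDeleteIfExists(FakeZkClient zk, String path, String data) {
    return zk.create(path, data)
      .<CompletableFuture<String>>handle((created, failure) -> {
        if (failure == null) {
          return CompletableFuture.completedFuture(created);
        }
        Throwable cause = failure instanceof CompletionException ? failure.getCause() : failure;
        if (!(cause instanceof NodeExistsException)) {
          return failed(cause);
        }
        // Stale node from a previous attempt: delete it, then retry the create.
        return zk.delete(path).thenCompose(deleted -> createDeleteIfExists(zk, path, data));
      })
      .thenCompose(next -> next);
  }

  public static void main(String[] args) {
    FakeZkClient zk = new FakeZkClient();
    zk.nodes.put("/runId/state", "left over from previous attempt");
    createDeleteIfExists(zk, "/runId/state", "fresh").join();
    System.out.println(zk.nodes.get("/runId/state"));
  }
}
```

Both createLiveNode() and the Kafka path setup in ApplicationMasterMain could then call a helper of this kind, so the retry logic would live in one place.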


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399465#comment-16399465
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618087
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+        // Try to delete the node and recreate it
+        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            createFailure.addSuppressed(t);
+            resultFuture.setException(createFailure);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
--- End diff --

Well, we do need that many levels of callback (create -> delete -> create) 
for the operation. Any suggestions on how to simplify it?


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174617783
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
--- End diff --

An ephemeral node won't go away immediately if the process dies. It stays
there until the ZK session times out, which typically takes several seconds.
In the meantime, YARN may already have started the next AM process, so the
new AM process will see the ephemeral node.


---


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399162#comment-16399162
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174573795
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+        // Try to delete the node and recreate it
+        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            createFailure.addSuppressed(t);
+            resultFuture.setException(createFailure);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
--- End diff --

While this code appears correct (after addressing my comment), the three
nested levels of callbacks make it almost impossible to read. Is there some way
to unwind this?


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399160#comment-16399160
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174571435
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
--- End diff --

Do you need to return here?
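
To make the concern concrete, here is a small stand-alone sketch. It assumes plain `CompletableFuture` in place of `SettableOperationFuture`, and `NodeExistsException`, `handleCreateFailure`, and `deleteAttempts` are hypothetical names, not Twill or ZooKeeper API. Without the `return`, a failure unrelated to an existing node fails the result future and then still falls through to the delete-and-recreate path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class EarlyReturnSketch {

  // Hypothetical stand-in for KeeperException.NodeExistsException.
  static final class NodeExistsException extends Exception {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  // Counts how often the (simulated) delete-and-recreate path is entered.
  static final AtomicInteger deleteAttempts = new AtomicInteger();

  // Mirrors the shape of onFailure in the diff. The returnEarly flag toggles
  // between the code as posted (false) and the version with a return (true).
  static void handleCreateFailure(Throwable createFailure, CompletableFuture<String> result,
                                  boolean returnEarly) {
    if (!(createFailure instanceof NodeExistsException)) {
      result.completeExceptionally(createFailure);
      if (returnEarly) {
        return;
      }
    }
    // Only meant to run when the node already exists.
    deleteAttempts.incrementAndGet();
  }

  public static void main(String[] args) {
    Throwable unrelated = new IllegalStateException("connection lost");

    handleCreateFailure(unrelated, new CompletableFuture<>(), false);
    System.out.println("delete attempts without return: " + deleteAttempts.get());

    deleteAttempts.set(0);
    handleCreateFailure(unrelated, new CompletableFuture<>(), true);
    System.out.println("delete attempts with return: " + deleteAttempts.get());
  }
}
```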


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399164#comment-16399164
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174583711
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 ---
@@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
 
     @Override
     protected void startUp() throws Exception {
-      ZKOperations.ignoreError(
-        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
-        KeeperException.NodeExistsException.class, kafkaZKPath).get();
+      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+      // no left over content from previous AM attempt.
+      final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+      Futures.addCallback(zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), new FutureCallback() {
+
+        final FutureCallback thisCallback = this;
+
+        @Override
+        public void onSuccess(String result) {
+          completion.set(result);
+        }
+
+        @Override
+        public void onFailure(final Throwable createFailure) {
+          if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+            completion.setException(createFailure);
+          }
--- End diff --

Should this return here?


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399163#comment-16399163
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174580690
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
 ---
@@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
       }
 
       String newBrokerList = brokerService.getBrokerList();
-      if (newBrokerList.isEmpty()) {
-        LOG.warn("Broker list is empty. No Kafka producer is created.");
-        return;
-      }
 
+      // If there is no change, whether it is empty or not, just return
       if (Objects.equal(brokerList, newBrokerList)) {
         return;
       }
 
-      Properties props = new Properties();
-      props.put("metadata.broker.list", newBrokerList);
-      props.put("serializer.class", ByteBufferEncoder.class.getName());
-      props.put("key.serializer.class", IntegerEncoder.class.getName());
-      props.put("partitioner.class", IntegerPartitioner.class.getName());
-      props.put("request.required.acks", Integer.toString(ack.getAck()));
-      props.put("compression.codec", compression.getCodec());
+      Producer newProducer = null;
+      if (!newBrokerList.isEmpty()) {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", newBrokerList);
+        props.put("serializer.class", ByteBufferEncoder.class.getName());
+        props.put("key.serializer.class", IntegerEncoder.class.getName());
+        props.put("partitioner.class", IntegerPartitioner.class.getName());
+        props.put("request.required.acks", Integer.toString(ack.getAck()));
+        props.put("compression.codec", compression.getCodec());
+
+        ProducerConfig config = new ProducerConfig(props);
+        newProducer = new Producer<>(config);
+      }
 
-      ProducerConfig config = new ProducerConfig(props);
-      Producer oldProducer = producer.getAndSet(new Producer(config));
+      // If the broker list is empty, the producer will be set to null
+      Producer oldProducer = producer.getAndSet(newProducer);
       if (oldProducer != null) {
         oldProducer.close();
       }
 
-      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+      if (newBrokerList.isEmpty()) {
+        LOG.warn("Empty Kafka producer broker list, publish will fail.");
--- End diff --

So when will this happen? If the AM dies (and its broker with it)? 
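
For reference, the swap behaviour in the diff can be sketched in isolation as follows. This is an illustration under assumptions: `FakeProducer` stands in for the Kafka `Producer`, and `publish` plus the `brokerList` bookkeeping are guesses at surrounding code that the quoted hunk does not show. It only shows what happens once the list is empty, not when that occurs.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

final class ProducerSwapSketch {

  // Hypothetical stand-in for the Kafka Producer; it only tracks whether it was closed.
  static final class FakeProducer {
    final String brokers;
    volatile boolean closed;

    FakeProducer(String brokers) {
      this.brokers = brokers;
    }

    void close() {
      closed = true;
    }
  }

  final AtomicReference<FakeProducer> producer = new AtomicReference<>();
  private String brokerList;

  // Mirrors the changed(...) logic in the diff: an empty broker list swaps in null.
  void changed(String newBrokerList) {
    // No change, whether empty or not: nothing to do.
    if (Objects.equals(brokerList, newBrokerList)) {
      return;
    }
    FakeProducer newProducer = newBrokerList.isEmpty() ? null : new FakeProducer(newBrokerList);
    FakeProducer oldProducer = producer.getAndSet(newProducer);
    if (oldProducer != null) {
      oldProducer.close();
    }
    brokerList = newBrokerList;  // Assumed; not visible in the quoted hunk.
  }

  // Assumed publish path: fails fast while no producer is available.
  void publish(String message) {
    FakeProducer current = producer.get();
    if (current == null) {
      throw new IllegalStateException("No Kafka producer available");
    }
    // A real publisher would hand the message to the producer here.
  }

  public static void main(String[] args) {
    ProducerSwapSketch sketch = new ProducerSwapSketch();
    sketch.changed("host1:9092");
    FakeProducer first = sketch.producer.get();
    sketch.changed("");
    System.out.println("old producer closed: " + first.closed);
    System.out.println("producer is null: " + (sketch.producer.get() == null));
  }
}
```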


[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399161#comment-16399161
 ] 

ASF GitHub Bot commented on TWILL-61:
-

Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174571807
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.

How can it leave an ephemeral node? Doesn't that mean there must be a 
zombie process holding on to that node?




[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174571435
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
--- End diff --

Do you need to return here?
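
To make the control-flow question concrete, here is a small sketch of the two shapes of onFailure. It records the actions taken instead of calling ZooKeeper; OnFailureFlow, its NodeExistsException and the action names are hypothetical and only mirror the structure of the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class OnFailureFlow {
  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static class NodeExistsException extends Exception {
  }

  /** Same shape as the reviewed code: no return after reporting the failure. */
  static List<String> withoutReturn(Throwable failure) {
    List<String> actions = new ArrayList<>();
    if (!(failure instanceof NodeExistsException)) {
      actions.add("setException");
    }
    // Reached for every failure type, not only for "node exists".
    actions.add("delete");
    return actions;
  }

  /** Variant with an early return: only "node exists" reaches the delete step. */
  static List<String> withReturn(Throwable failure) {
    List<String> actions = new ArrayList<>();
    if (!(failure instanceof NodeExistsException)) {
      actions.add("setException");
      return actions;
    }
    actions.add("delete");
    return actions;
  }
}
```

Without the return, a failure unrelated to "node exists" is reported and the delete step is still attempted; with it, the two cases stay separate.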


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174583711
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 ---
@@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
 
     @Override
     protected void startUp() throws Exception {
-      ZKOperations.ignoreError(
-        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
-        KeeperException.NodeExistsException.class, kafkaZKPath).get();
+      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+      // no left over content from previous AM attempt.
+      final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+      Futures.addCallback(zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), new FutureCallback() {
+
+        final FutureCallback thisCallback = this;
+
+        @Override
+        public void onSuccess(String result) {
+          completion.set(result);
+        }
+
+        @Override
+        public void onFailure(final Throwable createFailure) {
+          if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+            completion.setException(createFailure);
+          }
--- End diff --

Return here?


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174573795
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture createLiveNode() {
-    String liveNodePath = getLiveNodePath();
+    final String liveNodePath = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath,
+                                                                                Threads.SAME_THREAD_EXECUTOR);
+    OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
+    Futures.addCallback(createFuture, new FutureCallback() {
+      final FutureCallback thisCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+        }
+
+        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
+        // Try to delete the node and recreate it
+        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
+        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() {
+          @Override
+          public void onSuccess(String result) {
+            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            createFailure.addSuppressed(t);
+            resultFuture.setException(createFailure);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
--- End diff --

While this code appears correct (after addressing my comment), the three 
nested levels of callbacks make it almost impossible to read. Is there some way 
to unwind this?
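
One possible way to unwind it, sketched with the JDK's CompletableFuture rather than the Guava and Twill futures used in the PR. FakeZk is a hypothetical in-memory stand-in for an asynchronous ZooKeeper client and LiveNodes.createOrReplace is an illustrative name. Unlike the diff, which re-registers thisCallback and can therefore loop, this sketch retries the create only once and simply propagates a delete failure.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an asynchronous ZooKeeper client. */
class FakeZk {
  static class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  private final Map<String, String> nodes = new ConcurrentHashMap<>();

  CompletableFuture<String> create(String path, String data) {
    CompletableFuture<String> future = new CompletableFuture<>();
    if (nodes.putIfAbsent(path, data) == null) {
      future.complete(path);
    } else {
      future.completeExceptionally(new NodeExistsException(path));
    }
    return future;
  }

  CompletableFuture<String> delete(String path) {
    nodes.remove(path);
    return CompletableFuture.completedFuture(path);
  }

  String read(String path) {
    return nodes.get(path);
  }
}

class LiveNodes {
  /** Creates the node; if it already exists, deletes it once and creates it again. */
  static CompletableFuture<String> createOrReplace(FakeZk zk, String path, String data) {
    return zk.create(path, data)
        .<CompletableFuture<String>>handle((created, failure) -> {
          if (failure == null) {
            return CompletableFuture.completedFuture(created);
          }
          Throwable cause = failure instanceof CompletionException && failure.getCause() != null
              ? failure.getCause() : failure;
          if (!(cause instanceof FakeZk.NodeExistsException)) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(cause);
            return failed; // unrelated failure: report it and stop here
          }
          // Stale node from an earlier attempt: delete it, then create once more.
          return zk.delete(path).thenCompose(deleted -> zk.create(path, data));
        })
        .thenCompose(step -> step); // flatten the nested future
  }
}
```

The chain reads top to bottom as create, classify the failure, delete, create again, and the early return for unrelated failures is explicit rather than implied by callback nesting.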


---