[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174644467
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 ---
@@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, 
String kafkaZKConnect) {
 protected void startUp() throws Exception {
   // Create the ZK node for Kafka to use. If the node already exists, 
delete it to make sure there is
   // no left over content from previous AM attempt.
-  final SettableOperationFuture completion = 
SettableOperationFuture.create(kafkaZKPath,
-   
 Threads.SAME_THREAD_EXECUTOR);
-  LOG.info("Preparing Kafka ZK path {}{}", 
zkClient.getConnectString(), kafkaZKPath);
--- End diff --

oh yeah, accidentally removed.


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174643542
  
--- Diff: 
twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java ---
@@ -281,6 +286,73 @@ public void onFailure(Throwable t) {
     return resultFuture;
   }
 
+  /**
+   * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen
+   * and the creation will be retried.
+   */
+  public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
+                                                             @Nullable final byte[] data, final CreateMode createMode,
+                                                             final boolean createParent, final ACL...acls) {
+    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
+                                                                                        Threads.SAME_THREAD_EXECUTOR);
+    final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+    createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
+
+      final FutureCallback<String> createCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        // Create succeeded, just set the result to the resultFuture
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        // If create failed not because of the NodeExistsException, just set the exception to the result future
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+          return;
+        }
+
+        // Try to delete the path
+        LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+        Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String result) {
+            // If delete succeeded, perform the creation again.
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // If deletion failed because of NoNodeException, fail the result operation future
+            if (!(t instanceof KeeperException.NoNodeException)) {
+              createFailure.addSuppressed(t);
+              resultFuture.setException(createFailure);
+              return;
+            }
+
+            // If can't delete because the node no longer exists, just go ahead and recreate the node
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    });
+
+    return resultFuture;
+  }
+
+  /**
+   * Private helper method to create a ZK node based on the parameter. The result of the creation is always
+   * communicate via the provided {@link FutureCallback}.
+   */
+  private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+                                 CreateMode createMode, boolean createParent,
+                                 Iterable<ACL> acls, FutureCallback<String> callback) {
+    Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
+                        callback, Threads.SAME_THREAD_EXECUTOR);
+  }

Yes, this is easier to understand now that it is extracted into separate 
methods. I like it much better now.


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174643070
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 ---
@@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, 
String kafkaZKConnect) {
 protected void startUp() throws Exception {
   // Create the ZK node for Kafka to use. If the node already exists, 
delete it to make sure there is
   // no left over content from previous AM attempt.
-  final SettableOperationFuture completion = 
SettableOperationFuture.create(kafkaZKPath,
-   
 Threads.SAME_THREAD_EXECUTOR);
-  LOG.info("Preparing Kafka ZK path {}{}", 
zkClient.getConnectString(), kafkaZKPath);
--- End diff --

maybe keep the log message?


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174639182
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
+// Try to delete the node and recreate it
+LOG.info("Live node already exist. Deleting node {}{}", 
zkClient.getConnectString(), liveNodePath);
+Futures.addCallback(zkClient.delete(liveNodePath), new 
FutureCallback() {
+  @Override
+  public void onSuccess(String result) {
+Futures.addCallback(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
+thisCallback, 
Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+createFailure.addSuppressed(t);
+resultFuture.setException(createFailure);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+
+return resultFuture;
--- End diff --

I was thinking it does not have to be async. Create the ZK node and wait on the 
future. If it succeeds, done. If not, delete the ZK node and wait on that future. 
If the delete fails, throw. Otherwise, try the create again. But maybe that would 
be equally complex?
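
For comparison, here is a minimal sketch of what a blocking create / delete / retry loop could look like. It does not use the Twill or ZooKeeper APIs: `FakeStore`, `NodeExistsException`, and the `maxAttempts` bound are hypothetical stand-ins introduced only for illustration, and the exception is unchecked purely to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Blocking create / delete / retry against an in-memory stand-in for ZooKeeper. */
final class SyncCreateSketch {

  /** Hypothetical stand-in for KeeperException.NodeExistsException (unchecked for brevity). */
  static final class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  /** Hypothetical in-memory node store; not a Twill or ZooKeeper class. */
  static final class FakeStore {
    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    void create(String path, byte[] data) {
      if (nodes.putIfAbsent(path, data) != null) {
        throw new NodeExistsException(path);
      }
    }

    void delete(String path) {
      nodes.remove(path);
    }

    byte[] get(String path) {
      return nodes.get(path);
    }
  }

  /** Returns the number of create attempts that were needed. */
  static int createDeleteIfExists(FakeStore store, String path, byte[] data, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    NodeExistsException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        store.create(path, data);
        return attempt;
      } catch (NodeExistsException e) {
        // Left over from a previous run: delete it, then loop to create again.
        last = e;
        store.delete(path);
      }
    }
    throw last;
  }

  public static void main(String[] args) {
    FakeStore store = new FakeStore();
    store.create("/live", new byte[] {1});  // stale node from an earlier attempt
    int attempts = createDeleteIfExists(store, "/live", new byte[] {2}, 3);
    System.out.println(attempts);  // prints 2
  }
}
```

The control flow is linear, but every step blocks the calling thread, which is the trade-off against the callback version in the diff.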


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174638923
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
--- End diff --

got it


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174626043
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
 ---
@@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
   }
 
   String newBrokerList = brokerService.getBrokerList();
-  if (newBrokerList.isEmpty()) {
-LOG.warn("Broker list is empty. No Kafka producer is created.");
-return;
-  }
 
+  // If there is no change, whether it is empty or not, just return
   if (Objects.equal(brokerList, newBrokerList)) {
 return;
   }
 
-  Properties props = new Properties();
-  props.put("metadata.broker.list", newBrokerList);
-  props.put("serializer.class", ByteBufferEncoder.class.getName());
-  props.put("key.serializer.class", IntegerEncoder.class.getName());
-  props.put("partitioner.class", IntegerPartitioner.class.getName());
-  props.put("request.required.acks", Integer.toString(ack.getAck()));
-  props.put("compression.codec", compression.getCodec());
+  Producer newProducer = null;
+  if (!newBrokerList.isEmpty()) {
+Properties props = new Properties();
+props.put("metadata.broker.list", newBrokerList);
+props.put("serializer.class", ByteBufferEncoder.class.getName());
+props.put("key.serializer.class", IntegerEncoder.class.getName());
+props.put("partitioner.class", IntegerPartitioner.class.getName());
+props.put("request.required.acks", Integer.toString(ack.getAck()));
+props.put("compression.codec", compression.getCodec());
+
+ProducerConfig config = new ProducerConfig(props);
+newProducer = new Producer<>(config);
+  }
 
-  ProducerConfig config = new ProducerConfig(props);
-  Producer oldProducer = producer.getAndSet(new 
Producer(config));
+  // If the broker list is empty, the producer will be set to null
+  Producer oldProducer = 
producer.getAndSet(newProducer);
   if (oldProducer != null) {
 oldProducer.close();
   }
 
-  LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+  if (newBrokerList.isEmpty()) {
+LOG.warn("Empty Kafka producer broker list, publish will fail.");
--- End diff --

Yes
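
The swap logic in the diff above can be sketched in isolation as follows. This is only an illustration under assumptions: `FakeProducer` is a hypothetical stand-in for the Kafka `Producer`, and the place where `brokerList` is updated is assumed, since that assignment is not visible in the quoted hunk.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of swapping a producer when the broker list changes, including to empty. */
final class ProducerSwapSketch {

  /** Hypothetical stand-in for the Kafka producer; it only records its state. */
  static final class FakeProducer {
    final String brokerList;
    boolean closed;

    FakeProducer(String brokerList) {
      this.brokerList = brokerList;
    }

    void close() {
      closed = true;
    }
  }

  private final AtomicReference<FakeProducer> producer = new AtomicReference<>();
  private String brokerList;

  void changed(String newBrokerList) {
    // No change, whether empty or not: nothing to do.
    if (Objects.equals(brokerList, newBrokerList)) {
      return;
    }
    // An empty list means there is no broker to talk to, so the reference becomes null.
    FakeProducer newProducer = newBrokerList.isEmpty() ? null : new FakeProducer(newBrokerList);
    FakeProducer oldProducer = producer.getAndSet(newProducer);
    if (oldProducer != null) {
      oldProducer.close();
    }
    brokerList = newBrokerList;  // assumed; not shown in the quoted hunk
  }

  FakeProducer current() {
    return producer.get();
  }

  public static void main(String[] args) {
    ProducerSwapSketch sketch = new ProducerSwapSketch();
    sketch.changed("host1:9092");
    FakeProducer first = sketch.current();
    sketch.changed("");                    // broker went away
    System.out.println(first.closed);      // prints true
    System.out.println(sketch.current());  // prints null
    sketch.changed("host2:9093");          // broker came back elsewhere
    System.out.println(sketch.current().brokerList);  // prints host2:9093
  }
}
```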


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618874
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
--- End diff --

That's right. Missed it.


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618395
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
+// Try to delete the node and recreate it
+LOG.info("Live node already exist. Deleting node {}{}", 
zkClient.getConnectString(), liveNodePath);
+Futures.addCallback(zkClient.delete(liveNodePath), new 
FutureCallback() {
+  @Override
+  public void onSuccess(String result) {
+Futures.addCallback(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
+thisCallback, 
Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+createFailure.addSuppressed(t);
+resultFuture.setException(createFailure);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+
+return resultFuture;
--- End diff --

I can pull the common code between this and the ApplicationMasterMain class 
into a util function. But still, inside the util function, there would be three 
callbacks.


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174618087
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
+// Try to delete the node and recreate it
+LOG.info("Live node already exist. Deleting node {}{}", 
zkClient.getConnectString(), liveNodePath);
+Futures.addCallback(zkClient.delete(liveNodePath), new 
FutureCallback() {
+  @Override
+  public void onSuccess(String result) {
+Futures.addCallback(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
+thisCallback, 
Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+createFailure.addSuppressed(t);
+resultFuture.setException(createFailure);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+
+return resultFuture;
--- End diff --

Well, we do need that many levels of callback (create -> delete -> create) 
for the operation. Any suggestions on how to simplify it?
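
One possible way to flatten the chain, sketched here with the JDK's `CompletableFuture` rather than the Guava `Futures` and Twill `OperationFuture` types used in the PR, so it is not a drop-in replacement. `FakeAsyncStore` and `NodeExistsException` are hypothetical stand-ins, and unlike the diff this sketch retries the create only once after the delete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** create -> delete -> create expressed as one composed future instead of nested callbacks. */
final class FlatAsyncCreateSketch {

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static final class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node exists: " + path);
    }
  }

  /** Hypothetical in-memory async store; not a Twill or ZooKeeper class. */
  static final class FakeAsyncStore {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, String data) {
      CompletableFuture<String> future = new CompletableFuture<>();
      if (nodes.putIfAbsent(path, data) == null) {
        future.complete(path);
      } else {
        future.completeExceptionally(new NodeExistsException(path));
      }
      return future;
    }

    CompletableFuture<String> delete(String path) {
      nodes.remove(path);
      return CompletableFuture.completedFuture(path);
    }

    String get(String path) {
      return nodes.get(path);
    }
  }

  static CompletableFuture<String> createDeleteIfExists(FakeAsyncStore store, String path, String data) {
    return store.create(path, data)
        .<CompletableFuture<String>>handle((created, failure) -> {
          if (failure == null) {
            return CompletableFuture.completedFuture(created);
          }
          // Dependent stages wrap failures in CompletionException; unwrap if present.
          Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
              ? failure.getCause() : failure;
          if (!(cause instanceof NodeExistsException)) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(cause);
            return failed;
          }
          // Node exists: delete it, then create again (a single retry in this sketch).
          return store.delete(path).thenCompose(deleted -> store.create(path, data));
        })
        .thenCompose(next -> next);
  }

  public static void main(String[] args) {
    FakeAsyncStore store = new FakeAsyncStore();
    store.create("/kafka", "stale").join();
    String created = createDeleteIfExists(store, "/kafka", "fresh").join();
    System.out.println(created + " -> " + store.get("/kafka"));  // prints /kafka -> fresh
  }
}
```

The three steps are still there, but they read top to bottom as one pipeline rather than as callbacks nested three deep.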


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174617783
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
--- End diff --

An ephemeral node won't go away immediately if the process dies. It will stay 
there until the ZK session times out, which is typically multiple seconds. In the 
meantime, the next AM process may already have been started by YARN, hence the 
new AM process will see the ephemeral node.
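
The timing can be illustrated with a toy model. This is not ZooKeeper code: `FakeZk` is a hypothetical class that tracks only the last heartbeat per node, and the 10-second timeout, 2-second restart delay, and node path are made-up numbers and names for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of an ephemeral node that outlives its owner until the session times out. */
final class EphemeralLingerSketch {

  /** Hypothetical stand-in for the ZK server's view of sessions; uses a logical clock in ms. */
  static final class FakeZk {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatByPath = new HashMap<>();

    FakeZk(long sessionTimeoutMs) {
      this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void createEphemeral(String path, long nowMs) {
      lastHeartbeatByPath.put(path, nowMs);
    }

    /** The node is visible until the owner's session has been silent for the full timeout. */
    boolean exists(String path, long nowMs) {
      Long last = lastHeartbeatByPath.get(path);
      return last != null && nowMs - last < sessionTimeoutMs;
    }
  }

  public static void main(String[] args) {
    FakeZk zk = new FakeZk(10_000);
    // First AM attempt creates its node at t=0 and the process dies right after.
    zk.createEphemeral("/instances/am", 0);
    // A new AM attempt starting at t=2s still sees the old node.
    System.out.println(zk.exists("/instances/am", 2_000));   // prints true
    // Only after the session timeout has elapsed is the node gone.
    System.out.println(zk.exists("/instances/am", 12_000));  // prints false
  }
}
```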


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174580690
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
 ---
@@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
   }
 
   String newBrokerList = brokerService.getBrokerList();
-  if (newBrokerList.isEmpty()) {
-LOG.warn("Broker list is empty. No Kafka producer is created.");
-return;
-  }
 
+  // If there is no change, whether it is empty or not, just return
   if (Objects.equal(brokerList, newBrokerList)) {
 return;
   }
 
-  Properties props = new Properties();
-  props.put("metadata.broker.list", newBrokerList);
-  props.put("serializer.class", ByteBufferEncoder.class.getName());
-  props.put("key.serializer.class", IntegerEncoder.class.getName());
-  props.put("partitioner.class", IntegerPartitioner.class.getName());
-  props.put("request.required.acks", Integer.toString(ack.getAck()));
-  props.put("compression.codec", compression.getCodec());
+  Producer newProducer = null;
+  if (!newBrokerList.isEmpty()) {
+Properties props = new Properties();
+props.put("metadata.broker.list", newBrokerList);
+props.put("serializer.class", ByteBufferEncoder.class.getName());
+props.put("key.serializer.class", IntegerEncoder.class.getName());
+props.put("partitioner.class", IntegerPartitioner.class.getName());
+props.put("request.required.acks", Integer.toString(ack.getAck()));
+props.put("compression.codec", compression.getCodec());
+
+ProducerConfig config = new ProducerConfig(props);
+newProducer = new Producer<>(config);
+  }
 
-  ProducerConfig config = new ProducerConfig(props);
-  Producer oldProducer = producer.getAndSet(new 
Producer(config));
+  // If the broker list is empty, the producer will be set to null
+  Producer oldProducer = 
producer.getAndSet(newProducer);
   if (oldProducer != null) {
 oldProducer.close();
   }
 
-  LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+  if (newBrokerList.isEmpty()) {
+LOG.warn("Empty Kafka producer broker list, publish will fail.");
--- End diff --

So when will this happen? If the AM dies (and its broker with it)? 


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174571435
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
--- End diff --

do you need to return here?
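
To make the concern concrete, here is a small sketch of the fall-through. It records the actions taken instead of touching ZooKeeper; `NodeExistsException` and the `returnEarly` flag are hypothetical and exist only to compare the two variants side by side.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows what happens in onFailure with and without the early return. */
final class MissingReturnSketch {

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static final class NodeExistsException extends RuntimeException {
  }

  /** Returns the actions the failure handler would take, in order. */
  static List<String> onFailure(Throwable createFailure, boolean returnEarly) {
    List<String> actions = new ArrayList<>();
    if (!(createFailure instanceof NodeExistsException)) {
      actions.add("setException");
      if (returnEarly) {
        return actions;
      }
    }
    // Without the return above, an unrelated failure also reaches this point.
    actions.add("delete");
    return actions;
  }

  public static void main(String[] args) {
    Throwable unrelated = new IllegalStateException("connection loss");
    System.out.println(onFailure(unrelated, false));  // prints [setException, delete]
    System.out.println(onFailure(unrelated, true));   // prints [setException]
    System.out.println(onFailure(new NodeExistsException(), true));  // prints [delete]
  }
}
```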


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174571807
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
--- End diff --

how can it leave an ephemeral node? Doesn't that mean there must be a 
zombie process holding on to that node?


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174583711
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 ---
@@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, 
String kafkaZKConnect) {
 
 @Override
 protected void startUp() throws Exception {
-  ZKOperations.ignoreError(
-zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
-KeeperException.NodeExistsException.class, kafkaZKPath).get();
+  // Create the ZK node for Kafka to use. If the node already exists, 
delete it to make sure there is
+  // no left over content from previous AM attempt.
+  final SettableOperationFuture completion = 
SettableOperationFuture.create(kafkaZKPath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+  LOG.info("Preparing Kafka ZK path {}{}", 
zkClient.getConnectString(), kafkaZKPath);
+  Futures.addCallback(zkClient.create(kafkaZKPath, null, 
CreateMode.PERSISTENT), new FutureCallback() {
+
+final FutureCallback thisCallback = this;
+
+@Override
+public void onSuccess(String result) {
+  completion.set(result);
+}
+
+@Override
+public void onFailure(final Throwable createFailure) {
+  if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+completion.setException(createFailure);
+  }
--- End diff --

return here?


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-14 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/twill/pull/67#discussion_r174573795
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
@@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
 return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it 
will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the 
creation is done.
+   */
   private OperationFuture createLiveNode() {
-String liveNodePath = getLiveNodePath();
+final String liveNodePath = getLiveNodePath();
 LOG.info("Create live node {}{}", zkClient.getConnectString(), 
liveNodePath);
-return ZKOperations.ignoreError(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
-
KeeperException.NodeExistsException.class, liveNodePath);
+final SettableOperationFuture resultFuture = 
SettableOperationFuture.create(liveNodePath,
+   
 Threads.SAME_THREAD_EXECUTOR);
+OperationFuture createFuture = zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL);
+Futures.addCallback(createFuture, new FutureCallback() {
+  final FutureCallback thisCallback = this;
+
+  @Override
+  public void onSuccess(String result) {
+LOG.info("Live node created {}{}", zkClient.getConnectString(), 
liveNodePath);
+resultFuture.set(result);
+  }
+
+  @Override
+  public void onFailure(final Throwable createFailure) {
+if (!(createFailure instanceof 
KeeperException.NodeExistsException)) {
+  resultFuture.setException(createFailure);
+}
+
+// If the node already exists, it is due to previous run attempt 
that left an ephemeral node.
+// Try to delete the node and recreate it
+LOG.info("Live node already exist. Deleting node {}{}", 
zkClient.getConnectString(), liveNodePath);
+Futures.addCallback(zkClient.delete(liveNodePath), new 
FutureCallback() {
+  @Override
+  public void onSuccess(String result) {
+Futures.addCallback(zkClient.create(liveNodePath, 
serializeLiveNode(), CreateMode.EPHEMERAL),
+thisCallback, 
Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+createFailure.addSuppressed(t);
+resultFuture.setException(createFailure);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+  }
+}, Threads.SAME_THREAD_EXECUTOR);
+
+return resultFuture;
--- End diff --

while this code appears correct (after addressing my comment), the three 
nested levels of callback make it almost impossible to read. Is there some way 
to unwind this?


---


[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

2018-03-09 Thread chtyim
GitHub user chtyim opened a pull request:

https://github.com/apache/twill/pull/67

(TWILL-61) Fix to allow higher attempts to relaunch the app after the first 
attempt failed

- Delete the Kafka root ZK node for the application if it already exists
- Delete the AM instance ZK node if it already exists
- For the runnables' parent ZK node, it is not an error if it already exists
- Enhance the KafkaClient publisher / consumer to deal with Kafka cluster 
changes
  - When the AM is killed and restarted, the embedded Kafka will be running on 
a different host and port

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chtyim/twill feature/TWILL-61

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/twill/pull/67.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #67


commit 9decca071a9e067b30be2150a6097463c939b6af
Author: Terence Yim 
Date:   2018-03-09T20:21:26Z

(TWILL-61) Fix to allow higher attempts to relaunch the app after the first 
attempt failed

- Delete the Kafka root zk node for the application if already exist
- Delete the AM instance zk node if already exist
- For runnables parent zk node, it is not an error if it already exist
- Enhance KafkaClient publisher / consumer to deal with Kafka cluster 
changes
  - When AM killed and restarted, the embedded Kafka will be running in 
different host and port




---