Github user anew commented on a diff in the pull request:
https://github.com/apache/twill/pull/67#discussion_r174583711
--- Diff:
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
---
@@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient,
String kafkaZKConnect) {
@Override
protected void startUp() throws Exception {
- ZKOperations.ignoreError(
- zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
- KeeperException.NodeExistsException.class, kafkaZKPath).get();
+ // Create the ZK node for Kafka to use. If the node already exists,
delete it to make sure there is
+ // no left over content from previous AM attempt.
+ final SettableOperationFuture<String> completion =
SettableOperationFuture.create(kafkaZKPath,
+
Threads.SAME_THREAD_EXECUTOR);
+ LOG.info("Preparing Kafka ZK path {}{}",
zkClient.getConnectString(), kafkaZKPath);
+ Futures.addCallback(zkClient.create(kafkaZKPath, null,
CreateMode.PERSISTENT), new FutureCallback<String>() {
+
+ final FutureCallback<String> thisCallback = this;
+
+ @Override
+ public void onSuccess(String result) {
+ completion.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable createFailure) {
+ if (!(createFailure instanceof
KeeperException.NodeExistsException)) {
+ completion.setException(createFailure);
+ }
--- End diff --
Should there be a `return` here, right after `completion.setException(createFailure)`?
---