[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174644467 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { protected void startUp() throws Exception { // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is // no left over content from previous AM attempt. - final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, - Threads.SAME_THREAD_EXECUTOR); - LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); --- End diff -- oh yeah, accidentally removed. ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399632#comment-16399632 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174644467 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { protected void startUp() throws Exception { // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is // no left over content from previous AM attempt. - final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, - Threads.SAME_THREAD_EXECUTOR); - LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); --- End diff -- oh yeah, accidentally removed. > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399626#comment-16399626 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174643542 --- Diff: twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java --- @@ -281,6 +286,73 @@ public void onFailure(Throwable t) { return resultFuture; } + /** + * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen + * and the creation will be retried. + */ + public static OperationFuture createDeleteIfExists(final ZKClient zkClient, final String path, + @Nullable final byte[] data, final CreateMode createMode, + final boolean createParent, final ACL...acls) { +final SettableOperationFuture resultFuture = SettableOperationFuture.create(path, + Threads.SAME_THREAD_EXECUTOR); +final List createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls); +createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback() { + + final FutureCallback createCallback = this; + + @Override + public void onSuccess(String result) { +// Create succeeded, just set the result to the resultFuture +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +// If create failed not because of the NodeExistsException, just set the exception to the result future +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); + return; +} + +// Try to delete the path +LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path); +Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback() { + @Override + public void onSuccess(String result) { +// If delete succeeded, perform the creation again. 
+createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } + + @Override + public void onFailure(Throwable t) { +// If deletion failed because of NoNodeException, fail the result operation future +if (!(t instanceof KeeperException.NoNodeException)) { + createFailure.addSuppressed(t); + resultFuture.setException(createFailure); + return; +} + +// If can't delete because the node no longer exists, just go ahead and recreate the node +createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}); + +return resultFuture; + } + + /** + * Private helper method to create a ZK node based on the parameter. The result of the creation is always + * communicate via the provided {@link FutureCallback}. + */ + private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data, + CreateMode createMode, boolean createParent, + Iterable acls, FutureCallback callback) { +Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls), +callback, Threads.SAME_THREAD_EXECUTOR); + } --- End diff -- yes, easier to understand and extracted into separate methods. Like it much better now > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399625#comment-16399625 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174643070 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { protected void startUp() throws Exception { // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is // no left over content from previous AM attempt. - final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, - Threads.SAME_THREAD_EXECUTOR); - LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); --- End diff -- maybe keep the log message? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174643542 --- Diff: twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java --- @@ -281,6 +286,73 @@ public void onFailure(Throwable t) { return resultFuture; } + /** + * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen + * and the creation will be retried. + */ + public static OperationFuture createDeleteIfExists(final ZKClient zkClient, final String path, + @Nullable final byte[] data, final CreateMode createMode, + final boolean createParent, final ACL...acls) { +final SettableOperationFuture resultFuture = SettableOperationFuture.create(path, + Threads.SAME_THREAD_EXECUTOR); +final List createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls); +createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback() { + + final FutureCallback createCallback = this; + + @Override + public void onSuccess(String result) { +// Create succeeded, just set the result to the resultFuture +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +// If create failed not because of the NodeExistsException, just set the exception to the result future +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); + return; +} + +// Try to delete the path +LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path); +Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback() { + @Override + public void onSuccess(String result) { +// If delete succeeded, perform the creation again. 
+createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } + + @Override + public void onFailure(Throwable t) { +// If deletion failed because of NoNodeException, fail the result operation future +if (!(t instanceof KeeperException.NoNodeException)) { + createFailure.addSuppressed(t); + resultFuture.setException(createFailure); + return; +} + +// If can't delete because the node no longer exists, just go ahead and recreate the node +createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}); + +return resultFuture; + } + + /** + * Private helper method to create a ZK node based on the parameter. The result of the creation is always + * communicate via the provided {@link FutureCallback}. + */ + private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data, + CreateMode createMode, boolean createParent, + Iterable acls, FutureCallback callback) { +Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls), +callback, Threads.SAME_THREAD_EXECUTOR); + } --- End diff -- yes, easier to understand and extracted into separate methods. Like it much better now ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174643070 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { protected void startUp() throws Exception { // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is // no left over content from previous AM attempt. - final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, - Threads.SAME_THREAD_EXECUTOR); - LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); --- End diff -- maybe keep the log message? ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399593#comment-16399593 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174639182 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. 
+// Try to delete the node and recreate it +LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- I was thinking it does not have to be async. Create the ZK node, get the future. If success, done. If not delete the ZK node, get the future. If failure, throw. Else try again. But maybe that would be equally complex? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174639182 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. +// Try to delete the node and recreate it +LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- I was thinking it does not have to be async. Create the ZK node, get the future. If success, done. If not delete the ZK node, get the future. If failure, throw. Else try again. But maybe that would be equally complex? ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399591#comment-16399591 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174638923 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. 
--- End diff -- got it > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174638923 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. --- End diff -- got it ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399531#comment-16399531 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on the issue: https://github.com/apache/twill/pull/67 I've addressed the comments and fixed one more issue (on deletion failure, if the node does not exist, we can just go ahead and recreate the node instead of failing). Also refactored the callback code a bit to try to make it cleaner. > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill issue #67: (TWILL-61) Fix to allow higher attempts to relaunch the app...
Github user chtyim commented on the issue: https://github.com/apache/twill/pull/67 I've addressed the comments and fixed one more issue (on deletion failure, if the node does not exist, we can just go ahead and recreate the node instead of failing). Also refactored the callback code a bit to try to make it cleaner. ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399527#comment-16399527 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174626043 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java --- @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { -LOG.warn("Broker list is empty. No Kafka producer is created."); -return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer newProducer = null; + if (!newBrokerList.isEmpty()) { +Properties props = new Properties(); +props.put("metadata.broker.list", newBrokerList); +props.put("serializer.class", ByteBufferEncoder.class.getName()); +props.put("key.serializer.class", IntegerEncoder.class.getName()); +props.put("partitioner.class", IntegerPartitioner.class.getName()); +props.put("request.required.acks", Integer.toString(ack.getAck())); +props.put("compression.codec", compression.getCodec()); + +ProducerConfig config = new ProducerConfig(props); +newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer oldProducer = producer.getAndSet(new Producer(config)); + // If the broker list is empty, the producer will be set to null + 
Producer oldProducer = producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { +LOG.warn("Empty Kafka producer broker list, publish will fail."); --- End diff -- Yes > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174626043 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java --- @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { -LOG.warn("Broker list is empty. No Kafka producer is created."); -return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer newProducer = null; + if (!newBrokerList.isEmpty()) { +Properties props = new Properties(); +props.put("metadata.broker.list", newBrokerList); +props.put("serializer.class", ByteBufferEncoder.class.getName()); +props.put("key.serializer.class", IntegerEncoder.class.getName()); +props.put("partitioner.class", IntegerPartitioner.class.getName()); +props.put("request.required.acks", Integer.toString(ack.getAck())); +props.put("compression.codec", compression.getCodec()); + +ProducerConfig config = new ProducerConfig(props); +newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer oldProducer = producer.getAndSet(new Producer(config)); + // If the broker list is empty, the producer will be set to null + Producer oldProducer = producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { 
+LOG.warn("Empty Kafka producer broker list, publish will fail."); --- End diff -- Yes ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399470#comment-16399470 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618874 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} --- End diff -- That's right. Missed it. 
> Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618874 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} --- End diff -- That's right. Missed it. ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399466#comment-16399466 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618395 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. 
+// Try to delete the node and recreate it +LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- I can pull the common code between this and the ApplicationMasterMain class into a util function. But still, inside the util function, there would be three callbacks. > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618395 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. +// Try to delete the node and recreate it +LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- I can pull the common code between this and the ApplicationMasterMain class into a util function. But still, inside the util function, there would be three callbacks. ---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399465#comment-16399465 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618087 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. 
+// Try to delete the node and recreate it +LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- Well, we do need that many levels of callback (create -> delete -> create) for the operation. Any suggestions on how to simplify it? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174618087 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. +// Try to delete the node and recreate it +LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- Well, we do need that many levels of callback (create -> delete -> create) for the operation. Any suggestions on how to simplify it? ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174617783 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. --- End diff -- An ephemeral node won't go away immediately if the process dies. It will stay there until the ZK session times out, which typically takes several seconds. In the meantime, YARN may already have started the next AM process, so the new AM process will see the ephemeral node.
---
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399462#comment-16399462 ] ASF GitHub Bot commented on TWILL-61: - Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174617783 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. --- End diff -- An ephemeral node won't go away immediately if the process dies.
It will stay there until the ZK session times out, which typically takes several seconds. In the meantime, YARN may already have started the next AM process, so the new AM process will see the ephemeral node. > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399162#comment-16399162 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174573795 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. 
+// Try to delete the node and recreate it +LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- while this code appears correct (after addressing my comment), the three nested levels of callback make it almost impossible to read. Is there some way to unwind this? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399160#comment-16399160 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174571435 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} --- End diff -- do you need to return here? 
> Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399164#comment-16399164 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174583711 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { @Override protected void startUp() throws Exception { - ZKOperations.ignoreError( -zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), -KeeperException.NodeExistsException.class, kafkaZKPath).get(); + // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is + // no left over content from previous AM attempt. + final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, + Threads.SAME_THREAD_EXECUTOR); + LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); + Futures.addCallback(zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), new FutureCallback() { + +final FutureCallback thisCallback = this; + +@Override +public void onSuccess(String result) { + completion.set(result); +} + +@Override +public void onFailure(final Throwable createFailure) { + if (!(createFailure instanceof KeeperException.NodeExistsException)) { +completion.setException(createFailure); + } --- End diff -- return here? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. 
Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399163#comment-16399163 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174580690 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java --- @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { -LOG.warn("Broker list is empty. No Kafka producer is created."); -return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer newProducer = null; + if (!newBrokerList.isEmpty()) { +Properties props = new Properties(); +props.put("metadata.broker.list", newBrokerList); +props.put("serializer.class", ByteBufferEncoder.class.getName()); +props.put("key.serializer.class", IntegerEncoder.class.getName()); +props.put("partitioner.class", IntegerPartitioner.class.getName()); +props.put("request.required.acks", Integer.toString(ack.getAck())); +props.put("compression.codec", compression.getCodec()); + +ProducerConfig config = new ProducerConfig(props); +newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer oldProducer = producer.getAndSet(new Producer(config)); + // If the broker list is empty, the producer will be set to null + Producer 
oldProducer = producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { +LOG.warn("Empty Kafka producer broker list, publish will fail."); --- End diff -- So when will this happen? If the AM dies (and its broker with it)? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TWILL-61) Second launch attempt of AM always failed
[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399161#comment-16399161 ] ASF GitHub Bot commented on TWILL-61: - Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174571807 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. --- End diff -- how can it leave an ephemeral node? 
Doesn't that mean there must be a zombie process holding on to that node? > Second launch attempt of AM always failed > - > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn >Reporter: Terence Yim >Assignee: Terence Yim >Priority: Major > Fix For: 0.5.0-incubating > > > YARN would make multiple attempts to launch an application. Currently second > or above attempts would always fail due to creation of /runId/state node in > ZK fail (node exists) because runId is generated on client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174580690 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java --- @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { -LOG.warn("Broker list is empty. No Kafka producer is created."); -return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer newProducer = null; + if (!newBrokerList.isEmpty()) { +Properties props = new Properties(); +props.put("metadata.broker.list", newBrokerList); +props.put("serializer.class", ByteBufferEncoder.class.getName()); +props.put("key.serializer.class", IntegerEncoder.class.getName()); +props.put("partitioner.class", IntegerPartitioner.class.getName()); +props.put("request.required.acks", Integer.toString(ack.getAck())); +props.put("compression.codec", compression.getCodec()); + +ProducerConfig config = new ProducerConfig(props); +newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer oldProducer = producer.getAndSet(new Producer(config)); + // If the broker list is empty, the producer will be set to null + Producer oldProducer = producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { 
+LOG.warn("Empty Kafka producer broker list, publish will fail."); --- End diff -- So when will this happen? If the AM dies (and its broker with it)? ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174571807 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. --- End diff -- How can a previous attempt leave an ephemeral node behind? An ephemeral node is removed when the session that created it ends, so doesn't that mean a zombie process (or a not-yet-expired session) must still be holding on to that node? ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174571435 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} --- End diff -- Do you need to return here? Without a return, any failure other than NodeExistsException still falls through to the delete-and-recreate logic below, even though resultFuture has already been failed. ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174583711 --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java --- @@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { @Override protected void startUp() throws Exception { - ZKOperations.ignoreError( -zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), -KeeperException.NodeExistsException.class, kafkaZKPath).get(); + // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is + // no left over content from previous AM attempt. + final SettableOperationFuture completion = SettableOperationFuture.create(kafkaZKPath, + Threads.SAME_THREAD_EXECUTOR); + LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); + Futures.addCallback(zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), new FutureCallback() { + +final FutureCallback thisCallback = this; + +@Override +public void onSuccess(String result) { + completion.set(result); +} + +@Override +public void onFailure(final Throwable createFailure) { + if (!(createFailure instanceof KeeperException.NodeExistsException)) { +completion.setException(createFailure); + } --- End diff -- Should this return here as well? Otherwise any failure other than NodeExistsException continues into the delete-and-recreate path after completion has already been failed. ---
[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...
Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174573795 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java --- @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception { return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture createLiveNode() { -String liveNodePath = getLiveNodePath(); +final String liveNodePath = getLiveNodePath(); LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); -return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); +final SettableOperationFuture resultFuture = SettableOperationFuture.create(liveNodePath, + Threads.SAME_THREAD_EXECUTOR); +OperationFuture createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL); +Futures.addCallback(createFuture, new FutureCallback() { + final FutureCallback thisCallback = this; + + @Override + public void onSuccess(String result) { +LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath); +resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { +if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); +} + +// If the node already exists, it is due to previous run attempt that left an ephemeral node. +// Try to delete the node and recreate it +LOG.info("Live node already exist. 
Deleting node {}{}", zkClient.getConnectString(), liveNodePath); +Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback() { + @Override + public void onSuccess(String result) { +Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), +thisCallback, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + public void onFailure(Throwable t) { +createFailure.addSuppressed(t); +resultFuture.setException(createFailure); + } +}, Threads.SAME_THREAD_EXECUTOR); + } +}, Threads.SAME_THREAD_EXECUTOR); + +return resultFuture; --- End diff -- While this code appears correct (after adding the missing return I pointed out), the three nested levels of callbacks make it almost impossible to read. Is there some way to unwind this? ---