[ https://issues.apache.org/jira/browse/TWILL-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399163#comment-16399163 ]
ASF GitHub Bot commented on TWILL-61: ------------------------------------- Github user anew commented on a diff in the pull request: https://github.com/apache/twill/pull/67#discussion_r174580690 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java --- @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { - LOG.warn("Broker list is empty. No Kafka producer is created."); - return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer<Integer, ByteBuffer> newProducer = null; + if (!newBrokerList.isEmpty()) { + Properties props = new Properties(); + props.put("metadata.broker.list", newBrokerList); + props.put("serializer.class", ByteBufferEncoder.class.getName()); + props.put("key.serializer.class", IntegerEncoder.class.getName()); + props.put("partitioner.class", IntegerPartitioner.class.getName()); + props.put("request.required.acks", Integer.toString(ack.getAck())); + props.put("compression.codec", compression.getCodec()); + + ProducerConfig config = new ProducerConfig(props); + newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config)); + // If the broker list is empty, the producer will be set to null + Producer<Integer, ByteBuffer> oldProducer = 
producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { + LOG.warn("Empty Kafka producer broker list, publish will fail."); --- End diff -- So when will this happen? If the AM dies (and its broker with it)? > Second launch attempt of AM always failed > ----------------------------------------- > > Key: TWILL-61 > URL: https://issues.apache.org/jira/browse/TWILL-61 > Project: Apache Twill > Issue Type: Bug > Components: yarn > Reporter: Terence Yim > Assignee: Terence Yim > Priority: Major > Fix For: 0.5.0-incubating > > > YARN makes multiple attempts to launch an application. Currently, the second > and any later attempts always fail because creating the /runId/state node in > ZK fails (node exists): the runId is generated on the client side and doesn't > change between attempts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)