GitHub user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174626043

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java ---
    @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
           }

           String newBrokerList = brokerService.getBrokerList();
    -      if (newBrokerList.isEmpty()) {
    -        LOG.warn("Broker list is empty. No Kafka producer is created.");
    -        return;
    -      }

    +      // If there is no change, whether it is empty or not, just return
           if (Objects.equal(brokerList, newBrokerList)) {
             return;
           }

    -      Properties props = new Properties();
    -      props.put("metadata.broker.list", newBrokerList);
    -      props.put("serializer.class", ByteBufferEncoder.class.getName());
    -      props.put("key.serializer.class", IntegerEncoder.class.getName());
    -      props.put("partitioner.class", IntegerPartitioner.class.getName());
    -      props.put("request.required.acks", Integer.toString(ack.getAck()));
    -      props.put("compression.codec", compression.getCodec());
    +      Producer<Integer, ByteBuffer> newProducer = null;
    +      if (!newBrokerList.isEmpty()) {
    +        Properties props = new Properties();
    +        props.put("metadata.broker.list", newBrokerList);
    +        props.put("serializer.class", ByteBufferEncoder.class.getName());
    +        props.put("key.serializer.class", IntegerEncoder.class.getName());
    +        props.put("partitioner.class", IntegerPartitioner.class.getName());
    +        props.put("request.required.acks", Integer.toString(ack.getAck()));
    +        props.put("compression.codec", compression.getCodec());
    +
    +        ProducerConfig config = new ProducerConfig(props);
    +        newProducer = new Producer<>(config);
    +      }

    -      ProducerConfig config = new ProducerConfig(props);
    -      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
    +      // If the broker list is empty, the producer will be set to null
    +      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
           if (oldProducer != null) {
             oldProducer.close();
           }

    -      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
    +      if (newBrokerList.isEmpty()) {
    +        LOG.warn("Empty Kafka producer broker list, publish will fail.");
    --- End diff --

    Yes
---