Github user anew commented on a diff in the pull request:
https://github.com/apache/twill/pull/67#discussion_r174580690
--- Diff:
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
---
@@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
}
String newBrokerList = brokerService.getBrokerList();
- if (newBrokerList.isEmpty()) {
- LOG.warn("Broker list is empty. No Kafka producer is created.");
- return;
- }
+ // If there is no change, whether it is empty or not, just return
if (Objects.equal(brokerList, newBrokerList)) {
return;
}
- Properties props = new Properties();
- props.put("metadata.broker.list", newBrokerList);
- props.put("serializer.class", ByteBufferEncoder.class.getName());
- props.put("key.serializer.class", IntegerEncoder.class.getName());
- props.put("partitioner.class", IntegerPartitioner.class.getName());
- props.put("request.required.acks", Integer.toString(ack.getAck()));
- props.put("compression.codec", compression.getCodec());
+ Producer<Integer, ByteBuffer> newProducer = null;
+ if (!newBrokerList.isEmpty()) {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", newBrokerList);
+ props.put("serializer.class", ByteBufferEncoder.class.getName());
+ props.put("key.serializer.class", IntegerEncoder.class.getName());
+ props.put("partitioner.class", IntegerPartitioner.class.getName());
+ props.put("request.required.acks", Integer.toString(ack.getAck()));
+ props.put("compression.codec", compression.getCodec());
+
+ ProducerConfig config = new ProducerConfig(props);
+ newProducer = new Producer<>(config);
+ }
- ProducerConfig config = new ProducerConfig(props);
- Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new
Producer<Integer, ByteBuffer>(config));
+ // If the broker list is empty, the producer will be set to null
+ Producer<Integer, ByteBuffer> oldProducer =
producer.getAndSet(newProducer);
if (oldProducer != null) {
oldProducer.close();
}
- LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+ if (newBrokerList.isEmpty()) {
+ LOG.warn("Empty Kafka producer broker list, publish will fail.");
--- End diff --
So when will this happen? If the AM dies (and its broker with it)?
---