That is expected behavior. Typically there are multiple Kafka brokers, so if one is down the client retries the send against the newly elected leader.
A send should not be considered successful until the client receives an ACK from the Kafka cluster. By default the ACK is handled asynchronously for performance, but send() returns a Future, so you can easily make it behave like a synchronous publish by calling get() on it. Examples are in the javadoc.

-hans

> On Jul 18, 2018, at 7:45 AM, jingguo yao <yaojing...@gmail.com> wrote:
>
> The asynchronous sending of a message returns no error even if the
> Kafka server is not started.
>
> For all the following tests, the local Kafka server is stopped. First,
> consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config);) {
>     ProducerRecord<String, String> record = new
>         ProducerRecord<>("test-topic", null, "a-little-message");
>     producer.send(record, new Callback() {
>       @Override
>       public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>           System.out.println("Exception occurred!");
>           exception.printStackTrace(System.out);
>         }
>       }
>     });
>   }
> }
>
> Running it will produce the following error:
>
> Exception occurred!
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 60000 ms.
>
> Second, consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config);) {
>     ProducerRecord<String, String> record = new
>         ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     producer.send(record).get();
>     System.out.println("Message sent");
>   }
> }
>
> Running it will produce the following error:
>
> Sending a message...
> Exception in thread "main" java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 60000 ms.
>   at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
>   at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
>   at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
>   at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
>   at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 60000 ms.
>
> Third, consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config);) {
>     ProducerRecord<String, String> record = new
>         ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     producer.send(record);
>     System.out.println("Message sent");
>   }
> }
>
> Running it will produce no error. The following output will be
> produced:
>
> Sending a message...
> Message sent
>
> I know that the nature of asynchronous sending demands that send
> method ignore connection error to the Kafka server. But I think that
> it is better to document this kind of behaviour somewhere.
>
> --
> Jingguo
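
To illustrate why the third case prints no error while the second one does, here is a minimal sketch that uses only java.util.concurrent, so it runs without a broker or the Kafka client on the classpath. The class SendFutureDemo and the helper failedSend() are hypothetical stand-ins, not Kafka API: failedSend() plays the role of producer.send(record) returning an already-failed future (as the FutureFailure frame in the stack trace above suggests), and java.util.concurrent.TimeoutException stands in for Kafka's own TimeoutException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

class SendFutureDemo {
    // Hypothetical stand-in for producer.send(record) with no reachable broker:
    // the call returns normally, but the returned future has already failed.
    static Future<String> failedSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("Failed to update metadata"));
        return future;
    }

    // Mirrors the third case: the future is discarded, so nobody sees the failure.
    static String fireAndForget() {
        failedSend();
        return "Message sent";
    }

    // Mirrors the second case: get() rethrows the failure as an ExecutionException.
    static String sendAndWait() {
        try {
            failedSend().get();
            return "Message sent";
        } catch (ExecutionException e) {
            return "Send failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(fireAndForget()); // Message sent
        System.out.println(sendAndWait());   // Send failed: Failed to update metadata
    }
}
```

The point of the sketch is that the failure is stored in the returned future either way. It only becomes visible if the caller calls get() on it or, as in the first case, passes a Callback to send().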