That is expected behavior. Typically there are multiple Kafka brokers, so if
one is down the client retries the send against the newly elected leader.
A send should not be considered successful until the client receives an ACK
from the Kafka cluster.
By default the ACK is handled asynchronously for performance, but send()
returns a Future, so you can easily make it behave like a synchronous publish
by calling get() on it. Examples are in the KafkaProducer javadoc.
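
A rough sketch of why the fire-and-forget case prints "Message sent" while the
get() case fails. This is not Kafka code: fakeSend is a hypothetical stand-in
for producer.send(), built on CompletableFuture so it runs without a broker,
and the class and method names are made up for illustration. The only point is
that the failure lives inside the returned Future until something asks for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendFutureSketch {

    // Hypothetical stand-in for producer.send(record): it returns immediately,
    // and a failure is stored inside the returned Future instead of being thrown.
    static Future<String> fakeSend(String message, boolean brokerUp) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (brokerUp) {
            result.complete("acked: " + message);
        } else {
            result.completeExceptionally(
                new IllegalStateException("Failed to update metadata"));
        }
        return result;
    }

    // Fire and forget: the Future is dropped, so a failed send goes unnoticed.
    static String sendAndForget(String message, boolean brokerUp) {
        fakeSend(message, brokerUp);
        return "Message sent";
    }

    // Blocking send: get() waits for the result and rethrows a stored failure
    // wrapped in ExecutionException.
    static String sendAndWait(String message, boolean brokerUp) {
        try {
            return fakeSend(message, brokerUp).get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "send failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            return "timed out waiting for ack";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndForget("a-little-message", false));
        System.out.println(sendAndWait("a-little-message", false));
        System.out.println(sendAndWait("a-little-message", true));
    }
}
```

With the real producer the same idea applies: either pass a Callback or call
get() on the returned Future, otherwise nothing ever looks at the error.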
-hans
> On Jul 18, 2018, at 7:45 AM, jingguo yao wrote:
>
> The asynchronous sending of a message returns no error even if the
> Kafka server is not started.
>
> For all the following tests, the local Kafka server is stopped. First,
> consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     // Asynchronous send: failures are reported to the callback.
>     producer.send(record, new Callback() {
>       @Override
>       public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>           System.out.println("Exception occurred!");
>           exception.printStackTrace(System.out);
>         }
>       }
>     });
>   }
> }
>
> Running it will produce the following error:
>
> Exception occurred!
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 6 ms.
>
> Second, consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     // get() blocks until the send completes and rethrows any failure.
>     producer.send(record).get();
>     System.out.println("Message sent");
>   }
> }
>
> Running it will produce the following error:
>
> Sending a message...
> Exception in thread "main" java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 6 ms.
> at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
> at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 6 ms.
>
> Third, consider this piece of code:
>
> public static void main(String[] args) throws Exception {
>   Properties config = new Properties();
>   config.put("client.id", InetAddress.getLocalHost().getHostName());
>   config.put("bootstrap.servers", "localhost:9092");
>   config.put("acks", "all");
>   config.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   config.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     // The returned Future is discarded, so a failed send goes unnoticed.
>     producer.send(record);
>     System.out.println("Message sent");
>   }
> }
>
> Running it will produce no error. The following output will be
> produced:
>
> Sending a message...
> Message sent
>
> I know that the nature of asynchronous sending means that the send
> method does not report connection errors to the Kafka server directly.
> But I think it would be better to document this behaviour somewhere.
>
> --
> Jingguo