Jasper Siepkes created KAFKA-3350:
-------------------------------------

Summary: The first published message in an autocreated topic results in an error
Key: KAFKA-3350
URL: https://issues.apache.org/jira/browse/KAFKA-3350
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.9.0.1
Environment: OpenJDK 8, CentOS 7
Reporter: Jasper Siepkes
Assignee: Jun Rao

When "auto.create.topics.enable=true" is set, publishing the first message to a topic that is auto-created results in the following error, and the message is not published:

{noformat}
13:42:58,867 WARN [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 0 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
13:42:58,963 WARN [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 1 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
{noformat}

With the default ZooKeeper and Kafka config, running the following application reproduces the above behavior:

{code:title=KafkaTopicAutoCreateProducerTest.java|borderStyle=solid}
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class KafkaTopicAutoCreateProducerTest {

    // Assumed: a log4j 1.x logger, matching the PropertyConfigurator call below.
    private static final Logger log = Logger.getLogger(KafkaTopicAutoCreateProducerTest.class);

    public static void main(String[] args) {
        PropertyConfigurator.configure("log4j.properties");

        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers", "localhost:9092");
        producerProperties.put("acks", "all");
        // retries=0: a failed send is not retried by the producer.
        producerProperties.put("retries", "0");
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("block.on.buffer.full", "true");

        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(producerProperties);

            // A random suffix guarantees the topic does not exist yet.
            String randomTopic = UUID.randomUUID().toString();

            // First message: sent to the not-yet-existing topic.
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                    "non.existing.topic." + randomTopic, "foo test message 1");
            log.debug("About to publish message 1.");
            producer.send(producerRecord);
            producer.flush();

            Thread.sleep(5000);

            // Second message: sent to the same topic five seconds later.
            producerRecord = new ProducerRecord<>(
                    "non.existing.topic." + randomTopic, "foo test message 2");
            log.debug("About to publish message 2.");
            producer.send(producerRecord);
            producer.flush();
        } catch (Exception e) {
            log.warn("An exception occurred while running producer test.", e);
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception e) {
                    log.info("An exception occurred while closing the producer.", e);
                }
            }
        }
    }
}
{code}
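
The reproducer discards the Future returned by producer.send(), so the outcome of each individual send is never inspected. A minimal sketch of how that outcome could be checked is given below. It depends only on java.util.concurrent so that it runs without a broker; the class name SendOutcome and the method describe are hypothetical and not part of the Kafka API, and the CompletableFuture instances in main are stand-ins for the Future<RecordMetadata> that KafkaProducer.send() returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendOutcome {

    /**
     * Hypothetical helper: waits up to timeoutMs for a send result and
     * returns a short description of what happened.
     */
    static <T> String describe(Future<T> future, long timeoutMs) throws InterruptedException {
        try {
            T result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "published: " + result;
        } catch (ExecutionException e) {
            // The send completed with an error; the cause carries the reason.
            return "failed: " + e.getCause();
        } catch (TimeoutException e) {
            return "timed out after " + timeoutMs + " ms";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for the futures returned by KafkaProducer.send().
        CompletableFuture<String> ok = CompletableFuture.completedFuture("offset 0");
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("LEADER_NOT_AVAILABLE"));

        System.out.println(describe(ok, 1000));
        System.out.println(describe(bad, 1000));
    }
}
```

In the reproducer this would be called as describe(producer.send(producerRecord), 10000) for each of the two messages, with the timeout value chosen arbitrarily here.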