Jasper Siepkes created KAFKA-3350:
-------------------------------------

             Summary: The first published message in an autocreated topic results in an error
                 Key: KAFKA-3350
                 URL: https://issues.apache.org/jira/browse/KAFKA-3350
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.9.0.1
         Environment: OpenJDK 8, CentOS 7
            Reporter: Jasper Siepkes
            Assignee: Jun Rao


With "auto.create.topics.enable=true", publishing the first message to a topic 
that does not exist yet (and is therefore auto-created) results in the following 
warnings, and the message is not published:

{noformat}
13:42:58,867 WARN  [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 0 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
13:42:58,963 WARN  [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 1 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
{noformat}
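
For anyone scripting this reproduction, the warning above can be checked mechanically. The following is only a minimal sketch: the class name MetadataWarningParser is hypothetical, and it assumes the trailing "{topic=ERROR}" segment follows Java's Map.toString() layout (entries separated by ", "), which the two log lines above suggest but do not prove.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: extracts topic -> error name
// from a NetworkClient "Error while fetching metadata" warning line.
public class MetadataWarningParser {

    // Matches the last "{...}" segment at the end of the line.
    // Assumption: the segment is a Map.toString() rendering.
    private static final Pattern ERRORS = Pattern.compile("\\{([^{}]*)\\}\\s*$");

    /** Returns topic -> error name, or an empty map if no such segment is found. */
    public static Map<String, String> parse(String logLine) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = ERRORS.matcher(logLine);
        if (!m.find()) {
            return result;
        }
        for (String entry : m.group(1).split(",\\s*")) {
            // Split on the last '=' so the whole topic name is kept intact.
            int eq = entry.lastIndexOf('=');
            if (eq > 0) {
                result.put(entry.substring(0, eq).trim(),
                           entry.substring(eq + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "13:42:58,867 WARN  [kafka-producer-network-thread | producer-1][NetworkClient] "
                + "Error while fetching metadata with correlation id 0 : "
                + "{non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}";
        System.out.println(parse(line));
    }
}
```

A test harness could call parse() on each captured WARN line and fail the run when any topic maps to LEADER_NOT_AVAILABLE.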

Running the following application against the default ZooKeeper and Kafka 
configuration reproduces the above behavior:

{code:title=KafkaTopicAutoCreateProducerTest.java|borderStyle=solid}
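import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class KafkaTopicAutoCreateProducerTest {

        // The original snippet does not show the logger declaration;
        // log4j 1.x is assumed because PropertyConfigurator is used below.
        private static final Logger log =
                        Logger.getLogger(KafkaTopicAutoCreateProducerTest.class);

        public static void main(String[] args) {
                PropertyConfigurator.configure("log4j.properties");

                Properties producerProperties = new Properties();
                producerProperties.put("bootstrap.servers", "localhost:9092");
                producerProperties.put("acks", "all");
                // No retries: a failed first send is not attempted again.
                producerProperties.put("retries", "0");
                producerProperties.put("key.serializer",
                                "org.apache.kafka.common.serialization.StringSerializer");
                producerProperties.put("value.serializer",
                                "org.apache.kafka.common.serialization.StringSerializer");
                producerProperties.put("block.on.buffer.full", "true");

                KafkaProducer<String, String> producer = null;

                try {
                        producer = new KafkaProducer<>(producerProperties);

                        // A random suffix guarantees the topic does not exist yet.
                        String randomTopic = UUID.randomUUID().toString();

                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                                        "non.existing.topic." + randomTopic,
                                        "foo test message 1");

                        log.debug("About to publish message 1.");
                        producer.send(producerRecord);
                        producer.flush();

                        // Give the broker time to finish auto-creating the topic.
                        Thread.sleep(5000);

                        producerRecord = new ProducerRecord<>(
                                        "non.existing.topic." + randomTopic,
                                        "foo test message 2");
                        log.debug("About to publish message 2.");
                        producer.send(producerRecord);
                        producer.flush();

                } catch (Exception e) {
                        log.warn("An exception occurred while running producer test.", e);
                } finally {
                        if (producer != null) {
                                try {
                                        producer.close();
                                } catch (Exception e) {
                                        log.info("An exception occurred while closing the producer.", e);
                                }
                        }
                }
        }
}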
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
