The topic may not have been created at the broker yet. Take a look at ProducerTest.scala for an example.
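To illustrate the race: the test sends right after creating the topic, so the send can happen before the broker knows about it. The usual guard is to poll for a condition with a timeout before sending. Here is a minimal sketch of that pattern in plain Java. The `waitUntil` helper, the class name and the `metadataPropagated` flag are all hypothetical stand-ins for illustration; this is not Kafka's TestUtils code, and the background thread only simulates the broker catching up.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {

    /**
     * Polls the condition until it holds or timeoutMs has elapsed.
     * Returns true if the condition held in time, false on timeout or interrupt.
     * Hypothetical helper for illustration only, not part of Kafka.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for "the broker has received the topic metadata".
        AtomicBoolean metadataPropagated = new AtomicBoolean(false);

        // Simulates the broker learning about the topic a little while after creation.
        Thread propagation = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            metadataPropagated.set(true);
        });
        propagation.start();

        // Wait up to 500 ms, checking every 10 ms, before "sending".
        boolean ready = waitUntil(metadataPropagated::get, 500, 10);
        propagation.join();
        System.out.println(ready ? "topic ready, safe to send" : "timed out waiting for topic");
    }
}
```

In a real test the condition would check the broker's view of the topic rather than a local flag, and a timeout would normally fail the test instead of printing a message.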
After creating the topic, you could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) to assert that the topic is in fact at the broker before sending.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas <andreas.ma...@asideas.de> wrote:

> Hello,
>
> I wrote a simple junit test to test a Kafka producer.
>
> public class KafkaProducerTest {
>
>     private int brokerId = 0;
>     private String topic = "test";
>
>     @Test
>     public void producerTest() throws InterruptedException {
>
>         // setup Zookeeper
>         String zkConnect = TestZKUtils.zookeeperConnect();
>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>
>         // setup Broker
>         int port = TestUtils.choosePort();
>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>
>         KafkaConfig config = new KafkaConfig(props);
>         Time mock = new MockTime();
>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>
>         // create topic
>         AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>
>         // setup producer
>         Properties properties = TestUtils.getProducerConfig("localhost:"
>                 + port, "kafka.producer.DefaultPartitioner");
>
>         ProducerConfig pConfig = new ProducerConfig(properties);
>         Producer producer = new Producer(pConfig);
>
>         // send message
>         KeyedMessage<Integer, String> data = new KeyedMessage(topic,
>                 "test-message");
>
>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>         messages.add(data);
>
>         producer.send(scala.collection.JavaConversions.asBuffer(messages));
>
>         // cleanup
>         producer.close();
>         kafkaServer.shutdown();
>         zkClient.close();
>         zkServer.shutdown();
>     }
>
> }
>
> However when I run the test I get the following error messages
>
> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>     at kafka.producer.Producer.send(Producer.scala:74)
>     at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>
> [2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)
>
> I tried to write my unit test following the scala unit tests in the kafka core.
> But it seems like I'm still missing something basic to make it work.
> Can someone help me with that? I'm developing on Mac OS X 10.8.3, and
> compiled the latest Kafka (plus the TestUtils) from the git repository
> using Scala 2.9.2.
>
> Regards,
>
> Andreas Maier