The topic maybe is not created at the broker yet ... take a look at
ProducerTest.scala as example

You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
500) to assert that the topic is in fact at the broker before sending after
creating it

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/


On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas <andreas.ma...@asideas.de
> wrote:

> Hello,
>
> I wrote a simple junit test to test a Kafka producer.
>
> public class KafkaProducerTest {
>
>     private int brokerId = 0;
>     private String topic = "test";
>
>     @Test
>     public void producerTest() throws InterruptedException {
>
>         // setup Zookeeper
>         String zkConnect = TestZKUtils.zookeeperConnect();
>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>
>         // setup Broker
>         int port = TestUtils.choosePort();
>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>
>         KafkaConfig config = new KafkaConfig(props);
>         Time mock = new MockTime();
>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>
>         // create topic
>        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>
>         // setup producer
>         Properties properties =  TestUtils.getProducerConfig("localhost:"
> + port, "kafka.producer.DefaultPartitioner");
>
>         ProducerConfig pConfig = new ProducerConfig(properties);
>         Producer producer = new Producer(pConfig);
>
>         // send message
>         KeyedMessage<Integer, String> data = new KeyedMessage(topic,
> "test-message");
>
>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>         messages.add(data);
>
>         producer.send(scala.collection.JavaConversions.asBuffer(messages));
>
>         // cleanup
>         producer.close();
>         kafkaServer.shutdown();
>         zkClient.close();
>         zkServer.shutdown();
>     }
>
> }
>
>
> However when I run the test I get the following error messages
>
> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:9
> 0)
>         at kafka.producer.Producer.send(Producer.scala:74)
>         at
> de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.
> java:57)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:5
> 7)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImp
> l.java:43)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod
> .java:47)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.
> java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.j
> ava:44)
>         at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.ja
> va:17)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.ja
> va:70)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.ja
> va:50)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>         at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>         at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTest
> Runner.java:77)
>         at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitSt
> arter.java:195)
>         at
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:5
> 7)
>         at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>
> [2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test
> with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)
>
> I tried to write my unit test following the scala unit tests in the kafka
> core.
> But it seems like I'm still missing something basic to make it work.
> Can someone help me with that? I'm developing on Mac OS X 10.8.3, and
> compiled the latest Kafka
> (plus the TestUtils) from the git repository using Scala 2.9.2.
>
>
> Regards,
>
> Andreas Maier
>
>
>
>

Reply via email to