I tried that, but I still got

    junit.framework.AssertionFailedError: Partition [test,0] metadata not propagated after timeout
        at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
        at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
        ...

However, it seems I found a solution to my problem. I had to replace

    ZkClient zkClient = new ZkClient(zkServer.connectString());

with

    ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

Then my test suddenly ran without problems.
It seems that the ZkClient cannot be used without the ZKStringSerializer.

Regards,

Andreas Maier

On 06.09.13 13:52, "Joe Stein" <crypt...@gmail.com> wrote:

>The topic may not have been created at the broker yet ... take a look at
>ProducerTest.scala as an example
>
>You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
>500) to assert that the topic is in fact at the broker before sending
>after creating it
>
>/*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>********************************************/
>
>
>On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas
><andreas.ma...@asideas.de> wrote:
>
>> Hello,
>>
>> I wrote a simple junit test to test a Kafka producer.
>>
>> public class KafkaProducerTest {
>>
>>     private int brokerId = 0;
>>     private String topic = "test";
>>
>>     @Test
>>     public void producerTest() throws InterruptedException {
>>
>>         // setup Zookeeper
>>         String zkConnect = TestZKUtils.zookeeperConnect();
>>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>>
>>         // setup Broker
>>         int port = TestUtils.choosePort();
>>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>>
>>         KafkaConfig config = new KafkaConfig(props);
>>         Time mock = new MockTime();
>>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>>
>>         // create topic
>>         AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>>
>>         // setup producer
>>         Properties properties = TestUtils.getProducerConfig("localhost:" + port, "kafka.producer.DefaultPartitioner");
>>
>>         ProducerConfig pConfig = new ProducerConfig(properties);
>>         Producer producer = new Producer(pConfig);
>>
>>         // send message
>>         KeyedMessage<Integer, String> data = new KeyedMessage(topic, "test-message");
>>
>>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>>         messages.add(data);
>>
>>         producer.send(scala.collection.JavaConversions.asBuffer(messages));
>>
>>         // cleanup
>>         producer.close();
>>         kafkaServer.shutdown();
>>         zkClient.close();
>>         zkServer.shutdown();
>>     }
>>
>> }
>>
>>
>> However when I run the test I get the following error messages
>>
>> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
>>
>> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>>     at kafka.producer.Producer.send(Producer.scala:74)
>>     at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>     at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
>>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
>>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>>
>> [2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)
>>
>> I tried to write my unit test following the scala unit tests in the kafka core.
>> But it seems like I'm still missing something basic to make it work.
>> Can someone help me with that? I'm developing on Mac OS X 10.8.3, and
>> compiled the latest Kafka (plus the TestUtils) from the git repository
>> using Scala 2.9.2.
>>
>>
>> Regards,
>>
>> Andreas Maier
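
A possible explanation for why the serializer in the fix at the top of this thread matters, offered as a sketch rather than a confirmed diagnosis: as far as I know, a ZkClient constructed without an explicit serializer falls back to Java object serialization, whereas ZKStringSerializer writes plain UTF-8 strings, which is the form the broker expects to read back from ZooKeeper. The stand-alone demo below uses only the JDK (no Kafka or ZkClient classes). The class name, the helper names, and the JSON payload are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerMismatchDemo {

    // Java object serialization: the encoding a default ZkClient is ASSUMED to use.
    static byte[] javaSerialize(String value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Plain UTF-8 bytes: the encoding ZKStringSerializer is ASSUMED to use.
    static byte[] utf8Serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // What a reader that expects plain UTF-8 would see for the given bytes.
    static String utf8Deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical topic metadata; the exact JSON layout is illustrative only.
        String payload = "{\"version\":1,\"partitions\":{\"0\":[0]}}";

        byte[] asJavaObject = javaSerialize(payload);
        byte[] asUtf8 = utf8Serialize(payload);

        System.out.println("same bytes: " + Arrays.equals(asJavaObject, asUtf8));
        System.out.println("UTF-8 round trip intact: "
                + utf8Deserialize(asUtf8).equals(payload));
        System.out.println("Java-serialized bytes read as UTF-8 intact: "
                + utf8Deserialize(asJavaObject).equals(payload));
    }
}
```

The Java-serialized form starts with the object stream header (0xAC 0xED), so a reader that decodes the bytes as UTF-8 does not get the original JSON back. If the assumption about the default serializer holds, that would be consistent with the broker never seeing usable metadata for the topic until the client is created with ZKStringSerializer.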