I tried that, but I still got

junit.framework.AssertionFailedError: Partition [test,0] metadata not
propagated after timeout
        at 
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
        at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
...

However it seems I found a solution for my problem. I had to replace

ZkClient zkClient = new ZkClient(zkServer.connectString());


with

ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000,
ZKStringSerializer$.MODULE$);

Then suddenly my test ran without problems.
It seem that without using the ZKStringSerializer the ZkClient cannot be
used. 

Regards,

Andreas Maier



Am 06.09.13 13:52 schrieb "Joe Stein" unter <crypt...@gmail.com>:

>The topic maybe is not created at the broker yet ... take a look at
>ProducerTest.scala as example
>
>You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
>500) to assert that the topic is in fact at the broker before sending
>after
>creating it
>
>/*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>********************************************/
>
>
>On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas
><andreas.ma...@asideas.de
>> wrote:
>
>> Hello,
>>
>> I wrote a simple junit test to test a Kafka producer.
>>
>> public class KafkaProducerTest {
>>
>>     private int brokerId = 0;
>>     private String topic = "test";
>>
>>     @Test
>>     public void producerTest() throws InterruptedException {
>>
>>         // setup Zookeeper
>>         String zkConnect = TestZKUtils.zookeeperConnect();
>>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>>
>>         // setup Broker
>>         int port = TestUtils.choosePort();
>>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>>
>>         KafkaConfig config = new KafkaConfig(props);
>>         Time mock = new MockTime();
>>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>>
>>         // create topic
>>        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>>
>>         // setup producer
>>         Properties properties =
>>TestUtils.getProducerConfig("localhost:"
>> + port, "kafka.producer.DefaultPartitioner");
>>
>>         ProducerConfig pConfig = new ProducerConfig(properties);
>>         Producer producer = new Producer(pConfig);
>>
>>         // send message
>>         KeyedMessage<Integer, String> data = new KeyedMessage(topic,
>> "test-message");
>>
>>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>>         messages.add(data);
>>
>>         
>>producer.send(scala.collection.JavaConversions.asBuffer(messages));
>>
>>         // cleanup
>>         producer.close();
>>         kafkaServer.shutdown();
>>         zkClient.close();
>>         zkServer.shutdown();
>>     }
>>
>> }
>>
>>
>> However when I run the test I get the following error messages
>>
>> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>>
>> kafka.common.FailedToSendMessageException: Failed to send messages
>>after 3
>> tries.
>>         at
>> 
>>kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>:9
>> 0)
>>         at kafka.producer.Producer.send(Producer.scala:74)
>>         at
>> 
>>de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTes
>>t.
>> java:57)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> 
>>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java
>>:5
>> 7)
>>         at
>> 
>>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorI
>>mp
>> l.java:43)
>>         at
>> 
>>org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMeth
>>od
>> .java:47)
>>         at
>> 
>>org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallabl
>>e.
>> java:12)
>>         at
>> 
>>org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod
>>.j
>> ava:44)
>>         at
>> 
>>org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.
>>ja
>> va:17)
>>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>         at
>> 
>>org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.
>>ja
>> va:70)
>>         at
>> 
>>org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.
>>ja
>> va:50)
>>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>         at 
>>org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>         at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>         at 
>>org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>         at 
>>org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>         at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>         at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>         at
>> 
>>com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTe
>>st
>> Runner.java:77)
>>         at
>> 
>>com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnit
>>St
>> arter.java:195)
>>         at
>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> 
>>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java
>>:5
>> 7)
>>         at
>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>>
>> [2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test
>> with correlation ids in [0,8]
>>(kafka.producer.async.DefaultEventHandler:97)
>>
>> I tried to write my unit test following the scala unit tests in the
>>kafka
>> core.
>> But it seems like I'm still missing something basic to make it work.
>> Can someone help me with that? I'm developing on Mac OS X 10.8.3, and
>> compiled the latest Kafka
>> (plus the TestUtils) from the git repository using Scala 2.9.2.
>>
>>
>> Regards,
>>
>> Andreas Maier
>>
>>
>>
>>

Reply via email to