I tried that, but I still got
junit.framework.AssertionFailedError: Partition [test,0] metadata not
propagated after timeout
at
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
...
However it seems I found a solution for my problem. I had to replace
ZkClient zkClient = new ZkClient(zkServer.connectString());
with
ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000,
ZKStringSerializer$.MODULE$);
Then suddenly my test ran without problems.
It seem that without using the ZKStringSerializer the ZkClient cannot be
used.
Regards,
Andreas Maier
Am 06.09.13 13:52 schrieb "Joe Stein" unter <[email protected]>:
>The topic maybe is not created at the broker yet ... take a look at
>ProducerTest.scala as example
>
>You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
>500) to assert that the topic is in fact at the broker before sending
>after
>creating it
>
>/*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>********************************************/
>
>
>On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas
><[email protected]
>> wrote:
>
>> Hello,
>>
>> I wrote a simple junit test to test a Kafka producer.
>>
>> public class KafkaProducerTest {
>>
>> private int brokerId = 0;
>> private String topic = "test";
>>
>> @Test
>> public void producerTest() throws InterruptedException {
>>
>> // setup Zookeeper
>> String zkConnect = TestZKUtils.zookeeperConnect();
>> EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>> ZkClient zkClient = new ZkClient(zkServer.connectString());
>>
>> // setup Broker
>> int port = TestUtils.choosePort();
>> Properties props = TestUtils.createBrokerConfig(brokerId, port);
>>
>> KafkaConfig config = new KafkaConfig(props);
>> Time mock = new MockTime();
>> KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>>
>> // create topic
>> AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>>
>> // setup producer
>> Properties properties =
>>TestUtils.getProducerConfig("localhost:"
>> + port, "kafka.producer.DefaultPartitioner");
>>
>> ProducerConfig pConfig = new ProducerConfig(properties);
>> Producer producer = new Producer(pConfig);
>>
>> // send message
>> KeyedMessage<Integer, String> data = new KeyedMessage(topic,
>> "test-message");
>>
>> List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>> messages.add(data);
>>
>>
>>producer.send(scala.collection.JavaConversions.asBuffer(messages));
>>
>> // cleanup
>> producer.close();
>> kafkaServer.shutdown();
>> zkClient.close();
>> zkServer.shutdown();
>> }
>>
>> }
>>
>>
>> However when I run the test I get the following error messages
>>
>> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: test
>> (kafka.producer.async.DefaultEventHandler:97)
>>
>> kafka.common.FailedToSendMessageException: Failed to send messages
>>after 3
>> tries.
>> at
>>
>>kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>:9
>> 0)
>> at kafka.producer.Producer.send(Producer.scala:74)
>> at
>>
>>de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTes
>>t.
>> java:57)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java
>>:5
>> 7)
>> at
>>
>>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorI
>>mp
>> l.java:43)
>> at
>>
>>org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMeth
>>od
>> .java:47)
>> at
>>
>>org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallabl
>>e.
>> java:12)
>> at
>>
>>org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod
>>.j
>> ava:44)
>> at
>>
>>org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.
>>ja
>> va:17)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>>
>>org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.
>>ja
>> va:70)
>> at
>>
>>org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.
>>ja
>> va:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at
>>org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at
>>org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at
>>org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>> at
>>
>>com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTe
>>st
>> Runner.java:77)
>> at
>>
>>com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnit
>>St
>> arter.java:195)
>> at
>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java
>>:5
>> 7)
>> at
>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>>
>> [2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test
>> with correlation ids in [0,8]
>>(kafka.producer.async.DefaultEventHandler:97)
>>
>> I tried to write my unit test following the scala unit tests in the
>>kafka
>> core.
>> But it seems like I'm still missing something basic to make it work.
>> Can someone help me with that? I'm developing on Mac OS X 10.8.3, and
>> compiled the latest Kafka
>> (plus the TestUtils) from the git repository using Scala 2.9.2.
>>
>>
>> Regards,
>>
>> Andreas Maier
>>
>>
>>
>>