I am trying to convert a few projects to the latest Kafka release. Is this the latest artifact? <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-perf_2.8.0</artifactId> <version>0.8.1</version> </dependency>
I have the following piece of code: @Test public void test() throws InterruptedException { //super.createTopic("mytopic", 1, 1); KafkaWriter kw = new KafkaWriter(super.createProducerConfig().props().props() , "mytopic" , new DefaultMessagePartitioner()); kw.init(); for (int i =0 ;i< 1000 ; i++){ System.out.println("sending"); kw.send(("bla "+i+" yyy zzz").getBytes()); } System.out.println("done"); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(super.createConsumerConfig()); Map<String, Integer> consumers = new HashMap<String, Integer>(); consumers.put("mytopic", 1); Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams = consumerConnector .createMessageStreams(consumers); final List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("mytopic"); final AtomicInteger in = new AtomicInteger(); final KafkaStream<byte[], byte[]> stream = streams.get(0); Thread t = new Thread() { public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()){ System.out.println("waiting for messages"); MessageAndMetadata<byte[], byte[]> m = it.next(); System.out.println("key " + new String(m.key())); System.out.println("message " + new String(m.message())); in.incrementAndGet(); System.out.println("count " + in.get()); if (in.get() == 999) { break; } } } }; t.start(); t.join(); } protected static kafka.producer.ProducerConfig createProducerConfig(){ Properties producerProps = new Properties(); //producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort()); producerProps.setProperty("batch.size", "10"); producerProps.setProperty("producer.type", "async"); producerProps.put("metadata.broker.list", "localhost:9092"); return new kafka.producer.ProducerConfig(producerProps); } protected ConsumerConfig createConsumerConfig(){ Properties consumerProps = new Properties(); putZkConnect(consumerProps, 
"localhost:"+zookeeperTestServer.getPort()); putGroupId(consumerProps, "group1"); consumerProps.put("auto.offset.reset", "smallest"); ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); return consumerConfig; } The first 200 messages I send seem to get lost in the ether. I have also tried creating the topic myself: public static void createTopic(String name, int replica , int partitions ){ ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort()); Properties p = new Properties(); AdminUtils.createTopic(z, name, replica, partitions, p); } With the topic created this way, the producer complains when I try to write to the topic: 2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching metadata [{TopicMetadata for topic mytopic -> No partition metadata for topic mytopic due to kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class kafka.common.LeaderNotAvailableException 2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching metadata [{TopicMetadata for topic mytopic -> No partition metadata for topic mytopic due to kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class kafka.common.LeaderNotAvailableException 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: mytopic Does anyone know what is going on? I went through these pains converting to 0.8.0-betas and I was hoping to be done dealing with this :)