The same code works when I switch to 0.8.0 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.0</version> </dependency>
And use my own public static void createTopic(String name, int replica, int partitions ) { String[] arguments = new String[8]; arguments[0] = "--zookeeper"; arguments[1] = "localhost:"+zookeeperTestServer.getPort(); arguments[2] = "--replica"; arguments[3] = replica+""; arguments[4] = "--partition"; arguments[5] = partitions+""; arguments[6] = "--topic"; arguments[7] = name; CreateTopicCommand.main(arguments); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } On Sun, Mar 30, 2014 at 12:30 PM, Edward Capriolo <edlinuxg...@gmail.com>wrote: > I am trying to convert a few projects to the latest kafka... > Is this the latest artifact? > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-perf_2.8.0</artifactId> > <version>0.8.1</version> > </dependency> > > I have a piece of code > > @Test > public void test() throws InterruptedException { > //super.createTopic("mytopic", 1, 1); > KafkaWriter kw = new > KafkaWriter(super.createProducerConfig().props().props() > , "mytopic" > , new DefaultMessagePartitioner()); > kw.init(); > for (int i =0 ;i< 1000 ; i++){ > System.out.println("sending"); > kw.send(("bla "+i+" yyy zzz").getBytes()); > } > System.out.println("done"); > ConsumerConnector consumerConnector = > Consumer.createJavaConsumerConnector(super.createConsumerConfig()); > > Map<String, Integer> consumers = new HashMap<String, Integer>(); > consumers.put("mytopic", 1); > Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams = > consumerConnector > .createMessageStreams(consumers); > > final List<KafkaStream<byte[], byte[]>> streams = > topicMessageStreams.get("mytopic"); > final AtomicInteger in = new AtomicInteger(); > final KafkaStream<byte[], byte[]> stream = streams.get(0); > > Thread t = new Thread() { > public void run() { > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > while (it.hasNext()){ > System.out.println("waiting for messages"); > MessageAndMetadata<byte[], byte[]> m = it.next(); > System.out.println("key " + new String(m.key())); > System.out.println("message " + new String(m.message())); > in.incrementAndGet(); > System.out.println("count " + in.get()); > if (in.get() == 999) { > break; > } > } > } > }; > > t.start(); > t.join(); > > } > > protected static kafka.producer.ProducerConfig createProducerConfig(){ > Properties producerProps = new Properties(); > //producerProps.put("serializer.class", > "kafka.serializer.StringEncoder"); > putZkConnect(producerProps, > "localhost:"+zookeeperTestServer.getPort()); > producerProps.setProperty("batch.size", "10"); > producerProps.setProperty("producer.type", "async"); > producerProps.put("metadata.broker.list", "localhost:9092"); > return new kafka.producer.ProducerConfig(producerProps); > } > > protected ConsumerConfig createConsumerConfig(){ > Properties consumerProps = new Properties(); > putZkConnect(consumerProps, > "localhost:"+zookeeperTestServer.getPort()); > putGroupId(consumerProps, "group1"); > consumerProps.put("auto.offset.reset", "smallest"); > ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps); > return consumerConfig; > } > > The first 200 messages I send seem to get lost in the ether. > > I have also tried creating the topic myself. > > public static void createTopic(String name, int replica , int partitions > ){ > ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort()); > Properties p = new Properties(); > AdminUtils.createTopic(z, name, replica, partitions, p); > } > > Which complains when i try to write to the topic. > > 014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching > metadata [{TopicMetadata for topic mytopic -> > No partition metadata for topic mytopic due to > kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class > kafka.common.LeaderNotAvailableException > 2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching > metadata [{TopicMetadata for topic mytopic -> > No partition metadata for topic mytopic due to > kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class > kafka.common.LeaderNotAvailableException > 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate > messages by topic, partition due to: Failed to fetch topic metadata for > topic: mytopic > > > Does anyone know what is going on. I went through these pains converting > to 0.8.0-betas and I was hoping to be done dealing with this :) >