The same code works when I switch to 0.8.0

And use my own

  public static void createTopic(String name, int replica, int partitions )
    String[] arguments = new String[8];
    arguments[0] = "--zookeeper";
    arguments[1] = "localhost:"+zookeeperTestServer.getPort();
    arguments[2] = "--replica";
    arguments[3] = replica+"";
    arguments[4] = "--partition";
    arguments[5] = partitions+"";
    arguments[6] = "--topic";
    arguments[7] = name;

    try {
    } catch (InterruptedException e) {

On Sun, Mar 30, 2014 at 12:30 PM, Edward Capriolo <>wrote:

> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
>   <dependency>
>                 <groupId>org.apache.kafka</groupId>
>                 <artifactId>kafka-perf_2.8.0</artifactId>
>                 <version>0.8.1</version>
>         </dependency>
> I have a piece of code
>   @Test
>   public void test() throws InterruptedException {
>     //super.createTopic("mytopic", 1, 1);
>     KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
>             , "mytopic"
>             , new DefaultMessagePartitioner());
>     kw.init();
>     for (int i =0 ;i< 1000 ; i++){
>       System.out.println("sending");
>       kw.send(("bla "+i+" yyy zzz").getBytes());
>     }
>     System.out.println("done");
>     ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
>     Map<String, Integer> consumers = new HashMap<String, Integer>();
>     consumers.put("mytopic", 1);
>     Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
> consumerConnector
>             .createMessageStreams(consumers);
>     final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
>     final AtomicInteger in = new AtomicInteger();
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
>     Thread t = new Thread() {
>       public void run() {
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()){
>           System.out.println("waiting for messages");
>           MessageAndMetadata<byte[], byte[]> m =;
>           System.out.println("key " + new String(m.key()));
>           System.out.println("message " + new String(m.message()));
>           in.incrementAndGet();
>           System.out.println("count " + in.get());
>           if (in.get() == 999) {
>             break;
>           }
>         }
>       }
>     };
>     t.start();
>     t.join();
>   }
>  protected static kafka.producer.ProducerConfig createProducerConfig(){
>     Properties producerProps = new Properties();
>     //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
>     putZkConnect(producerProps,
> "localhost:"+zookeeperTestServer.getPort());
>     producerProps.setProperty("batch.size", "10");
>     producerProps.setProperty("producer.type", "async");
>     producerProps.put("", "localhost:9092");
>     return new kafka.producer.ProducerConfig(producerProps);
>   }
>   protected ConsumerConfig createConsumerConfig(){
>     Properties consumerProps = new Properties();
>     putZkConnect(consumerProps,
> "localhost:"+zookeeperTestServer.getPort());
>     putGroupId(consumerProps, "group1");
>     consumerProps.put("auto.offset.reset", "smallest");
>     ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
>     return consumerConfig;
>   }
> The first 200 messages I send seem to get lost in the ether.
> I have also tried creating the topic myself.
>   public static void createTopic(String name, int replica , int partitions
> ){
>     ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
>     Properties p  = new Properties();
>     AdminUtils.createTopic(z, name, replica, partitions, p);
>   }
> Which complains when i try to write to the topic.
> 014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
> Does anyone know what is going on. I went through these pains converting
> to 0.8.0-betas and I was hoping to be done dealing with this :)

Reply via email to