I am trying to convert a few projects to the latest kafka...
Is this the latest artifact?
  <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-perf_2.8.0</artifactId>
                <version>0.8.1</version>
        </dependency>

I have a piece of code

  @Test
  public void test() throws InterruptedException {
    //super.createTopic("mytopic", 1, 1);
    KafkaWriter kw = new
KafkaWriter(super.createProducerConfig().props().props()
            , "mytopic"
            , new DefaultMessagePartitioner());
    kw.init();
    for (int i =0 ;i< 1000 ; i++){
      System.out.println("sending");
      kw.send(("bla "+i+" yyy zzz").getBytes());
    }
    System.out.println("done");
    ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(super.createConsumerConfig());

    Map<String, Integer> consumers = new HashMap<String, Integer>();
    consumers.put("mytopic", 1);
    Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
consumerConnector
            .createMessageStreams(consumers);

    final List<KafkaStream<byte[], byte[]>> streams =
topicMessageStreams.get("mytopic");
    final AtomicInteger in = new AtomicInteger();
    final KafkaStream<byte[], byte[]> stream = streams.get(0);

    Thread t = new Thread() {
      public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()){
          System.out.println("waiting for messages");
          MessageAndMetadata<byte[], byte[]> m = it.next();
          System.out.println("key " + new String(m.key()));
          System.out.println("message " + new String(m.message()));
          in.incrementAndGet();
          System.out.println("count " + in.get());
          if (in.get() == 999) {
            break;
          }
        }
      }
    };

    t.start();
    t.join();

  }

 protected static kafka.producer.ProducerConfig createProducerConfig(){
    Properties producerProps = new Properties();
    //producerProps.put("serializer.class",
"kafka.serializer.StringEncoder");
    putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
    producerProps.setProperty("batch.size", "10");
    producerProps.setProperty("producer.type", "async");
    producerProps.put("metadata.broker.list", "localhost:9092");
    return new kafka.producer.ProducerConfig(producerProps);
  }

  protected ConsumerConfig createConsumerConfig(){
    Properties consumerProps = new Properties();
    putZkConnect(consumerProps, "localhost:"+zookeeperTestServer.getPort());
    putGroupId(consumerProps, "group1");
    consumerProps.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    return consumerConfig;
  }

The first 200 messages I send seem to get lost in the ether.

I have also tried creating the topic myself.

  public static void createTopic(String name, int replica , int partitions
){
    ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
    Properties p  = new Properties();
    AdminUtils.createTopic(z, name, replica, partitions, p);
  }

Which complains when i try to write to the topic.

014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
messages by topic, partition due to: Failed to fetch topic metadata for
topic: mytopic


Does anyone know what is going on. I went through these pains converting to
0.8.0-betas and I was hoping to be done dealing with this :)

Reply via email to