i am using 0.8.0. The high level api works as expected.

<dependency>

 <groupId>org.apache.kafka</groupId>

 <artifactId>kafka_2.10</artifactId>

 <version>0.8.0</version>


On Thu, Feb 20, 2014 at 2:40 PM, Chen Wang <chen.apache.s...@gmail.com>wrote:

> Hi,
> I am using kafka for the first time, and was running the sample from
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> However, I cannot read any data from kafka. The kafka has 10
> partitions,and I tried to read from any of them. The fetch can succeed,
> however, the message size returned is always 0( System.out
>         .println("the message size is" + messageSet
>             .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
>             .size());). Is there something apparent missing for my case?
>
>
> while (a_maxReads > 0) {
>     if (consumer == null) {
>         consumer = new SimpleConsumer(leadBroker, a_port, 100000,
>             10240 * 1024, clientName);
>     }
>
>     System.out.println("start fetching");
>     System.out.println("the readoffset is" + readOffset);
>
>     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>         .addFetch(a_topic, a_partition, readOffset, 1000000)
>         .build();
>     FetchResponse fetchResponse = consumer.fetch(req);
>     System.out.println("finish fetching");
>     if (fetchResponse.hasError()) {
>         numErrors++;
>         // Something went wrong!
>         short code = fetchResponse.errorCode(a_topic, a_partition);
>         System.out.println("Error fetching data from the Broker:" +
> leadBroker + " Reason: " + code);
>         if (numErrors > 5)
>             break;
>         if (code == ErrorMapping.OffsetOutOfRangeCode()) {
>             // We asked for an invalid offset. For simple case ask for
>             // the last element to reset
>             readOffset = getLastOffset(consumer, a_topic, a_partition,
>                 kafka.api.OffsetRequest.LatestTime(), clientName);
>             continue;
>         }
>         consumer.close();
>         consumer = null;
>         leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
>             a_port);
>         continue;
>     }
>     numErrors = 0;
>
>     long numRead = 0;
>     System.out.println("The topic is:" + a_topic + " partition is : " +
> a_partition);
>     ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic,
>         a_partition);
>     System.out
>         .println("the message size is" + messageSet
>             .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
>             .size());
>     for (MessageAndOffset messageAndOffset: messageSet) {
>         long currentOffset = messageAndOffset.offset();
>         if (currentOffset < readOffset) {
>             System.out.println("Found an old offset: " + currentOffset + "
> Expecting: " + readOffset);
>             continue;
>         }
>         readOffset = messageAndOffset.nextOffset();
>         ByteBuffer payload = messageAndOffset.message().payload();
>
>         byte[] bytes = new byte[payload.limit()];
>         payload.get(bytes);
>         System.out.println(String.valueOf(messageAndOffset.offset()) + ":
> " + new String(bytes, "UTF-8"));
>         numRead++;
>         a_maxReads--;
>     }
>
>     if (numRead == 0) {
>         try {
>             Thread.sleep(1000);
>         } catch (InterruptedException ie) {}
>
> Thanks much!
> Chen
>

Reply via email to