Ok so now it is looping through the messages fine, and outputting the actual message payload:
while (true) { //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024) val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024).build() val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest) val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer println("consumed Message " + Utils.readString(messageSet(0).message.payload, "UTF-8") ) offset += 1 } Is there a way for it to not crash at the end? *** Just to be clear, the idea is the run an embedded version in my web application so I can verify the messages are being send and processed in development, this isn't a production idea of mine :) consumed Message test199 consumed Message test200 [error] (run-main-0) java.lang.IndexOutOfBoundsException: 0 java.lang.IndexOutOfBoundsException: 0 at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43) at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47) at com.debugging.jobs.KafkaEmbedded$delayedInit$body.apply(KafkaEmbedded.scala:92) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at com.debugging.jobs.KafkaEmbedded$.main(KafkaEmbedded.scala:24) at com.debugging.jobs.KafkaEmbedded.main(KafkaEmbedded.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) On Fri, Jun 13, 2014 at 4:51 PM, S Ahmed <sahmed1...@gmail.com> wrote: > I found this embedded kafka example online ( > https://gist.github.com/mardambey/2650743) which I am re-writing to work > with 0.8 > > Can someone help me re-write this portion: > > > > val cons = new SimpleConsumer("localhost", 9090, 100, 1024) > > var offset = 0L > > > > var i = 0 > > > while (true) { > val fetchRequest = new FetchRequest("TEST", 0, offset, 1024) > > > > for (msg <- cons.fetch(fetchRequest)) { > > i = i + 1 > > println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = > " + Utils.toString(msg.message.payload, "UTF-8")) > > offset = msg.offset > > } > } > > > > I have this so far: > > val partition = 0 > var offset = 0L > > var i = 0 > while (true) { > //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024) > val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, > offset, 1024).build() > > val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest) > > val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer > println("consumed Message " + messageSet(0).message) > > } > > This currently loops forever b/c it isn't incrementing the offset or anything. > > I'm confused b/c I believe there is no more offset as things are more user > friendly with an incrmeenting counter. > > > Any help would be appreciated. > >