Also, I found this link that indicates the exception I reported yesterday can be symptomatic of a mismatch between the client and broker where the client is one version newer. I am not saying that's the case here with Storm 1.0.0, but wanted to provide this info troubleshooting-wise.
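For reference, here is a minimal sketch of the generic java.nio behavior behind the top frame of the quoted stack trace (java.nio.Buffer.limit). It assumes only the documented contract that limit(newLimit) throws IllegalArgumentException when newLimit is negative or larger than the buffer's capacity. The class name SizedSliceDemo and the helper readSized are hypothetical and merely mirror the shape of the quoted readFrom; this is not Kafka code. It also says nothing about why the size field would be wrong in the first place; a client/broker wire-format mismatch is one possibility, not something this sketch demonstrates.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; mirrors the shape of the quoted readFrom, not Kafka itself.
class SizedSliceDemo {

    // Reads a 4-byte size prefix, then returns a slice limited to that many bytes.
    static ByteBuffer readSized(ByteBuffer buffer) {
        int size = buffer.getInt();
        ByteBuffer slice = buffer.slice();
        // Throws IllegalArgumentException if size < 0 or size > slice.capacity(),
        // i.e. if the prefix claims more bytes than actually remain.
        slice.limit(size);
        buffer.position(buffer.position() + size);
        return slice;
    }

    public static void main(String[] args) {
        // Well-formed input: the prefix says 4 bytes and 4 bytes follow.
        ByteBuffer ok = ByteBuffer.allocate(8);
        ok.putInt(4).putInt(42).flip();
        System.out.println("payload bytes: " + readSized(ok).remaining());

        // Misread size: the prefix says 1000 bytes but only 4 remain.
        ByteBuffer bad = ByteBuffer.allocate(8);
        bad.putInt(1000).putInt(42).flip();
        try {
            readSized(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("limit() rejected the size: " + e);
        }
    }
}
```

If the parser reads the size from the wrong offset, any value larger than the remaining bytes fails in exactly this way, which is consistent with (but does not prove) a protocol mismatch.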
Thanks

--John

On Tue, Apr 19, 2016 at 3:26 PM, John Yost <[email protected]> wrote:

> Hi Harsha,
>
> When the Storm 1.0.0 KafkaSpout (from the storm-kafka jar) attempts to
> read from the Kafka 0.8.2.1 partition, an IllegalArgumentException is
> thrown, the root exception of which is as follows:
>
>     at java.nio.Buffer.limit(Buffer.java:267)
>     at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
>     at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
>     at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.immutable.Range.foreach(Range.scala:141)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at kafka.api.TopicData$.readFrom(FetchResponse.scala:97)
>     at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
>     at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.immutable.Range.foreach(Range.scala:141)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>
> The corresponding source code in Kafka where the root exception is thrown
> is bolded:
>
> object FetchResponsePartitionData {
>   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
>     val error = buffer.getShort
>     val hw = buffer.getLong
>     val messageSetSize = buffer.getInt
>     val messageSetBuffer = buffer.slice()
>     *messageSetBuffer.limit(messageSetSize)*
>     buffer.position(buffer.position + messageSetSize)
>     new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
>   }
>
> I am using all the default KafkaConfig settings for the KafkaSpout with
> the exception of startOffsetTime, so I don't *think* I have a
> misconfiguration, but I may be wrong.
>
> Please confirm if there is anything I need to do config-wise to make this
> work.
>
> Thanks
>
> --John
>
> On Sat, Apr 16, 2016 at 10:49 PM, <[email protected]> wrote:
>
>> Awesome, thanks Harsha!
>>
>> --John
>>
>> Sent from my iPhone
>>
>> > On Apr 16, 2016, at 1:28 PM, Harsha <[email protected]> wrote:
>> >
>> > John,
>> > I think you are asking if you will be able to run the 0.8.2 kafka
>> > consumer in storm 1.0.0. Yes, we are shipping storm-kafka-client,
>> > which uses the new consumer api in kafka 0.9.0.1, but storm 1.0.0
>> > still ships with storm-kafka, which uses the older consumer api that
>> > can work with all versions of kafka including 0.9.0.1.
>> >
>> > "I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1,
>> > and I am getting compile errors in storm-kafka-client. I don't have a
>> > problem fixing the errors, but I want to ensure I am not wasting my
>> > time. :)"
>> >
>> > You shouldn't be changing the kafka version. As I said above, Kafka
>> > 0.9.0.1 contains two kafka apis: the new ones, which only work with a
>> > 0.9.0.1 kafka cluster, and the old consumer apis, which can work with
>> > 0.8.2. Even though you compile with the 0.9.0.1 version, it will work
>> > with a 0.8.2.1 kafka cluster.
>> >
>> > Let me know if you have any questions.
>> >
>> > Thanks,
>> > Harsha
>> >
>> >> On Fri, Apr 15, 2016, at 11:39 AM, John Yost wrote:
>> >> Hi Everyone,
>> >> I know that Storm 1.0.0 is designed to work with Kafka 0.9.0.1, but
>> >> is it possible for Storm 1.0.0 to work with 0.8.2.1?
>> >>
>> >> I did some integration testing with a 0.9.0.1 client and a 0.8.2.1
>> >> broker, which is yucky, and it appears this won't work.
>> >>
>> >> I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1,
>> >> and I am getting compile errors in storm-kafka-client. I don't have a
>> >> problem fixing the errors, but I want to ensure I am not wasting my
>> >> time. :)
>> >>
>> >> Please confirm if it is anticipated that Storm 1.0.0 can be made
>> >> compatible with Kafka 0.8.2.x--thanks!
>> >>
>> >> --John
