Argh, fat fingers...I am attempting to write to Kafka 0.8.2.1 from Storm 1.0.0 which is has a dependency upon Kafka 0.9.0.1.
@Abhishek -> interesting you are seeing the same exception for Storm 0.10.0 because that has a dependency upon Kafka 0.8.1.1. On Wed, Apr 20, 2016 at 8:06 AM, John Yost <[email protected]> wrote: > Oh, gotcha, okay, will do. BTW, here's the link I failed to provide the > first time: https://github.com/confluentinc/examples/issues/15 > > --John > > On Wed, Apr 20, 2016 at 7:44 AM, Abhishek Agarwal <[email protected]> > wrote: > >> @John - >> can you file a JIRA for this? I doubt it is related to 1.0.0 version in >> particular. I have run into "illegalArugmentExceptions" in KafkaSpout >> (0.10.0). >> >> On Wed, Apr 20, 2016 at 4:44 PM, John Yost <[email protected]> wrote: >> >>> Also, I found this link that indicates the exception I reported >>> yesterday can be symptomatic of a mismatch between the client and broker >>> where the client is one version newer. I am not saying that's the case >>> here with Storm 1.0.0, but wanted to provide this info troubleshooting-wise. >>> >>> Thanks >>> >>> --John >>> >>> On Tue, Apr 19, 2016 at 3:26 PM, John Yost <[email protected]> wrote: >>> >>>> Hi Harsha, >>>> >>>> When the Storm 1.0.0 KafkaSpout (from the storm-kafka jar) attempts to >>>> read from the Kafka 0.8.2.1 partition an IlegalArgumentException is thrown, >>>> the root exception of which is as follows: >>>> >>>> at java.nio.Buffer.limit(Buffer.java:267) >>>> at >>>> kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37) >>>> at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99) >>>> at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97) >>>> at >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>>> at >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>>> at scala.collection.immutable.Range.foreach(Range.scala:141) >>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105) >>>> at kafka.api.TopicData$.readFrom(FetchResponse.scala:97) >>>> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) >>>> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) >>>> at >>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) >>>> at >>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) >>>> at scala.collection.immutable.Range.foreach(Range.scala:141) >>>> at >>>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) >>>> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) >>>> >>>> The corresponding source code in Kafka where the root exception is >>>> thrown is bolded: >>>> >>>> object FetchResponsePartitionData { >>>> def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = { >>>> val error = buffer.getShort >>>> val hw = buffer.getLong >>>> val messageSetSize = buffer.getInt >>>> val messageSetBuffer = buffer.slice() >>>> * messageSetBuffer.limit(messageSetSize)* >>>> buffer.position(buffer.position + messageSetSize) >>>> new FetchResponsePartitionData(error, hw, new >>>> ByteBufferMessageSet(messageSetBuffer)) >>>> } >>>> >>>> I am using all the default KafkaConfig settings for the KafkaSpout with >>>> the exception of startOffsetTime, so I don't *think* I have a >>>> misconfiguration, but I may be wrong. >>>> >>>> Please confirm if there is anything I need to do config-wise to make >>>> this work. >>>> >>>> Thanks >>>> >>>> --John >>>> >>>> On Sat, Apr 16, 2016 at 10:49 PM, <[email protected]> wrote: >>>> >>>>> Awesome, thanks Harsha! >>>>> >>>>> --John >>>>> >>>>> Sent from my iPhone >>>>> >>>>> > On Apr 16, 2016, at 1:28 PM, Harsha <[email protected]> wrote: >>>>> > >>>>> > John, >>>>> > I think you are asking if you will be able to run 0.8.2 >>>>> kafka consumer in storm 1.0.0 . Yes we are shipping storm-kafka-client >>>>> which uses the new consumer api in kafka 0.9.0.1 but storm 1.0.0 still >>>>> ships with storm-kafka which uses older consumer api which can work with >>>>> all versions of kafka including 0.9.0.1. >>>>> > >>>>> > "I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1, >>>>> and I am getting compile errors in storm-kafka-client. I don't have a >>>>> problem fixing the errors, but I want to ensure I am not wasting my time. >>>>> :)" >>>>> > >>>>> > You shouldn't be changing kafka version. As I said above Kafka >>>>> 0.9.0.1 contains two kafka apis new ones which will only works with >>>>> 0.9.0.1 >>>>> kafka cluster and old consumer apis which can work 0.8.2. Even though you >>>>> compile with 0.9.0.1 version it will work with 0.8.2.1 kafka cluster. >>>>> > >>>>> > Let me know if you have any questions. >>>>> > >>>>> > Thanks, >>>>> > Harsha >>>>> > >>>>> >> On Fri, Apr 15, 2016, at 11:39 AM, John Yost wrote: >>>>> >> Hi Everyone, >>>>> >> I know that Storm 1.0.0 is designed to work with Kafka 0.9.0.1, but >>>>> is it possible for Storm 1.0.0 to work with 0.8.2.1? >>>>> >> >>>>> >> I did some integration testing with 0.9.0.1 client with 0.8.2.1 >>>>> broker, which is yucky, and it appears this won't work. >>>>> >> >>>>> >> I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1, >>>>> and I am getting compile errors in storm-kafka-client. I don't have a >>>>> problem fixing the errors, but I want to ensure I am not wasting my time. >>>>> :) >>>>> >> >>>>> >> Please confirm if it is anticipated that Storm 1.0.0 can be made >>>>> compatible with Kafka 0.8.2.x--thanks! >>>>> >> --John >>>>> > >>>>> >>>> >>>> >>> >> >> >> -- >> Regards, >> Abhishek Agarwal >> >> >
