Argh, fat fingers...I am attempting to write to Kafka 0.8.2.1 from Storm
1.0.0 which is has a dependency upon Kafka 0.9.0.1.

@Abhishek -> interesting you are seeing the same exception for Storm 0.10.0
because that has a dependency upon Kafka 0.8.1.1.

On Wed, Apr 20, 2016 at 8:06 AM, John Yost <[email protected]> wrote:

> Oh, gotcha, okay, will do. BTW, here's the link I failed to provide the
> first time: https://github.com/confluentinc/examples/issues/15
>
> --John
>
> On Wed, Apr 20, 2016 at 7:44 AM, Abhishek Agarwal <[email protected]>
> wrote:
>
>> @John -
>> can you file a JIRA for this? I doubt it is related to 1.0.0 version in
>> particular. I have run into "illegalArugmentExceptions" in KafkaSpout
>> (0.10.0).
>>
>> On Wed, Apr 20, 2016 at 4:44 PM, John Yost <[email protected]> wrote:
>>
>>> Also, I found this link that indicates the exception I reported
>>> yesterday can be symptomatic of a mismatch between the client and broker
>>> where the client is one version newer.  I am not saying that's the case
>>> here with Storm 1.0.0, but wanted to provide this info troubleshooting-wise.
>>>
>>> Thanks
>>>
>>> --John
>>>
>>> On Tue, Apr 19, 2016 at 3:26 PM, John Yost <[email protected]> wrote:
>>>
>>>> Hi Harsha,
>>>>
>>>> When the Storm 1.0.0 KafkaSpout (from the storm-kafka jar) attempts to
>>>> read from the Kafka 0.8.2.1 partition an IlegalArgumentException is thrown,
>>>> the root exception of which is as follows:
>>>>
>>>> at java.nio.Buffer.limit(Buffer.java:267)
>>>> at
>>>> kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
>>>> at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
>>>> at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>> at scala.collection.immutable.Range.foreach(Range.scala:141)
>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>> at kafka.api.TopicData$.readFrom(FetchResponse.scala:97)
>>>> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
>>>> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>> at scala.collection.immutable.Range.foreach(Range.scala:141)
>>>> at
>>>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>>
>>>> The corresponding source code in Kafka where the root exception is
>>>> thrown is bolded:
>>>>
>>>> object FetchResponsePartitionData {
>>>>   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
>>>>     val error = buffer.getShort
>>>>     val hw = buffer.getLong
>>>>     val messageSetSize = buffer.getInt
>>>>     val messageSetBuffer = buffer.slice()
>>>>  *   messageSetBuffer.limit(messageSetSize)*
>>>>     buffer.position(buffer.position + messageSetSize)
>>>>     new FetchResponsePartitionData(error, hw, new
>>>> ByteBufferMessageSet(messageSetBuffer))
>>>>   }
>>>>
>>>> I am using all the default KafkaConfig settings for the KafkaSpout with
>>>> the exception of startOffsetTime, so I don't *think* I have a
>>>> misconfiguration, but I may be wrong.
>>>>
>>>> Please confirm if there is anything I need to do config-wise to make
>>>> this work.
>>>>
>>>> Thanks
>>>>
>>>> --John
>>>>
>>>> On Sat, Apr 16, 2016 at 10:49 PM, <[email protected]> wrote:
>>>>
>>>>> Awesome, thanks Harsha!
>>>>>
>>>>> --John
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> > On Apr 16, 2016, at 1:28 PM, Harsha <[email protected]> wrote:
>>>>> >
>>>>> > John,
>>>>> >             I think you are asking if you will be able to run 0.8.2
>>>>> kafka consumer in storm 1.0.0 . Yes we are shipping storm-kafka-client
>>>>> which uses the new consumer api in kafka 0.9.0.1 but storm 1.0.0 still
>>>>> ships with storm-kafka which uses older consumer api which can work with
>>>>> all versions of kafka including 0.9.0.1.
>>>>> >
>>>>> > "I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1,
>>>>> and I am getting compile errors in storm-kafka-client. I don't have a
>>>>> problem fixing the errors, but I want to ensure I am not wasting my time.
>>>>> :)"
>>>>> >
>>>>> > You shouldn't be changing kafka version. As I said above Kafka
>>>>> 0.9.0.1 contains two kafka apis new ones which will only works with 
>>>>> 0.9.0.1
>>>>> kafka cluster and old consumer apis which can work 0.8.2. Even though you
>>>>> compile with 0.9.0.1 version it will work with 0.8.2.1 kafka cluster.
>>>>> >
>>>>> > Let me know if you have any questions.
>>>>> >
>>>>> > Thanks,
>>>>> > Harsha
>>>>> >
>>>>> >> On Fri, Apr 15, 2016, at 11:39 AM, John Yost wrote:
>>>>> >> Hi Everyone,
>>>>> >> I know that Storm 1.0.0 is designed to work with Kafka 0.9.0.1, but
>>>>> is it possible for Storm 1.0.0 to work with 0.8.2.1?
>>>>> >>
>>>>> >> I did some integration testing with 0.9.0.1 client with 0.8.2.1
>>>>> broker, which is yucky, and it appears this won't work.
>>>>> >>
>>>>> >> I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1,
>>>>> and I am getting compile errors in storm-kafka-client. I don't have a
>>>>> problem fixing the errors, but I want to ensure I am not wasting my time. 
>>>>> :)
>>>>> >>
>>>>> >> Please confirm if it is anticipated that Storm 1.0.0 can be made
>>>>> compatible with Kafka 0.8.2.x--thanks!
>>>>> >> --John
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Abhishek Agarwal
>>
>>
>

Reply via email to