Hi Harsha,

When the Storm 1.0.0 KafkaSpout (from the storm-kafka jar) attempts to read
from a Kafka 0.8.2.1 partition, an IllegalArgumentException is thrown; the
stack trace of the root exception is as follows:

at java.nio.Buffer.limit(Buffer.java:267)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:97)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

The corresponding Kafka source code is below; the line where the root
exception is thrown is marked with a comment:

object FetchResponsePartitionData {
  def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
    val error = buffer.getShort
    val hw = buffer.getLong
    val messageSetSize = buffer.getInt
    val messageSetBuffer = buffer.slice()
    messageSetBuffer.limit(messageSetSize)  // <-- root exception thrown here
    buffer.position(buffer.position + messageSetSize)
    new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
  }
}

I am using all the default KafkaConfig settings for the KafkaSpout with the
exception of startOffsetTime, so I don't *think* I have a misconfiguration,
but I may be wrong.
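
For reference, here is roughly how the spout is wired up (the ZooKeeper
connect string, topic, zkRoot, spout id, and class name below are
placeholders, and EarliestTime() is just an example value, not necessarily
what I run with):

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutWiring {  // placeholder class name for this sketch
  public static void main(String[] args) {
    // Placeholder ZooKeeper connect string, topic, zkRoot, and consumer id
    BrokerHosts hosts = new ZkHosts("zkhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/my-topic", "my-spout-id");

    // startOffsetTime is the only setting I change from the defaults;
    // everything else on SpoutConfig/KafkaConfig is left as shipped
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
  }
}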

Please confirm if there is anything I need to do config-wise to make this
work.

Thanks

--John

On Sat, Apr 16, 2016 at 10:49 PM, <[email protected]> wrote:

> Awesome, thanks Harsha!
>
> --John
>
> Sent from my iPhone
>
> > On Apr 16, 2016, at 1:28 PM, Harsha <[email protected]> wrote:
> >
> > John,
> >             I think you are asking whether you will be able to run the
> > 0.8.2 kafka consumer in storm 1.0.0. Yes, we are shipping
> > storm-kafka-client, which uses the new consumer api in kafka 0.9.0.1,
> > but storm 1.0.0 still ships with storm-kafka, which uses the older
> > consumer api and works with all versions of kafka, including 0.9.0.1.
> >
> > "I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1, and
> I am getting compile errors in storm-kafka-client. I don't have a problem
> fixing the errors, but I want to ensure I am not wasting my time. :)"
> >
> > You shouldn't be changing the kafka version. As I said above, Kafka 0.9.0.1
> > contains two kafka consumer apis: the new one, which only works with a
> > 0.9.0.1 kafka cluster, and the old consumer api, which can work with 0.8.2.
> > Even though you compile with the 0.9.0.1 version, it will work with a
> > 0.8.2.1 kafka cluster.
> >
> > Let me know if you have any questions.
> >
> > Thanks,
> > Harsha
> >
> >> On Fri, Apr 15, 2016, at 11:39 AM, John Yost wrote:
> >> Hi Everyone,
> >> I know that Storm 1.0.0 is designed to work with Kafka 0.9.0.1, but is
> >> it possible for Storm 1.0.0 to work with 0.8.2.1?
> >>
> >> I did some integration testing with the 0.9.0.1 client against a 0.8.2.1
> >> broker, which is yucky, and it appears this won't work.
> >>
> >> I checked out the v1.0.0 tag, changed the kafka version to 0.8.2.1, and
> >> I am getting compile errors in storm-kafka-client. I don't have a problem
> >> fixing the errors, but I want to ensure I am not wasting my time. :)
> >>
> >> Please confirm if it is anticipated that Storm 1.0.0 can be made
> >> compatible with Kafka 0.8.2.x--thanks!
> >> --John
> >
>
