Hi Philip.
   Noted, and I really appreciate your input. Patching the code is not a
problem.
I just didn't want to be tied to a forked version of the storm-kafka
spout, since my patch will not be in the main branch of the code.

Thanks
Oleg.


On Tue, Nov 19, 2013 at 9:44 PM, Philip O'Toole <phi...@loggly.com> wrote:

> The Storm mailing list is probably a better place for this thread. If I
> understand the issue, it is not a ZK issue, nor a Kafka config issue. We
> run multiple topos draining the same topics all the time.
>
> In any event, you just need to patch the Kafka Spout code to catch this
> exception, and ask Kafka for the earliest or latest offset (make that
> choice configurable), and reset with that. We did that here at Loggly.
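
A rough sketch of the change described above (catch the exception, ask
for the earliest or latest offset depending on a configurable policy,
and re-issue the fetch) might look like the following. It does not use
the real Kafka or storm-kafka classes: PartitionReader, InMemoryPartition,
ResetPolicy, Batch, and the local OffsetOutOfRangeException are all
hypothetical stand-ins so that the example runs on its own, and offsets
are treated as simple message indices. In Kafka 0.7 the offset lookup
would, as far as I know, go through SimpleConsumer.getOffsetsBefore with
a time of -2 (earliest) or -1 (latest); treat that as an assumption to
verify against your version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for kafka.common.OffsetOutOfRangeException. */
class OffsetOutOfRangeException extends RuntimeException {
    OffsetOutOfRangeException(String message) {
        super(message);
    }
}

/** Hypothetical, minimal view of one partition; NOT the real Kafka consumer API. */
interface PartitionReader {
    /** Returns the messages from the given offset on, or throws OffsetOutOfRangeException. */
    List<String> fetch(long offset);
    long earliestOffset();
    long latestOffset();
}

/** Toy partition for the demo: offsets are message indices (an assumption of this sketch). */
class InMemoryPartition implements PartitionReader {
    private final long firstOffset;
    private final List<String> messages;

    InMemoryPartition(long firstOffset, List<String> messages) {
        this.firstOffset = firstOffset;
        this.messages = new ArrayList<>(messages);
    }

    @Override
    public List<String> fetch(long offset) {
        if (offset < firstOffset || offset > latestOffset()) {
            throw new OffsetOutOfRangeException("offset " + offset + " is not available");
        }
        int from = (int) (offset - firstOffset);
        return new ArrayList<>(messages.subList(from, messages.size()));
    }

    @Override
    public long earliestOffset() {
        return firstOffset;
    }

    @Override
    public long latestOffset() {
        return firstOffset + messages.size();
    }
}

class OffsetResetSketch {
    /** The configurable choice: where to restart when the stored offset is invalid. */
    enum ResetPolicy { EARLIEST, LATEST }

    /** A fetched batch plus the offset it was actually read from. */
    static final class Batch {
        final long startOffset;
        final List<String> messages;

        Batch(long startOffset, List<String> messages) {
            this.startOffset = startOffset;
            this.messages = messages;
        }
    }

    static Batch fetchWithReset(PartitionReader reader, long storedOffset, ResetPolicy policy) {
        try {
            return new Batch(storedOffset, reader.fetch(storedOffset));
        } catch (OffsetOutOfRangeException e) {
            // The stored offset no longer exists (for example, removed by retention),
            // so ask the partition for an offset that is valid right now.
            long resetOffset = (policy == ResetPolicy.EARLIEST)
                    ? reader.earliestOffset()
                    : reader.latestOffset();
            // Re-issue the fetch once, using the offset that was just returned.
            return new Batch(resetOffset, reader.fetch(resetOffset));
        }
    }

    public static void main(String[] args) {
        PartitionReader partition = new InMemoryPartition(100, Arrays.asList("a", "b", "c"));
        Batch batch = fetchWithReset(partition, 5, ResetPolicy.EARLIEST);
        System.out.println(batch.startOffset + " " + batch.messages); // prints "100 [a, b, c]"
    }
}
```

With EARLIEST the consumer replays everything still retained; with
LATEST it skips ahead and only sees new messages. Which one is right
depends on whether the topology can tolerate reprocessing or data loss.
In a real spout the reset offset would presumably also have to be
written back to wherever the spout stores its position.
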
>
> I don't have a patch to hand, but to be honest, if you want to have any
> hope of running Kafka and Storm in production you should attempt to code
> the patch yourself. It will teach you stuff you absolutely need to
> understand about these two pieces of software, and offset management is
> particularly important.
>
> It's not a difficult change though. Just build and instrument the code to
> start. You'll thank me when you hit your next offset-related issue.
>
> Philip
>
> On Nov 18, 2013, at 11:10 PM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>
> > Hi Philip.
> >
> >   It looks like this is our case:
> > https://github.com/nathanmarz/storm-contrib/pull/15
> >
> > It is interesting that the issue is still open (after more than 1 year),
> > so I am curious how people are able to work in production without the
> > ability to deploy another topology.
> > Can the community please share whether this patch resolves the issue, and
> > who is using it in production?
> >
> > Also a question: should I change the ZooKeeper or Kafka configuration to
> > resolve the issue? If yes, please share what should be changed.
> >
> > Thanks
> > Oleg.
> >
> >
> >
> > On Tue, Nov 19, 2013 at 11:51 AM, Philip O'Toole <phi...@loggly.com> wrote:
> >
> >> Don't get scared, this is perfectly normal and easily fixed. :-) The
> >> second topology attempted to fetch messages from an offset in Kafka that
> >> does not exist. This could happen due to Kafka retention policies
> >> (messages deleted) or a bug in your code. Your code needs to catch this
> >> exception, ask Kafka for the earliest or latest offset (take your
> >> pick), and then re-issue the fetch using the returned offset.
> >>
> >> Are you using a separate path in ZK for the second topology? Is it of a
> >> completely different nature than the first?
> >>
> >> Philip
> >>
> >>
> >>
> >>
> >> On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >>
> >>> We are working with Kafka (0.7.2) + Storm.
> >>>   1) We deployed the 1st topology, which subscribes to a Kafka topic; it
> >>> has been working fine for a couple of weeks.
> >>>   2) Yesterday we deployed a 2nd topology, which subscribes to the same
> >>> Kafka topic, but the 2nd topology immediately failed with the exception
> >>> below.
> >>>
> >>> *What can cause such behavior, and how can we resolve the issue?*
> >>>
> >>>
> >>> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> >>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> >>>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> >>>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> >>>     at clojure.lang.AFn.run(AFn.java:24)
> >>>     at java.lang.Thread.run(Thread.java:662)
> >>> Caused by: kafka.common.OffsetOutOfRangeException
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>     at java.lang.Class.newInstance0(Class.java:355)
> >>>     at java.lang.Class.newInstance(Class.java:308)
> >>>     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> >>>     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> >>>     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> >>>     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> >>>     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> >>>     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> >>>     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> >>>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> >>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> >>>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> >>>     ... 6 more
> >>>
> >>> Mon, 18 Nov 2013 12:36:25 +0000
> >>>
> >>> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> >>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> >>>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> >>>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> >>>     at clojure.lang.AFn.run(AFn.java:24)
> >>>     at java.lang.Thread.run(Thread.java:662)
> >>> Caused by: kafka.common.OffsetOutOfRangeException
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>     at java.lang.Class.newInstance0(Class.java:355)
> >>>     at java.lang.Class.newInstance(Class.java:308)
> >>>     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> >>>     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> >>>     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> >>>     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> >>>     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> >>>     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> >>>     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> >>>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> >>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> >>>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> >>>     ... 6 more
> >>>
> >>> Mon, 18 Nov 2013 12:35:49 +0000
> >>>
> >>> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> >>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> >>>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> >>>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> >>>     at clojure.lang.AFn.run(AFn.java:24)
> >>>     at java.lang.Thread.run(Thread.java:662)
> >>> Caused by: kafka.common.OffsetOutOfRangeException
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>     at java.lang.Class.newInstance0(Class.java:355)
> >>>     at java.lang.Class.newInstance(Class.java:308)
> >>>     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> >>>     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> >>>     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> >>>     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> >>>     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> >>>     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> >>>     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> >>>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> >>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> >>>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> >>>     ... 6 more
> >>>
> >>
>
