Hi Philip.

   It looks like this is our case:
https://github.com/nathanmarz/storm-contrib/pull/15

It is interesting that the issue is still open (after more than a year), so
I am curious how people manage to run in production without the ability to
deploy another topology.
Could the community please share whether this patch resolves the issue, and
who is using it in production?

Another question: should I change the ZooKeeper or Kafka configuration to
resolve the issue? If so, please share what should be changed.

Thanks
Oleg.



On Tue, Nov 19, 2013 at 11:51 AM, Philip O'Toole <phi...@loggly.com> wrote:

> Don't get scared, this is perfectly normal and easily fixed. :-) The second
> topology attempted to fetch messages from an offset in Kafka that does not
> exist. This could happen due to Kafka retention policies (messages being
> deleted) or a bug in your code. Your code needs to catch this exception,
> ask Kafka for the earliest -- or latest -- offset (take your pick), and
> then re-issue the fetch using the returned offset.
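[The catch-and-reset pattern Philip describes can be sketched as below. This is a self-contained illustration, not the real Kafka 0.7 client: `fetch` here is a stub standing in for `SimpleConsumer.fetch`, and the hard-coded `EARLIEST` constant stands in for what a call like `consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime(), 1)` would return in real code.]

```java
import java.util.Arrays;
import java.util.List;

public class OffsetResetSketch {
    // Simulated broker state: only offsets in [EARLIEST, LATEST] are retained.
    static final long EARLIEST = 500L;
    static final long LATEST = 900L;

    static class OffsetOutOfRangeException extends RuntimeException {}

    // Stub standing in for SimpleConsumer.fetch(): rejects offsets that fall
    // outside the broker's retention window.
    static List<String> fetch(long offset) {
        if (offset < EARLIEST || offset > LATEST) {
            throw new OffsetOutOfRangeException();
        }
        return Arrays.asList("msg@" + offset);
    }

    // The pattern Philip describes: catch the exception, reset to the
    // earliest valid offset, and re-issue the fetch.
    static List<String> fetchWithReset(long offset) {
        try {
            return fetch(offset);
        } catch (OffsetOutOfRangeException e) {
            // Real code would ask the broker for this value instead of
            // using a hard-coded constant.
            return fetch(EARLIEST);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithReset(42L));  // stale offset, reset: [msg@500]
        System.out.println(fetchWithReset(700L)); // valid offset: [msg@700]
    }
}
```

Choosing the earliest offset means reprocessing whatever the broker still retains; choosing the latest means skipping ahead and losing the gap. Which is right depends on the topology's delivery guarantees.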
>
> Are you using a separate path in ZK for the second topology? Is it of a
> completely different nature than the first?
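[On the separate-ZK-path point: each topology's Kafka spout commits its consumed offsets under a ZooKeeper path, so two topologies reading the same topic must keep that state under distinct paths, or the second will pick up the first one's committed offset. The sketch below only illustrates the idea; the `SpoutConfig` class and `offsetPath()` helper here are hypothetical stand-ins, not the real storm-kafka API.]

```java
public class ZkPathSketch {
    // Hypothetical stand-in for a spout config: just the fields that decide
    // where in ZooKeeper the spout stores its consumed offsets.
    static class SpoutConfig {
        final String topic, zkRoot, consumerId;
        SpoutConfig(String topic, String zkRoot, String consumerId) {
            this.topic = topic;
            this.zkRoot = zkRoot;
            this.consumerId = consumerId;
        }
        // Illustrative helper: the ZK node this spout's offsets live under.
        String offsetPath() { return zkRoot + "/" + consumerId; }
    }

    public static void main(String[] args) {
        // Same Kafka topic, but each topology keeps offsets under its own
        // path, so the second never reads the first one's committed offset.
        SpoutConfig first  = new SpoutConfig("events", "/kafkastorm", "topology-1");
        SpoutConfig second = new SpoutConfig("events", "/kafkastorm", "topology-2");
        System.out.println(first.offsetPath());   // /kafkastorm/topology-1
        System.out.println(second.offsetPath());  // /kafkastorm/topology-2
    }
}
```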
>
> Philip
>
>
>
>
> On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <oruchov...@gmail.com
> >wrote:
>
> > We are working with Kafka (0.7.2) + Storm.
> >    1) We deployed the 1st topology, which subscribed to a Kafka topic and
> > has been working fine for a couple of weeks already.
> >    2) Yesterday we deployed a 2nd topology subscribed to the same Kafka
> > topic, but the 2nd topology immediately failed with an exception.
> >
> > *What can cause such behavior, and how can we resolve the issue?*
> >
> >
> > java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> >     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> >     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> >     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> >     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> >     at clojure.lang.AFn.run(AFn.java:24)
> >     at java.lang.Thread.run(Thread.java:662)
> > Caused by: kafka.common.OffsetOutOfRangeException
> >     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >     at java.lang.Class.newInstance0(Class.java:355)
> >     at java.lang.Class.newInstance(Class.java:308)
> >     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> >     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> >     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> >     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> >     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> >     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> >     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> >     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> >     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> >     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> >     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> >     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> >     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> >     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> >     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> >     ... 6 more
> >
> > Mon, 18 Nov 2013 12:36:25 +0000 (same stack trace as above)
> >
> > Mon, 18 Nov 2013 12:35:49 +0000 (same stack trace as above)
> >
>
