[ 
https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252851#comment-14252851
 ] 

ASF GitHub Bot commented on STORM-586:
--------------------------------------

Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/339#issuecomment-67593637
  
    @nathanmarz there is another case: your topology is running, has 
processed all the existing data, and no new data has come into the Kafka 
topic, which could cause an offset out of range error.
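
For illustration only, the recovery behaviour the issue below asks for (catch 
the out-of-range signal and restart from the earliest retained offset instead 
of letting the spout die) might be sketched as follows. This is not the 
storm-kafka code: FakePartition, OffsetOutOfRangeSignal and resolveFetchOffset 
are hypothetical names made up for this sketch, and the in-memory partition 
only loosely models a broker's retained offset range.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetResetSketch {

    /** Hypothetical stand-in for storm.kafka.UpdateOffsetException. */
    static class OffsetOutOfRangeSignal extends RuntimeException {}

    /** Hypothetical in-memory partition that retains offsets [earliest, latest). */
    static class FakePartition {
        final long earliest;
        final long latest;

        FakePartition(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        /** Returns the retained offsets from 'offset' on, or signals out of range. */
        List<Long> fetch(long offset) {
            if (offset < earliest || offset > latest) {
                throw new OffsetOutOfRangeSignal();
            }
            List<Long> messages = new ArrayList<>();
            for (long o = offset; o < latest; o++) {
                messages.add(o);
            }
            return messages;
        }
    }

    /**
     * Tries the requested offset first. If the partition reports it as out of
     * range, falls back to the earliest retained offset instead of rethrowing.
     * Returns the offset that was actually used for the fetch.
     */
    static long resolveFetchOffset(FakePartition partition, long requested) {
        try {
            partition.fetch(requested);
            return requested;
        } catch (OffsetOutOfRangeSignal e) {
            partition.fetch(partition.earliest);
            return partition.earliest;
        }
    }
}
```

In this sketch a request exactly at the end of the log is treated as valid and 
simply returns no messages; whether the real spout sees an out-of-range error 
in the idle-topic case described above is not something the sketch settles.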


> Trident kafka spout fails instead of updating offset when kafka offset is out 
> of range.
> ---------------------------------------------------------------------------------------
>
>                 Key: STORM-586
>                 URL: https://issues.apache.org/jira/browse/STORM-586
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.3
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>            Priority: Critical
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException 
> which results in the spout failing repeatedly instead of automatically 
> updating the offset to earliest time. 
> PROBLEM: 
> Exception while using the Trident Kafka Spout.
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died! 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na 
> at java.lang.Thread.run(Thread.java:745) na:1.7.0_71 
> Caused by: storm.kafka.UpdateOffsetException: null 
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) ~stormjar.jar:na 
> at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) ~stormjar.jar:na 
> at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> ... 6 common frames omitted 
> 2014-12-04 18:38:03 b.s.d.executor ERROR 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1.2.1.7.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
