[ https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252674#comment-14252674 ]
ASF GitHub Bot commented on STORM-586:
--------------------------------------

Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/339#issuecomment-67582859

    @nathanmarz The re-emit path can hit this issue in several pathological
    cases: a bad Kafka config; a transient network failure that makes Storm
    batches fail repeatedly while a Kafka queue with low retention is being
    truncated; or a very slow topology fed by very fast Kafka producers on a
    topic with low retention. So it mostly comes down to Kafka configs with
    low retention. I have made the change you suggested: during re-emit we no
    longer catch the exception and instead let the spout fail.

> Trident kafka spout fails instead of updating offset when kafka offset is out
> of range.
> ---------------------------------------------------------------------------------------
>
>                 Key: STORM-586
>                 URL: https://issues.apache.org/jira/browse/STORM-586
>             Project: Apache Storm
>          Issue Type: Bug
>   Affects Versions: 0.9.3
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>            Priority: Critical
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException,
> which results in the spout failing repeatedly instead of automatically
> updating the offset to the earliest time.
> PROBLEM:
> Exception while using the Trident Kafka Spout.
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died!
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na
>     at java.lang.Thread.run(Thread.java:745) na:1.7.0_71
> Caused by: storm.kafka.UpdateOffsetException: null
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) ~stormjar.jar:na
>     at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83)
>         ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     ... 6 common frames omitted
> 2014-12-04 18:38:03 b.s.d.executor ERROR
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1.2.1.7.0

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
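
For illustration, the behaviour described in the comment (recover from an out-of-range offset when emitting a new batch, but let the failure propagate on re-emit) can be sketched roughly as below. This is a minimal sketch and not the code from the pull request: ToyPartition, emitNewBatch, reEmitBatch and the locally declared UpdateOffsetException are hypothetical stand-ins, and none of the real storm-kafka classes are used. The stated reason for not recovering on re-emit is also an assumption: presumably a re-emitted batch is expected to contain the same messages as the original, which is impossible once they have been truncated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these types only imitate the behaviour discussed above.
class OffsetRecoverySketch {

    /** Local stand-in for storm.kafka.UpdateOffsetException (not the real class). */
    static class UpdateOffsetException extends RuntimeException {}

    /** Toy partition that retains only the offsets in [earliest, latest). */
    static class ToyPartition {
        final long earliest;
        final long latest;

        ToyPartition(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        /** Returns up to max offsets starting at offset, or throws if offset is out of range. */
        List<Long> fetch(long offset, int max) {
            if (offset < earliest || offset > latest) {
                throw new UpdateOffsetException();
            }
            List<Long> out = new ArrayList<>();
            for (long o = offset; o < latest && out.size() < max; o++) {
                out.add(o);
            }
            return out;
        }
    }

    /** New batch: an out-of-range offset is treated as recoverable; restart from the earliest offset. */
    static List<Long> emitNewBatch(ToyPartition p, long offset, int max) {
        try {
            return p.fetch(offset, max);
        } catch (UpdateOffsetException e) {
            return p.fetch(p.earliest, max);
        }
    }

    /** Re-emit: the exception is deliberately not caught, so the caller (the spout) fails. */
    static List<Long> reEmitBatch(ToyPartition p, long offset, int max) {
        return p.fetch(offset, max);
    }

    public static void main(String[] args) {
        ToyPartition p = new ToyPartition(100, 110); // offsets below 100 were truncated
        System.out.println(emitNewBatch(p, 5, 3));
        try {
            reEmitBatch(p, 5, 3);
        } catch (UpdateOffsetException e) {
            System.out.println("re-emit failed");
        }
    }
}
```

With the toy partition above, a new batch requested at the stale offset 5 falls back to offset 100, while a re-emit at the same offset throws.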