[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542429#comment-16542429 ]

Jungtaek Lim commented on STORM-3013:
-------------------------------------

Also merged into 1.x-branch.

> Deactivated topology restarts if data flows into Kafka
> ------------------------------------------------------
>
>                 Key: STORM-3013
>                 URL: https://issues.apache.org/jira/browse/STORM-3013
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.1
>            Reporter: Ajeesh B
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0, 1.2.3
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Hi, I have deactivated the storm topology & then if I produce any records
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been closed.
>  at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[stormjar.jar:?]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622) ~[stormjar.jar:?]
>  at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
>  at org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) ~[storm-core-1.2.1.jar:1.2.1]
>  at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>  at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
>  at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>  at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>  at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
>  at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
>  at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
>  at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
>  at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
>  at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
>  at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
>  at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>  ... 7 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456947#comment-16456947 ]

Stig Rohde Døssing commented on STORM-3013:
-------------------------------------------

[~Ajeesh] Friendly reminder: did the fix resolve your problem?
[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420792#comment-16420792 ]

Stig Rohde Døssing commented on STORM-3013:
-------------------------------------------

Just to clarify, the only changes are in storm-kafka-client, so you only need to build that part of Storm.
[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420744#comment-16420744 ]

Stig Rohde Døssing commented on STORM-3013:
-------------------------------------------

[~Ajeesh] I've fixed the metrics issue. Could you try it out and see if it resolves the problems you're having? The code is available at https://github.com/srdo/storm/tree/STORM-3013-1.x.
[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418048#comment-16418048 ]

Stig Rohde Døssing commented on STORM-3013:
-------------------------------------------

[~Ajeesh] Whoops, the issue is that we didn't expect Storm to ask the spout for metrics while the spout is deactivated. When this happens, the spout tries to use its already-closed consumer to compute metrics such as how far the consumer is behind the latest offset, and that call throws this exception. I'll take a look at fixing this, unless you want to?
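
The failure mode described in the comment above can be sketched without Storm or Kafka on the classpath. This is only an illustration, not the code that was merged for this issue: `Consumer`, `Spout`, and `OffsetMetric` are hypothetical stand-ins, and the guard shown (look the consumer up at metric time through a `Supplier` and skip the tick when there is none) is one plausible approach under the assumption that returning null from a metric means "no data point".

```java
import java.util.function.Supplier;

// Hypothetical, self-contained sketch; none of these nested types are Storm or Kafka classes.
class DeactivatedMetricSketch {

    // Stand-in consumer: like the consumer in the stack trace, it rejects calls after close().
    static class Consumer {
        private boolean closed = false;

        long beginningOffset() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return 0L;
        }

        void close() {
            closed = true;
        }
    }

    // Stand-in spout: creates a consumer on activate and closes it on deactivate.
    static class Spout {
        private Consumer consumer;

        void activate() {
            consumer = new Consumer();
        }

        void deactivate() {
            consumer.close();
            consumer = null; // no usable consumer while deactivated
        }

        Consumer currentConsumer() {
            return consumer;
        }
    }

    // Stand-in metric: asks for the current consumer on every tick instead of caching one.
    static class OffsetMetric {
        private final Supplier<Consumer> consumerSupplier;

        OffsetMetric(Supplier<Consumer> consumerSupplier) {
            this.consumerSupplier = consumerSupplier;
        }

        Object getValueAndReset() {
            Consumer consumer = consumerSupplier.get();
            if (consumer == null) {
                // Spout is deactivated: skip this tick rather than touch a closed consumer.
                return null;
            }
            return consumer.beginningOffset();
        }
    }

    public static void main(String[] args) {
        Spout spout = new Spout();
        OffsetMetric metric = new OffsetMetric(spout::currentConsumer);

        spout.activate();
        Consumer stale = spout.currentConsumer(); // what a metric caching the consumer would hold
        System.out.println("active tick: " + metric.getValueAndReset());

        spout.deactivate();
        System.out.println("deactivated tick: " + metric.getValueAndReset());

        try {
            stale.beginningOffset(); // unguarded path, same exception type as in the report
        } catch (IllegalStateException e) {
            System.out.println("unguarded call failed: " + e.getMessage());
        }
    }
}
```

The point of the supplier is that the metric never holds on to a consumer across a deactivate/activate cycle, so a metrics tick that arrives while the spout is deactivated has nothing to call.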