[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-07-12 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542429#comment-16542429
 ] 

Jungtaek Lim commented on STORM-3013:
-

Also merged into 1.x-branch.

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.2.3
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Hi, I have deactivated the storm topology & then if I produce any records 
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456947#comment-16456947
 ] 

Stig Rohde Døssing commented on STORM-3013:
---

[~Ajeesh] Kindly reminder, did the fix resolve your problem?

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>
> Hi, I have deactivated the storm topology & then if I produce any records 
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-03-30 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420792#comment-16420792
 ] 

Stig Rohde Døssing commented on STORM-3013:
---

Just to clarify, the only changes are in storm-kafka-client, so you only need 
to build that part of Storm.

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>
> Hi, I have deactivated the storm topology & then if I produce any records 
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-03-30 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420744#comment-16420744
 ] 

Stig Rohde Døssing commented on STORM-3013:
---

[~Ajeesh] I've fixed the metrics issue. Could you try it out and see if it 
resolves the problems you're having? The code is available at 
https://github.com/srdo/storm/tree/STORM-3013-1.x.

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>
> Hi, I have deactivated the storm topology & then if I produce any records 
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418048#comment-16418048
 ] 

Stig Rohde Døssing commented on STORM-3013:
---

[~Ajeesh]

Whoops, the issue is that we didn't expect Storm to ask the spout for metrics 
while the spout is deactivated. When this happens, the spout tries to use the 
consumer to get e.g. how far behind the latest offset the consumer is, which 
throws this exception. 

I'll take a look at fixing this, unless you want to?

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 1.2.1
>Reporter: Ajeesh B
>Priority: Major
>
> Hi, I have deactivated the storm topology & then if I produce any records 
> into Kafka, Storm throws an exception. Exception follows,
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)