[jira] [Comment Edited] (STORM-2915) How can I get the failure count in a bolt when I use the Kafka spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457132#comment-16457132 ]

Stig Rohde Døssing edited comment on STORM-2915 at 4/27/18 9:57 PM:
--------------------------------------------------------------------

Could you explain your use case? The KafkaSpout as-is can't emit the failure count for the tuple, but it would be possible to add this feature by modifying the RecordTranslator interface. I'm just wondering why you need it?

Edit: Regarding the failure reason, that isn't really possible right now. See this thread: http://mail-archives.apache.org/mod_mbox/storm-user/201804.mbox/%3CCA%2BGk7M3jZKFphyONuB3DjDR0-pJmAmOnphgt%3DFBRpT7zy21O%2BA%40mail.gmail.com%3E

was (Author: srdo):
Could you explain your use case? The KafkaSpout as-is can't emit the failure count for the tuple, but it would be possible to add this feature by modifying the RecordTranslator interface. I'm just wondering why you need it?

> How can I get the failure count in a bolt when I use the Kafka spout
> ---------------------------------------------------------------------
>
>                 Key: STORM-2915
>                 URL: https://issues.apache.org/jira/browse/STORM-2915
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Gergo Hong
>            Priority: Minor
>
> I want to get the fail count in a bolt; how can I get it? When a tuple fails it is retried, and I see this code:
> {code:java}
> if (!isScheduled || retryService.isReady(msgId)) {
>     final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
>     if (!isAtLeastOnceProcessing()) {
>         if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
>             collector.emit(stream, tuple, msgId);
>             LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>         } else {
>             collector.emit(stream, tuple);
>             LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>         }
>     } else {
>         emitted.add(msgId);
>         offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
>         if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
>             retryService.remove(msgId);
>         }
>         collector.emit(stream, tuple, msgId);
>         tupleListener.onEmit(tuple, msgId);
>         LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>     }
>     return true;
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
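For what it's worth, the suggested RecordTranslator modification could surface the retry count as an extra tuple field. The sketch below is purely illustrative: `RetryAwareTranslator`, `FailCountingRecord`, and the `failCount` parameter are hypothetical names, and nothing like this exists in storm-kafka-client today; a real change would pass the count from the spout's retry service into the translator.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only: a stand-in for Kafka's ConsumerRecord.
class FailCountingRecord {
    final String topic;
    final long offset;
    final String value;

    FailCountingRecord(String topic, long offset, String value) {
        this.topic = topic;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical retry-aware variant of RecordTranslator: the spout would
// supply failCount from its retry service when (re-)emitting a record.
interface RetryAwareTranslator {
    List<Object> apply(FailCountingRecord record, int failCount);
}

class RetryAwareDemo {
    static List<Object> translate(FailCountingRecord r, int failCount) {
        // The translator appends the failure count as the last tuple field,
        // so a downstream bolt can read it like any other value.
        RetryAwareTranslator t =
            (rec, fails) -> Arrays.asList(rec.topic, rec.offset, rec.value, fails);
        return t.apply(r, failCount);
    }
}
```

A bolt consuming this stream would then read the count from the tuple's last field instead of needing any spout-side API.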
[jira] [Commented] (STORM-2915) How can I get the failure count in a bolt when I use the Kafka spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457132#comment-16457132 ]

Stig Rohde Døssing commented on STORM-2915:
--------------------------------------------

Could you explain your use case? The KafkaSpout as-is can't emit the failure count for the tuple, but it would be possible to add this feature by modifying the RecordTranslator interface. I'm just wondering why you need it?

> How can I get the failure count in a bolt when I use the Kafka spout
> ---------------------------------------------------------------------
>
>                 Key: STORM-2915
>                 URL: https://issues.apache.org/jira/browse/STORM-2915
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Gergo Hong
>            Priority: Minor
[jira] [Commented] (STORM-2915) How can I get the failure count in a bolt when I use the Kafka spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457117#comment-16457117 ]

Gergo Hong commented on STORM-2915:
------------------------------------

Yes, you're right: I want to get the failure count for a specific tuple; it would be useful. By the way, the failure reason might be useful too.

> How can I get the failure count in a bolt when I use the Kafka spout
> ---------------------------------------------------------------------
>
>                 Key: STORM-2915
>                 URL: https://issues.apache.org/jira/browse/STORM-2915
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Gergo Hong
>            Priority: Minor
[jira] [Commented] (STORM-2915) How can I get the failure count in a bolt when I use the Kafka spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457014#comment-16457014 ]

Stig Rohde Døssing commented on STORM-2915:
--------------------------------------------

[~Gergo Hong] Sorry, this was missed. Could you try to elaborate on what you're asking? Are you asking about getting the failure count for a specific tuple (e.g. "tuple 1 has failed 3 times") from a bolt, or does your bolt want to know the total failure count for the spout?

> How can I get the failure count in a bolt when I use the Kafka spout
> ---------------------------------------------------------------------
>
>                 Key: STORM-2915
>                 URL: https://issues.apache.org/jira/browse/STORM-2915
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Gergo Hong
>            Priority: Minor
[jira] [Updated] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated STORM-3013:
-----------------------------------
    Labels: pull-request-available  (was: )

> Deactivated topology restarts if data flows into Kafka
> -------------------------------------------------------
>
>                 Key: STORM-3013
>                 URL: https://issues.apache.org/jira/browse/STORM-3013
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.1
>            Reporter: Ajeesh B
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>              Labels: pull-request-available
>
> Hi, I have deactivated the Storm topology, and if I then produce any records into Kafka, Storm throws an exception. The exception follows:
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been closed.
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[stormjar.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622) ~[stormjar.jar:?]
>     at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) ~[storm-core-1.2.1.jar:1.2.1]
>     at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>     at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
>     at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>     at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
>     at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
>     at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
>     at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>     ... 7 more
> {code}
[jira] [Updated] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()
[ https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing updated STORM-3043:
---------------------------------------
    Affects Version/s: 2.0.0
                       1.1.2
                       1.0.6
                       1.2.1

> NullPointerException thrown in SimpleRecordTranslator.apply()
> --------------------------------------------------------------
>
>                 Key: STORM-3043
>                 URL: https://issues.apache.org/jira/browse/STORM-3043
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.2, 1.0.6, 1.2.1
>            Reporter: Cedric Le Roux
>            Assignee: Cedric Le Roux
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When using a SimpleRecordTranslator with a user-defined translator Func, a NullPointerException will be thrown if Func.apply() returns null. A null List object is a valid return value from apply() if the ConsumerRecord is invalid. SimpleRecordTranslator does not check for a null result before attempting to call the addAll method of the List.
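The fix described above amounts to a null guard before the addAll call. A minimal sketch of the pattern, assuming a user function that returns null for invalid records (`NullTolerantTranslator` is an illustrative stand-in, not the actual SimpleRecordTranslator code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the null guard: if the user-supplied Func returns null for an
// invalid record, skip the record instead of calling addAll on a null List.
class NullTolerantTranslator<R> {
    private final Function<R, List<Object>> func;

    NullTolerantTranslator(Function<R, List<Object>> func) {
        this.func = func;
    }

    List<Object> apply(R record) {
        List<Object> values = func.apply(record);
        if (values == null) {
            return null; // invalid record: signal "emit nothing" to the caller
        }
        List<Object> tuple = new ArrayList<>();
        tuple.addAll(values); // safe: values is known non-null here
        return tuple;
    }
}
```

The caller then treats a null result as "do not emit", which matches the contract the issue describes for apply().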
[jira] [Assigned] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()
[ https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing reassigned STORM-3043:
------------------------------------------
    Assignee: Stig Rohde Døssing

> NullPointerException thrown in SimpleRecordTranslator.apply()
> --------------------------------------------------------------
>
>                 Key: STORM-3043
>                 URL: https://issues.apache.org/jira/browse/STORM-3043
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Cedric Le Roux
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
[jira] [Assigned] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()
[ https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing reassigned STORM-3043:
------------------------------------------
    Assignee: Cedric Le Roux  (was: Stig Rohde Døssing)

> NullPointerException thrown in SimpleRecordTranslator.apply()
> --------------------------------------------------------------
>
>                 Key: STORM-3043
>                 URL: https://issues.apache.org/jira/browse/STORM-3043
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Cedric Le Roux
>            Assignee: Cedric Le Roux
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
[jira] [Resolved] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()
[ https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing resolved STORM-3043.
----------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

> NullPointerException thrown in SimpleRecordTranslator.apply()
> --------------------------------------------------------------
>
>                 Key: STORM-3043
>                 URL: https://issues.apache.org/jira/browse/STORM-3043
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Cedric Le Roux
>            Assignee: Cedric Le Roux
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
[jira] [Commented] (STORM-2514) Incorrect logs for mapping between Kafka partitions and task IDs
[ https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456964#comment-16456964 ]

Stig Rohde Døssing commented on STORM-2514:
--------------------------------------------

[~srishtyagraw...@gmail.com] Did you have any luck with this?

> Incorrect logs for mapping between Kafka partitions and task IDs
> -----------------------------------------------------------------
>
>                 Key: STORM-2514
>                 URL: https://issues.apache.org/jira/browse/STORM-2514
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>            Reporter: Srishty Agrawal
>            Assignee: Hugo Louro
>            Priority: Major
>         Attachments: NewClass.java, worker.log
>
> While working on [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker logs were generated with debug mode on. The information printed about the mapping between task IDs and Kafka partitions contradicted my assumptions. I ran a topology which used KafkaSpout from the storm-kafka-client module; it had a parallelism hint of 2 (number of executors) and a total of 16 tasks. The log lines below show the assigned mapping between executors and Kafka partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> It is evident that only the tasks (with IDs 3, 4, 5, 6, 7, 8, 9, 10) in Executor1 (3 10) will be reading from Kafka partitions 4, 5, 6 and 7. Similarly, the tasks in Executor2 (11 18) will be reading from Kafka partitions 0, 1, 2 and 3. These log lines are printed by the tasks with IDs 10 and 15 in the respective executors.
> Logs which emit individual messages do not abide by the above assumption. For example, in the log below, task ID 3 (I added code, as part of debugging STORM-2506, to print the task ID right next to the component ID), which runs on Executor1, reads from partition 2 (the second value inside the square brackets) instead of 4, 5, 6 or 7.
> {noformat}
> Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 default [8topic, 2, 0, null, 1]
> {noformat}
> This behavior is summarized in the table below:
> {noformat}
> Task IDs: 3, 4, 7, 8, 9, 11, 15, 18    --- Partitions 0, 1, 2, 3
> Task IDs: 5, 6, 10, 12, 13, 14, 16, 17 --- Partitions 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of the log file here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} to Executor2? Are (3 10) not the starting and ending task IDs in Executor1? Another interesting thing to note is that task IDs 10 and 15 are always reading from the partitions they claimed to be reading from (while setting partition assignments).
> If my assumptions are correct, there is a bug in the way the mapping information is passed to the worker logs. If not, we need to change our docs.
[jira] [Commented] (STORM-2504) At the start of topology, the KafkaTridentSpoutOpaque will sometimes emit the first batch twice
[ https://issues.apache.org/jira/browse/STORM-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456961#comment-16456961 ]

Stig Rohde Døssing commented on STORM-2504:
--------------------------------------------

Sorry, this was missed. If this is still a problem in current versions, please open a PR on GitHub: https://github.com/apache/storm/pulls

> At the start of topology, the KafkaTridentSpoutOpaque will sometimes emit the first batch twice
> ------------------------------------------------------------------------------------------------
>
>                 Key: STORM-2504
>                 URL: https://issues.apache.org/jira/browse/STORM-2504
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Jing Weng
>            Priority: Major
>         Attachments: storm-kafka-bug.zip
>
> The unit test in the attachment can reproduce the problem, and my simple fix is included.
[jira] [Commented] (STORM-2554) Trident Kafka Spout Refactoring to Include Manual Partition Assignment
[ https://issues.apache.org/jira/browse/STORM-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456956#comment-16456956 ]

Stig Rohde Døssing commented on STORM-2554:
--------------------------------------------

[~hmclouro] Is this issue still valid, or can we close it?

> Trident Kafka Spout Refactoring to Include Manual Partition Assignment
> -----------------------------------------------------------------------
>
>                 Key: STORM-2554
>                 URL: https://issues.apache.org/jira/browse/STORM-2554
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Hugo Louro
>            Assignee: Hugo Louro
>            Priority: Critical
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Incorporate the changes done in STORM-2541 and do some refactoring of the internal state partition management, to make it cleaner and to handle partition reassignment more properly.
[jira] [Closed] (STORM-2711) KafkaTridentSpoutOpaque polls messages slowly
[ https://issues.apache.org/jira/browse/STORM-2711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing closed STORM-2711.
--------------------------------------
    Resolution: Unresolved

Please reopen if you still have a question.

> KafkaTridentSpoutOpaque polls messages slowly
> ----------------------------------------------
>
>                 Key: STORM-2711
>                 URL: https://issues.apache.org/jira/browse/STORM-2711
>             Project: Apache Storm
>          Issue Type: Question
>          Components: examples, storm-kafka-client
>    Affects Versions: 1.1.0
>         Environment: mac, IDEA, LocalCluster
>            Reporter: shezler
>            Priority: Major
>              Labels: newbie
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> At first I ran the producer examples to write messages into Kafka, to a topic with 5 partitions and 1 replica; the total number of messages was about 4000, almost 800 per partition. Then I {color:red}ran the consumer part of the TridentKafkaClientWordCountNamedTopics example in storm-kafka-client-examples{color}. It first pulled messages at a steady speed, but once each partition passed roughly 500 messages, the speed {color:red}dropped significantly{color}. I wonder why.
[jira] [Commented] (STORM-2711) KafkaTridentSpoutOpaque polls messages slowly
[ https://issues.apache.org/jira/browse/STORM-2711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456952#comment-16456952 ]

Stig Rohde Døssing commented on STORM-2711:
--------------------------------------------

Sorry, it seems we missed this. Please let me know if there's still a question here. The spout implementation has changed a lot since 1.1.0.

> KafkaTridentSpoutOpaque polls messages slowly
> ----------------------------------------------
>
>                 Key: STORM-2711
>                 URL: https://issues.apache.org/jira/browse/STORM-2711
>             Project: Apache Storm
>          Issue Type: Question
>          Components: examples, storm-kafka-client
>    Affects Versions: 1.1.0
>         Environment: mac, IDEA, LocalCluster
>            Reporter: shezler
>            Priority: Major
>              Labels: newbie
[jira] [Reopened] (STORM-2880) Minor optimisation about kafka spout
[ https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing reopened STORM-2880:
----------------------------------------

> Minor optimisation about kafka spout
> -------------------------------------
>
>                 Key: STORM-2880
>                 URL: https://issues.apache.org/jira/browse/STORM-2880
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0
>            Reporter: OuYang Liang
>            Assignee: OuYang Liang
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: wx20180103-165...@2x.png
>
>   Original Estimate: 24h
>          Time Spent: 1h 10m
>  Remaining Estimate: 22h 50m
>
> Based on the single responsibility principle, the method isAtLeastOnceProcessing() should reside in KafkaSpoutConfig rather than KafkaSpout. This patch removes the dependency on KafkaSpoutConfig.ProcessingGuarantee from KafkaSpout.
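The proposal boils down to letting the config object answer the question, so the spout never touches the ProcessingGuarantee enum directly. A rough sketch of that shape, with names simplified from the real storm-kafka-client classes (`Guarantee` and `SpoutConfigSketch` are illustrative stand-ins):

```java
// Simplified sketch of the proposed move: the guarantee check lives on the
// config, and the spout would only call config.isAtLeastOnceProcessing().
enum Guarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

class SpoutConfigSketch {
    private final Guarantee guarantee;

    SpoutConfigSketch(Guarantee guarantee) {
        this.guarantee = guarantee;
    }

    // The spout asks the config a yes/no question instead of comparing
    // against the ProcessingGuarantee enum itself.
    boolean isAtLeastOnceProcessing() {
        return guarantee == Guarantee.AT_LEAST_ONCE;
    }
}
```

With this shape, the enum import disappears from the spout class entirely, which is the single-responsibility argument the patch makes.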
[jira] [Closed] (STORM-2880) Minor optimisation about kafka spout
[ https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing closed STORM-2880.
--------------------------------------
    Resolution: Not A Problem

> Minor optimisation about kafka spout
> -------------------------------------
>
>                 Key: STORM-2880
>                 URL: https://issues.apache.org/jira/browse/STORM-2880
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0
>            Reporter: OuYang Liang
>            Assignee: OuYang Liang
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: wx20180103-165...@2x.png
[jira] [Closed] (STORM-2865) KafkaSpout constructor with KafkaConsumerFactory should be public
[ https://issues.apache.org/jira/browse/STORM-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing closed STORM-2865.
--------------------------------------
       Resolution: Not A Problem
    Fix Version/s: (was: 1.x)
                   (was: 2.0.0)

> KafkaSpout constructor with KafkaConsumerFactory should be public
> ------------------------------------------------------------------
>
>                 Key: STORM-2865
>                 URL: https://issues.apache.org/jira/browse/STORM-2865
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.x
>         Environment: kafka version: 0.11
>                      storm-kafka-client version: 1.1.*
>            Reporter: YulongZ
>            Priority: Minor
>
> When we use a custom implementation of the "Deserializer" interface, the KafkaSpout constructor can only use the "KafkaConsumerFactoryDefault" class, and the "configure" method of the "Deserializer" interface will never be called.
> We need to make the "KafkaSpout" constructor that takes a "KafkaConsumerFactory" public, so we can supply a custom implementation of the "KafkaConsumerFactory" interface.
[jira] [Closed] (STORM-2880) Minor optimisation about kafka spout
[ https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing closed STORM-2880.
--------------------------------------
    Resolution: Won't Fix

> Minor optimisation about kafka spout
> -------------------------------------
>
>                 Key: STORM-2880
>                 URL: https://issues.apache.org/jira/browse/STORM-2880
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0
>            Reporter: OuYang Liang
>            Assignee: OuYang Liang
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>         Attachments: wx20180103-165...@2x.png
[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka
[ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456947#comment-16456947 ] Stig Rohde Døssing commented on STORM-3013: --- [~Ajeesh] Kindly reminder, did the fix resolve your problem? > Deactivated topology restarts if data flows into Kafka > -- > > Key: STORM-3013 > URL: https://issues.apache.org/jira/browse/STORM-3013 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client >Affects Versions: 2.0.0, 1.2.1 >Reporter: Ajeesh B >Assignee: Stig Rohde Døssing >Priority: Major > > Hi, I have deactivated the storm topology & then if I produce any records > into Kafka, Storm throws an exception. Exception follows, > {code:java} > 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 > 130] [INFO] Deactivating spout kafkaLogs:(130) > 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] > [ERROR] Async loop died! > java.lang.RuntimeException: java.lang.IllegalStateException: This consumer > has already been closed. > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45] > Caused by: java.lang.IllegalStateException: This consumer has already been > closed. 
> at > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622) > ~[stormjar.jar:?] > at > org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) > ~[storm-core-1.2.1.jar:1.2.1] > at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?] > at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?] > at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?] > at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?] > at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?] > at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?] > at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?] > at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?] > at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?] > at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?] > at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?] > at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) > ~[clojure-1.7.0.jar:?] > at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) > ~[clojure-1.7.0.jar:?] > at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) > ~[clojure-1.7.0.jar:?] > at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) > ~[clojure-1.7.0.jar:?] > at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) > ~[clojure-1.7.0.jar:?] > at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?] > at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?] 
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 7 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
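The stack trace above shows the metrics tick calling `KafkaOffsetMetric.getValueAndReset()`, which in turn calls `beginningOffsets()` on a `KafkaConsumer` that was already closed during spout deactivation. A minimal sketch of the defensive pattern implied here, assuming a simple closed-flag: the metric short-circuits instead of touching the closed consumer. The names (`OffsetMetricSketch`, `onSpoutDeactivate`, `computeOffsets`) are illustrative, not Storm's actual fix.

```java
// Hypothetical sketch: guard a metrics callback against a consumer that was
// closed when the spout was deactivated. In real storm-kafka-client code the
// flag would be set alongside consumer.close().
public class OffsetMetricSketch {
    private volatile boolean consumerClosed = false;

    /** Called when the spout is deactivated; the real code also closes the consumer. */
    public void onSpoutDeactivate() {
        consumerClosed = true;
    }

    /** Called from the executor's metrics tick. */
    public Object getValueAndReset() {
        if (consumerClosed) {
            // Report nothing rather than trigger
            // IllegalStateException: "This consumer has already been closed."
            return null;
        }
        return computeOffsets();
    }

    private Object computeOffsets() {
        // Stand-in for consumer.beginningOffsets(...) / endOffsets(...) lookups.
        return "offsets";
    }
}
```

With this guard, a metrics tick that fires after deactivation simply skips the offset report instead of killing the worker's async loop.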
[jira] [Updated] (STORM-2974) Add a transactional non-opaque spout to storm-kafka-client
[ https://issues.apache.org/jira/browse/STORM-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-2974: -- Component/s: storm-kafka-client > Add a transactional non-opaque spout to storm-kafka-client > -- > > Key: STORM-2974 > URL: https://issues.apache.org/jira/browse/STORM-2974 > Project: Apache Storm > Issue Type: Sub-task > Components: storm-kafka-client >Affects Versions: 2.0.0 >Reporter: Stig Rohde Døssing >Assignee: Stig Rohde Døssing >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-3046: -- Component/s: (was: rm-kafka-client) storm-kafka-client > Getting a NPE leading worker to die when starting a topology. > - > > Key: STORM-3046 > URL: https://issues.apache.org/jira/browse/STORM-3046 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client, trident >Affects Versions: 1.2.1 >Reporter: Kush Khandelwal >Priority: Blocker > Labels: kafka, storm-kafka-client, trident > Attachments: TestTopology.java > > > I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients > version 1.1.0. > We get the messages from an external Kafka cluster. > Whenever I try to run the topology, I get an NPE, which causes the worker > to die. > If I set the poll strategy to earliest and the topic already contains some > messages, it works fine. > I have used a custom record translator, which works fine. > Can someone please help me fix the issue? > Thanks. > > Error - > 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR > o.a.s.util - Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] > Caused by: java.lang.NullPointerException > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 
6 more > > > Topology class - > > > > > import org.apache.storm.Config; > import org.apache.storm.LocalCluster; > import org.apache.storm.StormSubmitter; > import org.apache.storm.generated.*; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; > import org.apache.storm.trident.Stream; > import org.apache.storm.trident.TridentState; > import org.apache.storm.trident.TridentTopology; > import org.apache.storm.tuple.Fields; > import java.util.Properties; > > public class TestTopology { > > private static StormTopology buildTopology(Properties stormProperties) { > > Properties kafkaProperties = getProperties("/kafka.properties"); > TridentTopology topology = new TridentTopology(); > Fields stageArguments = new Fields("test", "issue"); > KafkaSpoutConfig<String, String> kafkaSpoutConfig = > KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), > "test") > .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) > .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) > .setRecordTranslator(new RecordTranslator(), stageArguments) > .build(); > KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new >
[jira] [Commented] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456942#comment-16456942 ] Stig Rohde Døssing commented on STORM-3046: --- This is caused by a bad assumption I made here https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L161. I assumed that the last batch metadata would only be null for the first batch, or reemits of the first batch. I forgot to account for cases where the first batch contains no tuples, because in that case we'll set the metadata for the first batch to null (see https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L125). In the following call to emitBatch, we're going to hit this line with a null lastBatchMeta https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L181. This is very likely to happen if you use e.g. LATEST as the FirstPollOffsetStrategy, and as you observe, it doesn't happen if you start at the beginning of a partition and there are messages to emit. Let me know if you'd like to try to fix it. If not, I'll be happy to give it a shot, and would appreciate it if you would try out the potential fix. > Getting a NPE leading worker to die when starting a topology. > - > > Key: STORM-3046 > URL: https://issues.apache.org/jira/browse/STORM-3046 > Project: Apache Storm > Issue Type: Bug > Components: rm-kafka-client, trident >Affects Versions: 1.2.1 >Reporter: Kush Khandelwal >Priority: Blocker > Labels: kafka, storm-kafka-client, trident > Attachments: TestTopology.java > > > I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients > version 1.1.0. > We get the messages from an external Kafka cluster. 
> Whenever I try to run the topology, I get an NPE, which causes the worker > to die. > If I set the poll strategy to earliest and the topic already contains some > messages, it works fine. > I have used a custom record translator, which works fine. > Can someone please help me fix the issue? > Thanks. > > Error - > 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR > o.a.s.util - Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] > Caused by: java.lang.NullPointerException > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 6 more > > > Topology class - > > > > > import org.apache.storm.Config; >
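The failure mode Stig describes above can be sketched in a few lines: when the previous batch's metadata is null because the first batch was empty (likely with LATEST as the FirstPollOffsetStrategy), the seek logic must fall back to the configured first-poll offset instead of dereferencing the metadata. This is an illustrative guard, assuming simplified types (`BatchMeta`, `seekOffset`), not the actual patch applied to `KafkaTridentSpoutEmitter`.

```java
// Hypothetical sketch of a null-safe seek decision for a Trident Kafka
// emitter. BatchMeta stands in for KafkaTridentSpoutBatchMetadata.
public class SeekSketch {

    /** Simplified stand-in for the per-partition batch metadata. */
    public static final class BatchMeta {
        public final long lastOffset; // last offset emitted in the previous batch
        public BatchMeta(long lastOffset) {
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Decide where to seek for the next batch. Guards the case that caused
     * the NPE in STORM-3046: lastBatchMeta is null not only for the very
     * first batch, but also when the first batch contained no tuples.
     */
    public static long seekOffset(BatchMeta lastBatchMeta, long firstPollOffset) {
        if (lastBatchMeta != null) {
            // Continue right after the last emitted offset.
            return lastBatchMeta.lastOffset + 1;
        }
        // No usable metadata: start from the offset dictated by the
        // FirstPollOffsetStrategy (e.g. the partition's end offset for LATEST).
        return firstPollOffset;
    }
}
```

For example, `seekOffset(null, 100L)` falls back to offset 100, while `seekOffset(new BatchMeta(41L), 100L)` continues at offset 42.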
[jira] [Assigned] (STORM-3045) Microsoft Azure EventHubs: Storm Spout and Bolt improvements
[ https://issues.apache.org/jira/browse/STORM-3045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing reassigned STORM-3045: - Assignee: Sreeram Garlapati > Microsoft Azure EventHubs: Storm Spout and Bolt improvements > > > Key: STORM-3045 > URL: https://issues.apache.org/jira/browse/STORM-3045 > Project: Apache Storm > Issue Type: Improvement > Components: storm-eventhubs >Affects Versions: 2.0.0 >Reporter: Sreeram Garlapati >Assignee: Sreeram Garlapati >Priority: Minor > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-3044) AutoTGT should ideally check if a TGT is specific to IP addresses and reject
[ https://issues.apache.org/jira/browse/STORM-3044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated STORM-3044: -- Labels: pull-request-available (was: ) > AutoTGT should ideally check if a TGT is specific to IP addresses and reject > > > Key: STORM-3044 > URL: https://issues.apache.org/jira/browse/STORM-3044 > Project: Apache Storm > Issue Type: Improvement >Reporter: Ethan Li >Assignee: Ethan Li >Priority: Major > Labels: pull-request-available > > A TGT can carry several options: one is being > forwardable; another is having one or more IP addresses in it, which makes it so > it cannot be used anywhere else. If the ticket is forwardable but is tied to > IP addresses, it will likely not work for Storm, so we should reject it. > It looks like we can call getClientAddresses() on the ticket, and if it > returns something, then we should reject it. We should also include > instructions about how to get a proper ticket in the error message: > `kinit -A -f` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
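The check described above can be sketched as follows. In real AutoTGT code the addresses would come from `javax.security.auth.kerberos.KerberosTicket#getClientAddresses()`; here they are passed in directly so the logic is testable without a KDC. The class and method names are illustrative, not Storm's actual implementation.

```java
import java.net.InetAddress;

// Hypothetical sketch of rejecting an address-bound TGT, per STORM-3044.
public class TgtAddressCheck {

    /**
     * Throws if the ticket is bound to specific client IP addresses, since
     * such a ticket cannot be forwarded to arbitrary worker hosts. The real
     * caller would pass ticket.getClientAddresses(), which returns null when
     * the ticket is not address-bound.
     */
    public static void validateAddresses(InetAddress[] clientAddresses) {
        if (clientAddresses != null && clientAddresses.length > 0) {
            throw new IllegalArgumentException(
                "TGT is bound to specific IP addresses and will not work for Storm. "
                + "Obtain an addressless, forwardable ticket instead, e.g.: kinit -A -f");
        }
    }
}
```

A ticket obtained with `kinit -A -f` carries no client addresses, so `getClientAddresses()` would return null and the check passes.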
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kush Khandelwal updated STORM-3046: --- Description: I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0. We get the messages from an external Kafka cluster. Whenever I try to run the topology, I get an NPE, which causes the worker to die. If I set the poll strategy to earliest and the topic already contains some messages, it works fine. I have used a custom record translator, which works fine. Can someone please help me fix the issue? Thanks. Error - 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died! java.lang.RuntimeException: java.lang.NullPointerException at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1] at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] Caused by: java.lang.NullPointerException at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1] ... 
6 more Topology class - import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.*; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import java.util.Properties; public class TestTopology { private static StormTopology buildTopology(Properties stormProperties) { Properties kafkaProperties = getProperties("/kafka.properties"); TridentTopology topology = new TridentTopology(); Fields stageArguments = new Fields("test", "issue"); KafkaSpoutConfigkafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test") .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .setRecordTranslator(new RecordTranslator(), stageArguments) .build(); KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig); Grouping partitionGroup = getPartitionGroup("test"); log.info("Creating Opaque-Trident-Kafka-Spout"); final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1); TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments).parallelismHint(1); Stream viewUpdate = ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments, new UpdateView(), new Fields()).parallelismHint(2); return topology.build(); } public static void main(String[] args) { Config conf = new Config();
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kush Khandelwal updated STORM-3046: --- Description: I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0. We get the messages from an external Kafka cluster. Whenever I try to run the topology, I get an NPE, which causes the worker to die. Can someone please help me fix the issue? Thanks. Error - 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died! java.lang.RuntimeException: java.lang.NullPointerException at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1] at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] Caused by: java.lang.NullPointerException at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1] at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1] ... 
6 more Topology class - import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.*; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import java.util.Properties; public class TestTopology { private static StormTopology buildTopology(Properties stormProperties) { Properties kafkaProperties = getProperties("/kafka.properties"); TridentTopology topology = new TridentTopology(); Fields stageArguments = new Fields("test", "issue"); KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test") .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .setRecordTranslator(new RecordTranslator(), stageArguments) .build(); // If I set poll strategy to earliest and the topic already contains some messages, it works fine. // I have used a custom record translator which is working fine. 
KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig); Grouping partitionGroup = getPartitionGroup("test"); log.info("Creating Opaque-Trident-Kafka-Spout"); final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1); TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments).parallelismHint(1); Stream viewUpdate = ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments, new UpdateView(), new Fields()).parallelismHint(2); return topology.build(); } public static void main(String[] args) { Config conf = new
[jira] [Created] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
Kush Khandelwal created STORM-3046:
--
Summary: Getting a NPE leading worker to die when starting a topology.
Key: STORM-3046
URL: https://issues.apache.org/jira/browse/STORM-3046
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client, trident
Affects Versions: 1.2.1
Reporter: Kush Khandelwal
Attachments: TestTopology.java

I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients version 1.1.0. We have an external Kafka from which we get the messages. Whenever I try to run the topology, I get an NPE, which causes the worker to die. Can someone please help me fix the issue? Thanks.

Error -

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
	...
	... 6 more

Topology class -

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {
        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();
        Fields stageArguments = new Fields("test", "issue");

        KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setRecordTranslator(new RecordTranslator(), stageArguments)
                .build();
        // If I set poll strategy to earliest and the topic already contains some messages, it works fine.
        // I have used a custom record translator which is working fine.

        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);
        Grouping partitionGroup = getPartitionGroup("test");

        log.info("Creating Opaque-Trident-Kafka-Spout");
        final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                .name("kafkaSpout")
                .parallelismHint(1);

        TridentState testUpdate = kafkaSpout.partition(partitionGroup)
                .name("testUpdate")
                .partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments)
                .parallelismHint(1);

        Stream viewUpdate = testUpdate.newValuesStream()
                .name("viewUpdate")
                .partition(partitionGroup)
                .each(stageArguments, new UpdateView(), new Fields())
                .parallelismHint(2);

        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new Config();
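The reporter's own comment above is the key observation: the NPE at KafkaTridentSpoutEmitter.seek disappears when the poll strategy is EARLIEST and the topic already contains messages, which suggests the LATEST strategy plus an empty topic leaves the Trident emitter without a position to seek from. A minimal configuration sketch of that workaround follows; it is not a confirmed fix, and the topic name "test" and the String key/value types are taken from the reporter's code, not prescribed by Storm.

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class WorkaroundSketch {
    // Workaround sketch only, assuming storm-kafka-client 1.2.1:
    // start from EARLIEST instead of LATEST, which the reporter
    // observed avoids the NPE when the topic has existing messages.
    public static KafkaSpoutConfig<String, String> buildConfig(String bootstrapServers) {
        return KafkaSpoutConfig.builder(bootstrapServers, "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .build();
    }
}
```

Whether the underlying bug (seek with no previous batch metadata) should instead be guarded inside KafkaTridentSpoutEmitter is for the maintainers to decide; this only restates the reporter's observation as a usable configuration.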