[jira] [Comment Edited] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457132#comment-16457132
 ] 

Stig Rohde Døssing edited comment on STORM-2915 at 4/27/18 9:57 PM:


Could you explain your use case? The KafkaSpout as-is can't emit the failure 
count for the tuple, but it would be possible to add this feature by modifying 
the RecordTranslator interface. I'm just wondering why you need it?

Edit: Regarding the failure reason, that isn't really possible right now. See 
this thread 
http://mail-archives.apache.org/mod_mbox/storm-user/201804.mbox/%3CCA%2BGk7M3jZKFphyONuB3DjDR0-pJmAmOnphgt%3DFBRpT7zy21O%2BA%40mail.gmail.com%3E


was (Author: srdo):
Could you explain your use case? The KafkaSpout as-is can't emit the failure 
count for the tuple, but it would be possible to add this feature by modifying 
the RecordTranslator interface. I'm just wondering why you need it?
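
For context, a rough sketch of what such a RecordTranslator change could look like. This overload does not exist in storm-kafka-client today; the types below are simplified stand-ins, assuming the spout would pass the tuple's current failure count to the translator:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for org.apache.kafka.clients.consumer.ConsumerRecord,
// so this sketch compiles on its own.
class ConsumerRecord<K, V> {
    final K key;
    final V value;
    ConsumerRecord(K key, V value) { this.key = key; this.value = value; }
}

// Hypothetical variant of the RecordTranslator interface: the spout would pass
// the tuple's current failure count alongside the record, so the translator
// can emit it as an extra tuple field for downstream bolts.
interface RetryAwareRecordTranslator<K, V> {
    List<Object> apply(ConsumerRecord<K, V> record, int failCount);
}

class RetryCountSketch {
    public static void main(String[] args) {
        RetryAwareRecordTranslator<String, String> translator =
            (record, failCount) -> Arrays.asList(record.key, record.value, failCount);
        // A downstream bolt would read the last field to learn how many times
        // the tuple has failed so far.
        System.out.println(translator.apply(new ConsumerRecord<>("k", "v"), 3));
    }
}
```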

> How could I to get the fail Number   in Bolt When I use  Kafka Spout
> 
>
> Key: STORM-2915
> URL: https://issues.apache.org/jira/browse/STORM-2915
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka-client
>Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>Reporter: Gergo Hong
>Priority: Minor
>
> I want to get the fail count in a bolt. How can I get it?
> If a tuple fails it is retried; I see this:
> {code:java}
> if (!isScheduled || retryService.isReady(msgId)) {
>     final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
>     if (!isAtLeastOnceProcessing()) {
>         if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
>             collector.emit(stream, tuple, msgId);
>             LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>         } else {
>             collector.emit(stream, tuple);
>             LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>         }
>     } else {
>         emitted.add(msgId);
>         offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
>         if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
>             retryService.remove(msgId);
>         }
>         collector.emit(stream, tuple, msgId);
>         tupleListener.onEmit(tuple, msgId);
>         LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>     }
>     return true;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457132#comment-16457132
 ] 

Stig Rohde Døssing commented on STORM-2915:
---

Could you explain your use case? The KafkaSpout as-is can't emit the failure 
count for the tuple, but it would be possible to add this feature by modifying 
the RecordTranslator interface. I'm just wondering why you need it?




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

2018-04-27 Thread Gergo Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457117#comment-16457117
 ] 

Gergo Hong commented on STORM-2915:
---

Yes, you're right. I want to get the failure count for a specific tuple; it 
would be useful. By the way, the failure reason might be useful too.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457014#comment-16457014
 ] 

Stig Rohde Døssing commented on STORM-2915:
---

[~Gergo Hong] Sorry, this was missed. Could you try to elaborate what you're 
asking? Are you asking about getting the failure count for a specific tuple 
(e.g. "tuple 1 has failed 3 times") from a bolt, or does your bolt want to know 
the total failure count for the spout?




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-04-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated STORM-3013:
--
Labels: pull-request-available  (was: )

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
>
> Hi, I have deactivated the Storm topology, and then if I produce any records 
> into Kafka, Storm throws an exception. The exception follows:
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 
> 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] 
> [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer 
> has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been 
> closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}
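
A hedged sketch of one way to avoid the crash above (this is an illustration, not the actual STORM-3013 patch): the stack trace shows the metrics tick calling KafkaConsumer.beginningOffsets() after the deactivated spout has closed the consumer, so the metric could skip its update while the consumer is closed. The class names below are simplified stand-ins:

```java
// Stand-in for a KafkaConsumer that refuses calls after close(), mirroring
// the IllegalStateException in the stack trace above.
class ClosedAwareConsumer {
    private boolean closed;
    void close() { closed = true; }
    boolean isClosed() { return closed; }
    long beginningOffset() {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        return 0L;
    }
}

// Sketch of a defensive KafkaOffsetMetric: return null (an empty metric value)
// instead of crashing the executor when the spout has closed the consumer.
class KafkaOffsetMetricSketch {
    private final ClosedAwareConsumer consumer;
    KafkaOffsetMetricSketch(ClosedAwareConsumer consumer) { this.consumer = consumer; }

    Long getValueAndReset() {
        if (consumer.isClosed()) {
            return null; // skip the metric while deactivated
        }
        return consumer.beginningOffset();
    }
}
```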



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-3043:
--
Affects Version/s: 2.0.0
   1.1.2
   1.0.6
   1.2.1

> NullPointerException thrown in SimpleRecordTranslator.apply()
> -
>
> Key: STORM-3043
> URL: https://issues.apache.org/jira/browse/STORM-3043
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.2, 1.0.6, 1.2.1
>Reporter: Cedric Le Roux
>Assignee: Cedric Le Roux
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When using a SimpleRecordTranslator with a user-defined translator Func, a 
> NullPointerException will be thrown if Func.apply() returns null. A null List 
> object is a valid return value from apply() if the ConsumerRecord is invalid.
> SimpleRecordTranslator does not check for a null result before attempting to 
> call the addAll method of the List.
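
The missing check is small. Here is a standalone illustration of the guard (the real code lives in SimpleRecordTranslator; in this sketch a plain java.util.function.Function stands in for the translator Func, and the class name is only for this example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Standalone sketch of the null guard described above.
class NullSafeTranslatorSketch {
    static <R> List<Object> apply(Function<R, List<Object>> func, R record) {
        List<Object> vals = new ArrayList<>();
        List<Object> translated = func.apply(record);
        if (translated == null) {
            // A null result marks the record as invalid: skip it instead of
            // letting vals.addAll(null) throw a NullPointerException.
            return Collections.emptyList();
        }
        vals.addAll(translated);
        return vals;
    }

    public static void main(String[] args) {
        // Invalid record: no NullPointerException, just an empty tuple list.
        System.out.println(apply(r -> null, "bad record"));
        System.out.println(apply(r -> Arrays.<Object>asList(r), "good record"));
    }
}
```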



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reassigned STORM-3043:
-

Assignee: Stig Rohde Døssing

> NullPointerException thrown in SimpleRecordTranslator.apply()
> -
>
> Key: STORM-3043
> URL: https://issues.apache.org/jira/browse/STORM-3043
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Reporter: Cedric Le Roux
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reassigned STORM-3043:
-

Assignee: Cedric Le Roux  (was: Stig Rohde Døssing)

> NullPointerException thrown in SimpleRecordTranslator.apply()
> -
>
> Key: STORM-3043
> URL: https://issues.apache.org/jira/browse/STORM-3043
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Reporter: Cedric Le Roux
>Assignee: Cedric Le Roux
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3043) NullPointerException thrown in SimpleRecordTranslator.apply()

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-3043.
---
   Resolution: Fixed
Fix Version/s: 2.0.0

> NullPointerException thrown in SimpleRecordTranslator.apply()
> -
>
> Key: STORM-3043
> URL: https://issues.apache.org/jira/browse/STORM-3043
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Reporter: Cedric Le Roux
>Assignee: Cedric Le Roux
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2514) Incorrect logs for mapping between Kafka partitions and task IDs

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456964#comment-16456964
 ] 

Stig Rohde Døssing commented on STORM-2514:
---

[~srishtyagraw...@gmail.com] Did you have any luck with this?

> Incorrect logs for mapping between Kafka partitions and task IDs
> 
>
> Key: STORM-2514
> URL: https://issues.apache.org/jira/browse/STORM-2514
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Reporter: Srishty Agrawal
>Assignee: Hugo Louro
>Priority: Major
> Attachments: NewClass.java, worker.log
>
>
> While working on 
> [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker 
> logs were generated with debug mode on. The information printed about the 
> mapping between task IDs and Kafka partitions contradicted my assumptions. I 
> ran a topology which used KafkaSpout from the storm-kafka-client module; it 
> had a parallelism hint of 2 (number of executors) and a total of 16 tasks. 
> The log lines below show the assigned mapping between executors and Kafka 
> partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] 
> Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] 
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions 
> reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, 
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, 
> topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] 
> Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] 
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions 
> reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup, 
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, 
> topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> It is evident that only tasks (with ID 3, 4, 5, 6, 7, 8, 9, 10) in Executor1 
> (3 10) will be reading from kafka partitions 4, 5, 6 and 7. Similarly, tasks 
> in Executor2 (11 18) will be reading from kafka partitions 0, 1, 2 and 3. 
> These log lines are being printed by Tasks with IDs 10 and 15 in respective 
> executors. 
> Logs which emit individual messages do not abide by the above assumption. For 
> example in the log mentioned below, Task ID 3 (added code, as a part of 
> debugging STORM-2506, to print the Task ID right next to component ID) which 
> runs on Executor1 reads from partition 2 (the second value inside the square 
> brackets), instead of 4, 5, 6 or 7. 
> {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 
> default [8topic, 2, 0, null, 1]{noformat}
> This behavior has been summarized in the table below : 
> {noformat}
> Task IDs 3, 4, 7, 8, 9, 11, 15, 18    --- Partitions 0, 1, 2, 3
> Task IDs 5, 6, 10, 12, 13, 14, 16, 17 --- Partitions 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of log file 
> here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
>  
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 
> 17}} correspond to executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to 
> executor2? Are (3 10) not the starting and ending task IDs in Executor1? 
> Another interesting thing to note is that Task IDs 10 and 15 are always 
> reading from the partitions they claimed to be reading from (while setting 
> partition assignments). 
> If my assumptions are correct, there is a bug in the way the mapping 
> information is being passed to the worker logs. If not, we need to make 
> changes in our docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2504) At the start of topology, the KafkaTridentSpoutOpaque will sometimes emit the first batch twice

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456961#comment-16456961
 ] 

Stig Rohde Døssing commented on STORM-2504:
---

Sorry, this was missed. If this is still a problem in current versions, please 
open a PR on Github https://github.com/apache/storm/pulls.

> At the start of topology, the KafkaTridentSpoutOpaque will sometimes emit the 
> first batch twice 
> 
>
> Key: STORM-2504
> URL: https://issues.apache.org/jira/browse/STORM-2504
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 1.1.0
>Reporter: Jing Weng
>Priority: Major
> Attachments: storm-kafka-bug.zip
>
>
> The unit test in the attachment can reproduce the problem, and there is my 
> simple fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2554) Trident Kafka Spout Refactoring to Include Manual Partition Assignment

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456956#comment-16456956
 ] 

Stig Rohde Døssing commented on STORM-2554:
---

[~hmclouro] Is this issue still valid, or can we close it?

> Trident Kafka Spout Refactoring to Include Manual Partition Assignment
> --
>
> Key: STORM-2554
> URL: https://issues.apache.org/jira/browse/STORM-2554
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.x
>Reporter: Hugo Louro
>Assignee: Hugo Louro
>Priority: Critical
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Incorporate changes done in STORM-2541 and do some refactoring to internal 
> state partition management to make it cleaner and more properly handle 
> partitions reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (STORM-2711) use KafkaTridentSpoutOpaque poll msg slowly.

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing closed STORM-2711.
-
Resolution: Unresolved

Please reopen if you still have a question

> use KafkaTridentSpoutOpaque poll msg slowly.
> 
>
> Key: STORM-2711
> URL: https://issues.apache.org/jira/browse/STORM-2711
> Project: Apache Storm
>  Issue Type: Question
>  Components: examples, storm-kafka-client
>Affects Versions: 1.1.0
> Environment: mac,IDEA, LocalCluster
>Reporter: shezler
>Priority: Major
>  Labels: newbie
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> At first I ran the producer example to write messages into Kafka; the topic 
> has 5 partitions and 1 replica, and there were about 4000 messages in total, 
> almost 800 per partition. Then I {color:red}ran the consumer part of the 
> TridentKafkaClientWordCountNamedTopics example in 
> storm-kafka-client-examples{color}. At first it pulled messages at a steady 
> speed, but once each partition passed about 500 messages, the speed became 
> {color:red}significantly slower{color}. I wonder why.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2711) use KafkaTridentSpoutOpaque poll msg slowly.

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456952#comment-16456952
 ] 

Stig Rohde Døssing commented on STORM-2711:
---

Sorry, it seems we missed this. Please let me know if there's still a question 
here. The spout implementation has changed a lot since 1.1.0.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (STORM-2880) Minor optimisation about kafka spout

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reopened STORM-2880:
---

> Minor optimisation about kafka spout
> 
>
> Key: STORM-2880
> URL: https://issues.apache.org/jira/browse/STORM-2880
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka-client
>Affects Versions: 2.0.0
>Reporter: OuYang Liang
>Assignee: OuYang Liang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: wx20180103-165...@2x.png
>
>   Original Estimate: 24h
>  Time Spent: 1h 10m
>  Remaining Estimate: 22h 50m
>
> Based on the single responsibility principle, the method 
> isAtLeastOnceProcessing() should reside in KafkaSpoutConfig rather than in 
> KafkaSpout. This patch removes KafkaSpout's dependency on 
> KafkaSpoutConfig.ProcessingGuarantee.
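
As a rough illustration of the proposed move (the classes below are simplified stand-ins, not the real storm-kafka-client code):

```java
// Sketch of the refactoring described above: the processing-guarantee check
// moves onto the config object, so the spout only asks the config a yes/no
// question and no longer references the ProcessingGuarantee enum itself.
enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

class KafkaSpoutConfigSketch {
    private final ProcessingGuarantee guarantee;
    KafkaSpoutConfigSketch(ProcessingGuarantee guarantee) { this.guarantee = guarantee; }

    // Moved here from the spout.
    boolean isAtLeastOnceProcessing() { return guarantee == ProcessingGuarantee.AT_LEAST_ONCE; }
}

class KafkaSpoutSketch {
    private final KafkaSpoutConfigSketch config;
    KafkaSpoutSketch(KafkaSpoutConfigSketch config) { this.config = config; }

    // The spout now delegates to the config instead of inspecting the enum.
    boolean needsAckTracking() { return config.isAtLeastOnceProcessing(); }
}
```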



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (STORM-2880) Minor optimisation about kafka spout

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing closed STORM-2880.
-
Resolution: Not A Problem

> Minor optimisation about kafka spout
> 
>
> Key: STORM-2880
> URL: https://issues.apache.org/jira/browse/STORM-2880
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka-client
>Affects Versions: 2.0.0
>Reporter: OuYang Liang
>Assignee: OuYang Liang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: wx20180103-165...@2x.png
>
>   Original Estimate: 24h
>  Time Spent: 1h 10m
>  Remaining Estimate: 22h 50m
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (STORM-2865) KafkaSpout constructor with KafkaConsumerFactory shoud be public

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing closed STORM-2865.
-
   Resolution: Not A Problem
Fix Version/s: (was: 1.x)
   (was: 2.0.0)

> KafkaSpout constructor with KafkaConsumerFactory shoud be public
> 
>
> Key: STORM-2865
> URL: https://issues.apache.org/jira/browse/STORM-2865
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.x
> Environment: kafka version:0.11
> storm-kafka-client version: 1.1.*
>Reporter: YulongZ
>Priority: Minor
>
> When we use a custom implementation of the "Deserializer" interface, the 
> KafkaSpout constructor can only use the KafkaConsumerFactoryDefault class, so 
> the "configure" method of the "Deserializer" interface is never called.
> We need to make the KafkaSpout constructor that takes a KafkaConsumerFactory 
> public, so that we can supply a custom implementation of the 
> "KafkaConsumerFactory" interface.
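
A sketch of what the request enables (all types here are simplified stand-ins for the storm-kafka-client and Kafka ones, and the factory shown is hypothetical): with a public constructor accepting a factory, users could plug in a factory that calls Deserializer.configure() before the consumer is handed to the spout.

```java
import java.util.Map;

// Stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer { void configure(Map<String, ?> configs); }

// Stand-in for the KafkaConsumerFactory interface in storm-kafka-client.
interface KafkaConsumerFactory { Object createConsumer(Map<String, ?> configs); }

// Hypothetical custom factory: configures the user's deserializer before
// creating the consumer, which is the step the default factory skips.
class ConfiguringFactory implements KafkaConsumerFactory {
    private final Deserializer deserializer;
    ConfiguringFactory(Deserializer deserializer) { this.deserializer = deserializer; }

    @Override
    public Object createConsumer(Map<String, ?> configs) {
        deserializer.configure(configs);
        return new Object(); // a real factory would return a KafkaConsumer
    }
}

class SpoutWithFactory {
    private final KafkaConsumerFactory factory;
    // The issue asks for this constructor to be public instead of
    // package-private, so factories like the one above can be supplied.
    public SpoutWithFactory(KafkaConsumerFactory factory) { this.factory = factory; }
}
```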



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (STORM-2880) Minor optimisation about kafka spout

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing closed STORM-2880.
-
Resolution: Won't Fix

> Minor optimisation about kafka spout
> 
>
> Key: STORM-2880
> URL: https://issues.apache.org/jira/browse/STORM-2880
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka-client
>Affects Versions: 2.0.0
>Reporter: OuYang Liang
>Assignee: OuYang Liang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: wx20180103-165...@2x.png
>
>   Original Estimate: 24h
>  Time Spent: 1h 10m
>  Remaining Estimate: 22h 50m
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3013) Deactivated topology restarts if data flows into Kafka

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456947#comment-16456947
 ] 

Stig Rohde Døssing commented on STORM-3013:
---

[~Ajeesh] Friendly reminder: did the fix resolve your problem?

> Deactivated topology restarts if data flows into Kafka
> --
>
> Key: STORM-3013
> URL: https://issues.apache.org/jira/browse/STORM-3013
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1
>Reporter: Ajeesh B
>Assignee: Stig Rohde Døssing
>Priority: Major
>
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) 
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-2974) Add a transactional non-opaque spout to storm-kafka-client

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-2974:
--
Component/s: storm-kafka-client

> Add a transactional non-opaque spout to storm-kafka-client
> --
>
> Key: STORM-2974
> URL: https://issues.apache.org/jira/browse/STORM-2974
> Project: Apache Storm
>  Issue Type: Sub-task
>  Components: storm-kafka-client
>Affects Versions: 2.0.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-3046:
--
Component/s: (was: rm-kafka-client)
 storm-kafka-client

> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Priority: Blocker
>  Labels: kafka, storm-kafka-client, trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
>  Whenever I try to run the topology, I get an NPE, which causes the worker 
> to die.
> If I set poll strategy to earliest and the topic already contains some 
> messages, it works fine.
>  I have used a custom record translator which is working fine.
>  Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>  
> private static StormTopology buildTopology(Properties stormProperties) {
>  
> Properties kafkaProperties = getProperties("/kafka.properties");
>  TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> "test")
>  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>  .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>  .setRecordTranslator(new RecordTranslator(), stageArguments)
>  .build();
> KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new 
> 

[jira] [Commented] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456942#comment-16456942
 ] 

Stig Rohde Døssing commented on STORM-3046:
---

This is caused by a bad assumption I made here 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L161.

I assumed that the last batch metadata would only be null for the first batch, 
or reemits of the first batch. I forgot to account for cases where the first 
batch contains no tuples, because in that case we'll set the metadata for the 
first batch to null (see 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L125).
 In the following call to emitBatch, we're going to hit this line with a null 
lastBatchMeta 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L181.
 This is very likely going to happen if you use e.g. LATEST as the 
FirstPollOffsetStrategy, and as you observe it doesn't happen if you start at 
the beginning of a partition and there are messages to emit.

Let me know if you'd like to try to fix it. If not, I'll be happy to give it a 
shot, and would appreciate it if you would try out the potential fix.
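A minimal sketch of the defensive fix implied above, using simplified stand-in types rather than the real KafkaTridentSpoutEmitter API: when the previous batch emitted no tuples its metadata is null, so the seek logic must fall back to the configured FirstPollOffsetStrategy instead of dereferencing the metadata. Names here are illustrative assumptions, not the actual patch.

```java
// Illustrative sketch only: when lastBatchMeta is null (first batch, or a
// previous batch that emitted no tuples), fall back to the configured
// FirstPollOffsetStrategy instead of dereferencing null and throwing an NPE.
public class SeekSketch {
    enum FirstPollOffsetStrategy { EARLIEST, LATEST }

    /** Stand-in for KafkaTridentSpoutBatchMetadata; only the field we need. */
    static class BatchMeta {
        final long lastOffset;
        BatchMeta(long lastOffset) { this.lastOffset = lastOffset; }
    }

    /**
     * Returns the offset to seek to: resume after the last emitted offset
     * when metadata exists, otherwise apply the first-poll strategy.
     */
    static long seekOffset(BatchMeta lastBatchMeta,
                           FirstPollOffsetStrategy strategy,
                           long beginningOffset, long endOffset) {
        if (lastBatchMeta != null) {
            return lastBatchMeta.lastOffset + 1;   // resume after last batch
        }
        // Null metadata: previous batch was empty (or this is the first
        // batch), so use the strategy rather than dereferencing null.
        return strategy == FirstPollOffsetStrategy.EARLIEST
                ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        System.out.println(seekOffset(null, FirstPollOffsetStrategy.LATEST, 0, 100)); // 100
        System.out.println(seekOffset(new BatchMeta(41), FirstPollOffsetStrategy.LATEST, 0, 100)); // 42
    }
}
```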



> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: rm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Priority: Blocker
>  Labels: kafka, storm-kafka-client, trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
>  Whenever I try to run the topology, I get an NPE, which causes the worker 
> to die.
> If I set poll strategy to earliest and the topic already contains some 
> messages, it works fine.
>  I have used a custom record translator which is working fine.
>  Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> 

[jira] [Assigned] (STORM-3045) Microsoft Azure EventHubs: Storm Spout and Bolt improvements

2018-04-27 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reassigned STORM-3045:
-

Assignee: Sreeram Garlapati

> Microsoft Azure EventHubs: Storm Spout and Bolt improvements
> 
>
> Key: STORM-3045
> URL: https://issues.apache.org/jira/browse/STORM-3045
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-eventhubs
>Affects Versions: 2.0.0
>Reporter: Sreeram Garlapati
>Assignee: Sreeram Garlapati
>Priority: Minor
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3044) AutoTGT should ideally check if a TGT is specific to IP addresses and reject

2018-04-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated STORM-3044:
--
Labels: pull-request-available  (was: )

> AutoTGT should ideally check if a TGT is specific to IP addresses and reject
> 
>
> Key: STORM-3044
> URL: https://issues.apache.org/jira/browse/STORM-3044
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Ethan Li
>Assignee: Ethan Li
>Priority: Major
>  Labels: pull-request-available
>
> There are several options that a TGT can have: one is being forwardable, 
> another is having one or more IP addresses in it, which makes it unusable 
> anywhere else. If the ticket is forwardable but is tied to IP addresses, it 
> will likely not work for Storm, so we should reject it.
> It looks like we can call getClientAddresses() on the ticket, and if it 
> returns something then we should reject it. We should also include 
> instructions in the error message about how to get a proper ticket:
> `kinit -A -f`
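> As a sketch of the check described above (the KerberosTicket.getClientAddresses() 
> call is a real JDK API that per its javadoc returns null when the ticket is not 
> bound to addresses; the helper class and method names below are hypothetical, 
> not the actual AutoTGT code):

```java
// Hypothetical helper, not the actual AutoTGT code. It operates on the
// array returned by javax.security.auth.kerberos.KerberosTicket
// .getClientAddresses(), which is null when the ticket is not IP-bound.
import java.net.InetAddress;

public class TgtAddressCheck {
    /**
     * A TGT is usable for Storm only if it is not tied to client IP
     * addresses. A non-null, non-empty result from getClientAddresses()
     * means the ticket is IP-bound and should be rejected, with a hint
     * to re-obtain it via `kinit -A -f`.
     */
    static boolean isUsableForStorm(InetAddress[] clientAddresses) {
        return clientAddresses == null || clientAddresses.length == 0;
    }

    public static void main(String[] args) {
        System.out.println(isUsableForStorm(null));                 // address-less ticket: ok
        System.out.println(isUsableForStorm(
            new InetAddress[] { InetAddress.getLoopbackAddress() })); // IP-bound: reject
    }
}
```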



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external Kafka from which we get the messages.
 Whenever I try to run the topology, I get an NPE, which causes the worker 
to die.
If I set poll strategy to earliest and the topic already contains some 
messages, it works fine.
 I have used a custom record translator which is working fine.

 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
 java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
 Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more

 

 

Topology class - 

 

 

 

 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

 


public class TestTopology {

 

private static StormTopology buildTopology(Properties stormProperties) {
 

Properties kafkaProperties = getProperties("/kafka.properties");
 TridentTopology topology = new TridentTopology();



Fields stageArguments = new Fields("test", "issue");




KafkaSpoutConfig kafkaSpoutConfig = 
KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
"test")
 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
 .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
 .setRecordTranslator(new RecordTranslator(), stageArguments)
 .build();






KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new 
KafkaTridentSpoutOpaque(kafkaSpoutConfig);


Grouping partitionGroup = getPartitionGroup("test");

log.info("Creating Opaque-Trident-Kafka-Spout");



final Stream kafkaSpout = 
topology.newStream(stormProperties.getProperty("SPOUT_NAME"), 
kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
 
TridentState testUpdate = 
kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new 
MainMemoryStateFactory(), stageArguments, new 
MainMemoryStateUpdater(), stageArguments).parallelismHint(1);
 

Stream viewUpdate = 
ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
 new UpdateView(), new Fields()).parallelismHint(2);

return topology.build();
 }

public static void main(String[] args) {
 Config conf = new Config();
 

[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external Kafka from which we get the messages.
 Whenever I try to run the topology, I get an NPE, which causes the worker 
to die.
 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
 java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
 Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more

 

 

Topology class - 

 

 

 

 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

 


public class TestTopology {

 

private static StormTopology buildTopology(Properties stormProperties) {
 

Properties kafkaProperties = getProperties("/kafka.properties");
 TridentTopology topology = new TridentTopology();



Fields stageArguments = new Fields("test", "issue");




KafkaSpoutConfig kafkaSpoutConfig = 
KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
"test")
 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
 .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
 .setRecordTranslator(new RecordTranslator(), stageArguments)
 .build();




// If I set poll strategy to earliest and the topic already contains some 
messages, it works fine.
 // I have used a custom record translator which is working fine.


KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new 
KafkaTridentSpoutOpaque(kafkaSpoutConfig);


Grouping partitionGroup = getPartitionGroup("test");

log.info("Creating Opaque-Trident-Kafka-Spout");



final Stream kafkaSpout = 
topology.newStream(stormProperties.getProperty("SPOUT_NAME"), 
kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
 
TridentState testUpdate = 
kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new 
MainMemoryStateFactory(), stageArguments, new 
MainMemoryStateUpdater(), stageArguments).parallelismHint(1);
 

Stream viewUpdate = 
ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
 new UpdateView(), new Fields()).parallelismHint(2);

return topology.build();
 }

public static void main(String[] args) {
 Config conf = new 

[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external Kafka from which we get the messages.
 Whenever I try to run the topology, I get an NPE, which causes the worker 
to die.
 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
 java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
 Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more

 

 

Topology class - 

 

 

 

import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.*;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.tuple.Fields;

import java.util.Properties;

 

public class TestTopology {

 

private static StormTopology buildTopology(Properties stormProperties) {
 Properties kafkaProperties = getProperties("/kafka.properties");
 TridentTopology topology = new TridentTopology();
 Fields stageArguments = new Fields("test", "issue");
 KafkaSpoutConfig kafkaSpoutConfig = 
KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
 .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
 .setRecordTranslator(new RecordTranslator(), stageArguments)
 .build();
 // If I set poll strategy to earliest and the topic already contains some 
messages, it works fine.
 // I have used a custom record translator which is working fine.
 KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new 
KafkaTridentSpoutOpaque(kafkaSpoutConfig);
 Grouping partitionGroup = getPartitionGroup("test");
 log.info("Creating Opaque-Trident-Kafka-Spout");
 final Stream kafkaSpout = 
topology.newStream(stormProperties.getProperty("SPOUT_NAME"), 
kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
 TridentState testUpdate = 
kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new 
MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), 
stageArguments).parallelismHint(1);
 Stream viewUpdate = 
ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
 new UpdateView(), new Fields()).parallelismHint(2);
 return topology.build();
}

public static void main(String[] args) {
 Config conf = new Config();
 

[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external Kafka from which we get the messages.
 Whenever I try to run the topology, I get an NPE, which causes the worker 
to die.
 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
 java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
 Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more
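The trace bottoms out in KafkaTridentSpoutEmitter.seek (line 193), which on the very first poll has no previous batch to resume from. The following is a dependency-free sketch of that failure mode and the kind of null guard that avoids it; BatchMeta, SeekSketch, and their signatures are hypothetical illustrations, not Storm's actual code or the actual fix.

```java
// Hypothetical sketch: seek() consults metadata from the previous batch, but
// on the first poll (LATEST strategy, no messages consumed yet) there is no
// previous batch, so the metadata reference is null. Guarding that case and
// falling back to a default offset avoids the NullPointerException.
class BatchMeta {
    final long lastOffset;
    BatchMeta(long lastOffset) { this.lastOffset = lastOffset; }
}

class SeekSketch {
    // Returns the offset to seek to; a null meta means "no batch emitted yet".
    static long nextOffset(BatchMeta previousBatch, long fallbackOffset) {
        if (previousBatch == null) {
            return fallbackOffset; // first poll: nothing to resume from
        }
        return previousBatch.lastOffset + 1; // resume just past the last batch
    }
}
```

This only illustrates the pattern; whether the real emitter should fall back to the consumer's current position or skip the partition for that batch is a design decision for the actual fix.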

 

 

Topology class - 

 

 

 

import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.*;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.tuple.Fields;

import java.util.Properties;

 

public class TestTopology {

 

private static StormTopology buildTopology(Properties stormProperties) {
    Properties kafkaProperties = getProperties("/kafka.properties");
    TridentTopology topology = new TridentTopology();

    Fields stageArguments = new Fields("test", "issue");

    KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setRecordTranslator(new RecordTranslator(), stageArguments)
            .build();

    // If I set the poll strategy to EARLIEST and the topic already contains some messages, it works fine.
    // I have used a custom record translator, which works fine.

    KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);

    Grouping partitionGroup = getPartitionGroup("test");

    log.info("Creating Opaque-Trident-Kafka-Spout");

    final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
            .name("kafkaSpout").parallelismHint(1);

    TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate")
            .partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments)
            .parallelismHint(1);

    Stream viewUpdate = testUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup)
            .each(stageArguments, new UpdateView(), new Fields()).parallelismHint(2);

    return topology.build();
}
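The comment in the code above notes that EARLIEST works when the topic already contains messages. The following dependency-free sketch illustrates what the two FirstPollOffsetStrategy values mean for the first poll; Strategy and OffsetChooser are hypothetical names for illustration, not Storm's API.

```java
// Hypothetical illustration of first-poll semantics: EARLIEST starts at the
// partition's beginning offset, LATEST at its end offset. On an empty
// partition both collapse to the same position, and LATEST with no previous
// batch is the case in which the reported NPE is hit.
enum Strategy { EARLIEST, LATEST }

class OffsetChooser {
    static long firstPollOffset(Strategy s, long beginningOffset, long endOffset) {
        return s == Strategy.EARLIEST ? beginningOffset : endOffset;
    }
}
```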

public static void main(String[] args) {
 Config conf = new Config();
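The builder above wires in a custom RecordTranslator that produces the two fields in stageArguments. As a dependency-free sketch of what such a translator conceptually does (SimpleTranslator and its signature are hypothetical; the real class implements org.apache.storm.kafka.spout.RecordTranslator and receives a ConsumerRecord):

```java
import java.util.Arrays;
import java.util.List;

// Dependency-free sketch: map one Kafka record (reduced here to key/value
// strings) to the tuple values matching the declared fields ("test", "issue").
class SimpleTranslator {
    static List<Object> translate(String key, String value) {
        return Arrays.asList(key, value);
    }
}
```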
 

[jira] [Created] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)
Kush Khandelwal created STORM-3046:
--

 Summary: Getting a NPE leading worker to die when starting a 
topology.
 Key: STORM-3046
 URL: https://issues.apache.org/jira/browse/STORM-3046
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client, trident
Affects Versions: 1.2.1
Reporter: Kush Khandelwal
 Attachments: TestTopology.java

I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We get messages from an external Kafka cluster.
Whenever I try to run the topology, I get an NPE, which causes the worker to 
die.
Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more

 

 

Topology class - 

 

 

 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

 


public class TestTopology {

 

private static StormTopology buildTopology(Properties stormProperties) {
    Properties kafkaProperties = getProperties("/kafka.properties");
    TridentTopology topology = new TridentTopology();

    Fields stageArguments = new Fields("test", "issue");

    KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setRecordTranslator(new RecordTranslator(), stageArguments)
            .build();

    // If I set the poll strategy to EARLIEST and the topic already contains some messages, it works fine.
    // I have used a custom record translator, which works fine.

    KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);

    Grouping partitionGroup = getPartitionGroup("test");

    log.info("Creating Opaque-Trident-Kafka-Spout");

    final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
            .name("kafkaSpout").parallelismHint(1);

    TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate")
            .partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments)
            .parallelismHint(1);

    Stream viewUpdate =