Re: KafkaSpout not receiving new messages until restart

2016-08-11 Thread Amber Kulkarni
Is this happening in both local and prod (topology) mode?
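
If it only shows up in one of them, comparing against local mode can help
isolate it. A minimal sketch for running the same wiring in-process, against
the 0.10.0 backtype.storm API (topology name and wiring are placeholders):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class LocalRun {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... same Kafka spout and bolt wiring as the production topology ...

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();   // in-process "cluster"
        cluster.submitTopology("kafka-local-test", conf, builder.createTopology());

        Thread.sleep(600_000);                       // run long enough to reproduce the stall
        cluster.killTopology("kafka-local-test");
        cluster.shutdown();
    }
}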

On Fri, Aug 5, 2016 at 6:58 AM, Jason Kania wrote:

> Hello,
>
> I am using Storm 0.10.0 and have an application pulling content off a
> Kafka topic via the Kafka spout, but after some time nothing more is
> injected. However, after a restart, more messages come in for a while. I
> have done my best to confirm that there are no tuple timeouts by tracing
> the code, and I have looked at the tuple visualization but found no lost
> tuples. There is no indication of any tuples being lost or timing out.
>
> I am logging every time something comes into the first bolt and see
> nothing there either.
>
> At this point, I am not sure what I can do to debug further.
>
> Unfortunately, upgrading is not an option for us at this point.
>
> Any suggestions to get to the bottom of this?
>
> Thanks,
>
> Jason
>
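
If redeploying is an option, Storm's tuple-level debug logging records every
emit/ack/fail in the worker logs, which would show whether the spout stops
emitting or the tuples stop reaching the bolt. A sketch, again assuming the
0.10.0 API (topology name is a placeholder):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class DebugSubmit {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... same spout/bolt wiring as before ...

        Config conf = new Config();
        conf.setDebug(true); // logs every emit/ack/fail per tuple; very verbose, enable briefly

        StormSubmitter.submitTopology("kafka-debug", conf, builder.createTopology());
    }
}

If the spout's emits stop appearing in the logs while the Kafka topic still
has new messages, that points at the spout's internal state rather than at
the bolts.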



-- 
Regards,
Amber Kulkarni


Re: How long until fields grouping gets overwhelmed with data?

2016-08-11 Thread Navin Ipe
Thank you very much, Erik. So this is the code:

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
    // Hash only the declared grouping fields, then map the hash onto the task list.
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(
            outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}

TupleUtils.listHashCode leads to:

public static <T> int listHashCode(List<T> alist) {
    if (alist == null) {
        return 1;
    } else {
        return Arrays.deepHashCode(alist.toArray());
    }
}

So the function does seem to be independent of which spout emits the tuple.
What matters is the value of the field(s) on which the fields grouping is done.
Thanks everyone!
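
To make that concrete, here is a small self-contained sketch (plain Java that
mirrors the hashing above rather than calling Storm itself) showing that the
chosen task index depends only on the grouping value, never on the emitter:

import java.util.Arrays;
import java.util.List;

public class FieldsGroupingDemo {
    // Mirrors TupleUtils.listHashCode plus the modulo in chooseTasks.
    static int targetTask(List<Object> groupingValues, int numTasks) {
        return Math.abs(Arrays.deepHashCode(groupingValues.toArray())) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 10;
        List<Object> fromS1 = Arrays.<Object>asList("x"); // "x" emitted by spout S1
        List<Object> fromS2 = Arrays.<Object>asList("x"); // the same "x" emitted by spout S2

        // Both print the same task index: the emitter never enters the computation.
        System.out.println(targetTask(fromS1, numTasks));
        System.out.println(targetTask(fromS2, numTasks));
    }
}

One caveat carried over from the real code: the grouping values' hashCode must
be stable across JVMs for multi-worker topologies, so classes with
identity-based hashCode are poor choices for grouping fields.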


On Thu, Aug 11, 2016 at 3:52 PM, Erik Weathers wrote:

> I think these are the appropriate code pointers:
>
> Original Clojure-based storm-core:
>
> https://github.com/apache/storm/blob/v0.9.6/storm-core/src/clj/backtype/storm/daemon/executor.clj#L36-L39
>
>
> New Java-based storm-core:
>
> https://github.com/apache/storm/blob/3b1ab3d8a7da7ed35adc448d24f1f1ccb6c5ff27/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java#L157-L161
>
>
> On Thu, Aug 11, 2016 at 2:57 AM, Navin Ipe wrote:
>
>> True, but that's what I wanted to confirm by mentioning spouts S1 and S2.
>> Will S1 and S2 each use their own hash-mod-n function, or is it a common
>> function decided by Storm? (If anyone could point me to where this lives
>> in the Storm source code, I could try finding it myself too.)
>>
>> On Thu, Aug 11, 2016 at 2:36 PM, Gireesh Ramji wrote:
>>
>>> It does not matter who hashes it; as long as they all use the same hash
>>> function, it will go to the same bolt.
>>>
>>>
>>> --
>>> *From:* Navin Ipe 
>>> *To:* user@storm.apache.org
>>> *Sent:* Thursday, August 11, 2016 4:56 PM
>>> *Subject:* Re: How long until fields grouping gets overwhelmed with
>>> data?
>>>
>>> If the hash is dynamically computed and is stateless, then that brings
>>> up one more question.
>>>
>>> Let's say there are two spout classes S1 and S2. I create 10 tasks of S1
>>> and 10 tasks of S2.
>>> There are 10 tasks of a bolt B.
>>>
>>> S1 and S2 are fieldsGrouped with B.
>>>
>>> I receive data x in S1 and another data x in S2.
>>>
>>> If S1's emit of x goes to task1 of B, then will S2's emit of x also go
>>> to task1 of B?
>>>
>>> *Basically the question is:* Is the hash value decided by the spout or
>>> by Storm? Because if it is decided by the spout, then S1's emit of x can
>>> go to task 1 while S2's emit of x goes to some other task of the bolt,
>>> and that won't serve someone who wants all x's to go to one bolt.
>>>
>>>
>>>
>>>
>>> On Wed, Aug 10, 2016 at 8:58 PM, Navin Ipe <navin@searchlighthealth.com> wrote:
>>>
>>> Oh that's good to know. I assume it works like this:
>>> https://en.wikipedia.org/wiki/Hash_function#Hashing_uniformly_distributed_data
>>>
>>> On Wed, Aug 10, 2016 at 6:23 PM, Nathan Leung wrote:
>>>
>>> It's based on a modulo of a hash of the field. The fields grouping is
>>> stateless.
>>>
>>> On Aug 10, 2016 8:18 AM, "Navin Ipe" wrote:
>>> Hi,
>>>
>>> For spouts to be able to continuously send a fields-grouped tuple to the
>>> same bolt, they would have to store a key-value map something like this,
>>> right?
>>>
>>> field1023 ---> Bolt1
>>> field1343 ---> Bolt3
>>> field1629 ---> Bolt5
>>> field1726 ---> Bolt1
>>> field1481 ---> Bolt3
>>>
>>> So if my topology runs for a very long time and the spout generates many
>>> unique field values, won't this key-value map run out of memory eventually?
>>>
>>> Or is there a failsafe or a map limit that Storm uses to handle this
>>> without crashing?
>>>
>>> If memory problems could happen, what would be an alternative way to
>>> solve this when many unique fields are generated over time?
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>
>


-- 
Regards,
Navin


Re: [ANNOUNCE] Apache Storm 1.0.2 Released

2016-08-11 Thread 马哲超
Distribution artifacts are not ready in Maven Central yet.

2016-08-11 3:15 GMT+08:00 P. Taylor Goetz:

> The Apache Storm community is pleased to announce the release of Apache
> Storm version 1.0.2.
>
> Storm is a distributed, fault-tolerant, and high-performance realtime
> computation system that provides strong guarantees on the processing of
> data. You can read more about Storm on the project website:
>
> http://storm.apache.org
>
> Downloads of source and binary distributions are listed in our download
> section:
>
> http://storm.apache.org/downloads.html
>
> You can read more about this release in the following blog post:
>
> https://storm.apache.org/2016/08/10/storm102-released.html
>
> Distribution artifacts are available in Maven Central at the following
> coordinates:
>
> groupId: org.apache.storm
> artifactId: storm-core
> version: 1.0.2
>
> The full list of changes is available here[1]. Please let us know [2] if
> you encounter any problems.
>
> Regards,
>
> The Apache Storm Team
>
> [1]: https://github.com/apache/storm/blob/v1.0.2/CHANGELOG.md
> [2]: https://issues.apache.org/jira/browse/STORM
>


Re: How long until fields grouping gets overwhelmed with data?

2016-08-11 Thread Erik Weathers
I think these are the appropriate code pointers:

Original Clojure-based storm-core:

https://github.com/apache/storm/blob/v0.9.6/storm-core/src/clj/backtype/storm/daemon/executor.clj#L36-L39


New Java-based storm-core:

https://github.com/apache/storm/blob/3b1ab3d8a7da7ed35adc448d24f1f1ccb6c5ff27/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java#L157-L161


On Thu, Aug 11, 2016 at 2:57 AM, Navin Ipe wrote:

> True, but that's what I wanted to confirm by mentioning spouts S1 and S2.
> Will S1 and S2 each use their own hash-mod-n function, or is it a common
> function decided by Storm? (If anyone could point me to where this lives
> in the Storm source code, I could try finding it myself too.)
>
> On Thu, Aug 11, 2016 at 2:36 PM, Gireesh Ramji wrote:
>
>> It does not matter who hashes it; as long as they all use the same hash
>> function, it will go to the same bolt.
>>
>>
>> --
>> *From:* Navin Ipe 
>> *To:* user@storm.apache.org
>> *Sent:* Thursday, August 11, 2016 4:56 PM
>> *Subject:* Re: How long until fields grouping gets overwhelmed with data?
>>
>> If the hash is dynamically computed and is stateless, then that brings up
>> one more question.
>>
>> Let's say there are two spout classes S1 and S2. I create 10 tasks of S1
>> and 10 tasks of S2.
>> There are 10 tasks of a bolt B.
>>
>> S1 and S2 are fieldsGrouped with B.
>>
>> I receive data x in S1 and another data x in S2.
>>
>> If S1's emit of x goes to task1 of B, then will S2's emit of x also go to
>> task1 of B?
>>
>> *Basically the question is:* Is the hash value decided by the spout or by
>> Storm? Because if it is decided by the spout, then S1's emit of x can go
>> to task 1 while S2's emit of x goes to some other task of the bolt, and
>> that won't serve someone who wants all x's to go to one bolt.
>>
>>
>>
>>
>> On Wed, Aug 10, 2016 at 8:58 PM, Navin Ipe wrote:
>>
>> Oh that's good to know. I assume it works like this:
>> https://en.wikipedia.org/wiki/Hash_function#Hashing_uniformly_distributed_data
>>
>> On Wed, Aug 10, 2016 at 6:23 PM, Nathan Leung wrote:
>>
>> It's based on a modulo of a hash of the field. The fields grouping is
>> stateless.
>>
>> On Aug 10, 2016 8:18 AM, "Navin Ipe" wrote:
>> Hi,
>>
>> For spouts to be able to continuously send a fields-grouped tuple to the
>> same bolt, they would have to store a key-value map something like this,
>> right?
>>
>> field1023 ---> Bolt1
>> field1343 ---> Bolt3
>> field1629 ---> Bolt5
>> field1726 ---> Bolt1
>> field1481 ---> Bolt3
>>
>> So if my topology runs for a very long time and the spout generates many
>> unique field values, won't this key-value map run out of memory eventually?
>>
>> Or is there a failsafe or a map limit that Storm uses to handle this
>> without crashing?
>>
>> If memory problems could happen, what would be an alternative way to
>> solve this when many unique fields are generated over time?
>>
>> --
>> Regards,
>> Navin
>>
>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>>
>>
>
>
> --
> Regards,
> Navin
>


Re: How long until fields grouping gets overwhelmed with data?

2016-08-11 Thread Navin Ipe
True, but that's what I wanted to confirm by mentioning spouts S1 and S2.
Will S1 and S2 each use their own hash-mod-n function, or is it a common
function decided by Storm? (If anyone could point me to where this lives
in the Storm source code, I could try finding it myself too.)

On Thu, Aug 11, 2016 at 2:36 PM, Gireesh Ramji wrote:

> It does not matter who hashes it; as long as they all use the same hash
> function, it will go to the same bolt.
>
>
> --
> *From:* Navin Ipe 
> *To:* user@storm.apache.org
> *Sent:* Thursday, August 11, 2016 4:56 PM
> *Subject:* Re: How long until fields grouping gets overwhelmed with data?
>
> If the hash is dynamically computed and is stateless, then that brings up
> one more question.
>
> Let's say there are two spout classes S1 and S2. I create 10 tasks of S1
> and 10 tasks of S2.
> There are 10 tasks of a bolt B.
>
> S1 and S2 are fieldsGrouped with B.
>
> I receive data x in S1 and another data x in S2.
>
> If S1's emit of x goes to task1 of B, then will S2's emit of x also go to
> task1 of B?
>
> *Basically the question is:* Is the hash value decided by the spout or by
> Storm? Because if it is decided by the spout, then S1's emit of x can go to
> task 1 while S2's emit of x goes to some other task of the bolt, and that
> won't serve someone who wants all x's to go to one bolt.
>
>
>
>
> On Wed, Aug 10, 2016 at 8:58 PM, Navin Ipe wrote:
>
> Oh that's good to know. I assume it works like this:
> https://en.wikipedia.org/wiki/Hash_function#Hashing_uniformly_distributed_data
>
> On Wed, Aug 10, 2016 at 6:23 PM, Nathan Leung wrote:
>
> It's based on a modulo of a hash of the field. The fields grouping is
> stateless.
>
> On Aug 10, 2016 8:18 AM, "Navin Ipe" wrote:
> Hi,
>
> For spouts to be able to continuously send a fields-grouped tuple to the
> same bolt, they would have to store a key-value map something like this,
> right?
>
> field1023 ---> Bolt1
> field1343 ---> Bolt3
> field1629 ---> Bolt5
> field1726 ---> Bolt1
> field1481 ---> Bolt3
>
> So if my topology runs for a very long time and the spout generates many
> unique field values, won't this key-value map run out of memory eventually?
>
> Or is there a failsafe or a map limit that Storm uses to handle this
> without crashing?
>
> If memory problems could happen, what would be an alternative way to
> solve this when many unique fields are generated over time?
>
> --
> Regards,
> Navin
>
>
>
>
> --
> Regards,
> Navin
>
>
>
>
> --
> Regards,
> Navin
>
>
>


-- 
Regards,
Navin


Re: How long until fields grouping gets overwhelmed with data?

2016-08-11 Thread Gireesh Ramji
It does not matter who hashes it; as long as they all use the same hash
function, it will go to the same bolt.

From: Navin Ipe
To: user@storm.apache.org
Sent: Thursday, August 11, 2016 4:56 PM
Subject: Re: How long until fields grouping gets overwhelmed with data?
If the hash is dynamically computed and is stateless, then that brings up one 
more question.

Let's say there are two spout classes S1 and S2. I create 10 tasks of S1 and 10 
tasks of S2.
There are 10 tasks of a bolt B.

S1 and S2 are fieldsGrouped with B.

I receive data x in S1 and another data x in S2. 

If S1's emit of x goes to task1 of B, then will S2's emit of x also go to task1 
of B?

Basically the question is: Is the hash value decided by the spout or by Storm?
Because if it is decided by the spout, then S1's emit of x can go to task 1
while S2's emit of x goes to some other task of the bolt, and that won't
serve someone who wants all x's to go to one bolt.




On Wed, Aug 10, 2016 at 8:58 PM, Navin Ipe wrote:

Oh that's good to know. I assume it works like this:
https://en.wikipedia.org/wiki/Hash_function#Hashing_uniformly_distributed_data

On Wed, Aug 10, 2016 at 6:23 PM, Nathan Leung wrote:

It's based on a modulo of a hash of the field. The fields grouping is stateless.
On Aug 10, 2016 8:18 AM, "Navin Ipe" wrote:

Hi,

For spouts to be able to continuously send a fields-grouped tuple to the same
bolt, they would have to store a key-value map something like this, right?

field1023 ---> Bolt1
field1343 ---> Bolt3
field1629 ---> Bolt5
field1726 ---> Bolt1
field1481 ---> Bolt3

So if my topology runs for a very long time and the spout generates many unique
field values, won't this key-value map run out of memory eventually?

Or is there a failsafe or a map limit that Storm uses to handle this without
crashing?

If memory problems could happen, what would be an alternative way to solve this
when many unique fields are generated over time?

-- 
Regards,
Navin




-- 
Regards,
Navin



-- 
Regards,
Navin


Re: How long until fields grouping gets overwhelmed with data?

2016-08-11 Thread Navin Ipe
If the hash is dynamically computed and is stateless, then that brings up
one more question.

Let's say there are two spout classes S1 and S2. I create 10 tasks of S1
and 10 tasks of S2.
There are 10 tasks of a bolt B.

S1 and S2 are fieldsGrouped with B.

I receive data x in S1 and another data x in S2.

If S1's emit of x goes to task1 of B, then will S2's emit of x also go to
task1 of B?

*Basically the question is:* Is the hash value decided by the spout or by
Storm? Because if it is decided by the spout, then S1's emit of x can go to
task 1 while S2's emit of x goes to some other task of the bolt, and that
won't serve someone who wants all x's to go to one bolt.




On Wed, Aug 10, 2016 at 8:58 PM, Navin Ipe wrote:

> Oh that's good to know. I assume it works like this:
> https://en.wikipedia.org/wiki/Hash_function#Hashing_uniformly_distributed_data
>
> On Wed, Aug 10, 2016 at 6:23 PM, Nathan Leung wrote:
>
>> It's based on a modulo of a hash of the field. The fields grouping is
>> stateless.
>>
>> On Aug 10, 2016 8:18 AM, "Navin Ipe" wrote:
>>
>>> Hi,
>>>
>>> For spouts to be able to continuously send a fields-grouped tuple to the
>>> same bolt, they would have to store a key-value map something like this,
>>> right?
>>>
>>> field1023 ---> Bolt1
>>> field1343 ---> Bolt3
>>> field1629 ---> Bolt5
>>> field1726 ---> Bolt1
>>> field1481 ---> Bolt3
>>>
>>> So if my topology runs for a very long time and the spout generates many
>>> unique field values, won't this key-value map run out of memory eventually?
>>>
>>> Or is there a failsafe or a map limit that Storm uses to handle this
>>> without crashing?
>>>
>>> If memory problems could happen, what would be an alternative way to
>>> solve this when many unique fields are generated over time?
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
>
> --
> Regards,
> Navin
>



-- 
Regards,
Navin


(storm-kafka-client) KafkaSpout issues

2016-08-11 Thread Ziemer, Tom
Hi everybody,

I just tried to upgrade from the "traditional" KafkaSpout to the new
storm-kafka-client version. Initially everything was working fine, yet I soon
discovered that the new spout stops emitting tuples after some time.
The happy path (acks only, no errors) works fine, but when tuples are failed
or delayed, the (spout-internal) counters appear to become incorrect.

After some debugging sessions, I found the reason here:

private boolean poll() {
    // Only poll Kafka when nothing is waiting to be emitted and the
    // uncommitted-offsets cap has not been reached.
    return !waitingToEmit()
            && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
}

This method always returned false, because numUncommittedOffsets was much
higher than kafkaSpoutConfig.getMaxUncommittedOffsets(). Apparently I am not
the only one facing this problem; similar issues are reported here:
https://github.com/apache/storm/pull/1131#issuecomment-217776015
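
For anyone stuck on this in the meantime, raising the cap only postpones the
stall, but it can buy time. A sketch, assuming this storm-kafka-client
version's Builder exposes a setter matching the getMaxUncommittedOffsets()
getter above (the Builder construction itself is elided, since it varies by
version):

import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class SpoutConfigWorkaround {
    // Stopgap, not a fix: the miscounted offsets still accumulate; a larger
    // cap only delays the point where poll() starts returning false.
    static <K, V> KafkaSpoutConfig<K, V> withLargerCap(KafkaSpoutConfig.Builder<K, V> builder) {
        return builder
                .setMaxUncommittedOffsets(1_000_000) // assumed setter, mirroring the getter above
                .setOffsetCommitPeriodMs(10_000)     // commit more often to shrink the uncommitted window
                .build();
    }
}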

I could not find any commits related to these issues.

In the latest 1.0.2 release I saw quite a few fixes for the "old" KafkaSpout,
yet almost none for the "new" one. Will both versions be supported in the
future? Which one is the recommended version to use?

Thanks,
Tom