Massive Number of Spout Failures

2016-07-27 Thread Kevin Peek
We have a topology that is experiencing massive amounts of spout failures
without corresponding bolt failures. We have been interpreting these as
tuple timeouts, but we seem to be getting more of these failures than we
understand to be possible with timeouts.

Our topology uses a Kafka spout and the topology is configured with:
topology.message.timeout.secs = 300
topology.max.spout.pending = 2500

Based on these settings, I would expect the topology to experience a
maximum of 2500 tuple timeouts per 300 seconds. But from the Storm UI, we
see that after running for about 10 minutes, the topology will show about
50K spout failures and zero bolt failures.

Am I misunderstanding something that would allow more tuples to time out,
or is there another source of spout failures?

Thanks in advance,
Kevin Peek


Re: Massive Number of Spout Failures

2016-07-27 Thread Kevin Peek
Thanks for the reply.

In either of these cases, shouldn't Storm stop letting the spout emit
tuples once max_spout_pending is reached? In that case, the tuples already
in the topology (or dropped by accident, collected in a bolt, etc.) will
take 5 minutes to time out, and the number of tuples failing in this way
will be limited to max_spout_pending per 5 minutes. The issue is that we are
seeing a much higher rate of spout failures.

On Wed, Jul 27, 2016 at 3:48 PM, Igor Kuzmenko  wrote:

> We have seen such failures for two reasons:
>
> 1) The bolt doesn't ack tuples immediately, but collects a batch and at some
> point acks them all. In that case, when the batch is bigger than
> max_spout_pending, some tuples fail.
>
> 2) The bolt doesn't ack tuples at all. Make sure the bolt acks or fails every
> tuple, without exception.
>
> On Wed, Jul 27, 2016 at 10:22 PM, Kevin Peek  wrote:
>
>> We have a topology that is experiencing massive amounts of spout failures
>> without corresponding bolt failures. We have been interpreting these as
>> tuple timeouts, but we seem to be getting more of these failures than we
>> understand to be possible with timeouts.
>>
>> Our topology uses a Kafka spout and the topology is configured with:
>> topology.message.timeout.secs = 300
>> topology.max.spout.pending = 2500
>>
>> Based on these settings, I would expect the topology to experience a
>> maximum of 2500 tuple timeouts per 300 seconds. But from the Storm UI, we
>> see that after running for about 10 minutes, the topology will show about
>> 50K spout failures and zero bolt failures.
>>
>> Am I misunderstanding something that would allow more tuples to time out,
>> or is there another source of spout failures?
>>
>> Thanks in advance,
>> Kevin Peek
>>
>
>


Re: Massive Number of Spout Failures

2016-07-27 Thread Kevin Peek
Erik, we actually have 35 spout instances, so I think you've found the
issue. Thanks!
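
For anyone else who hits this, here is a minimal sketch of the arithmetic (the 35
tasks and 2500 pending are the values from this thread; the class name is just for
illustration):

public class PendingMath {
    public static void main(String[] args) {
        // topology.max.spout.pending is enforced per spout task, not per topology.
        int maxSpoutPending = 2500;   // topology.max.spout.pending
        int spoutTasks = 35;          // spout parallelism reported in this thread
        int messageTimeoutSecs = 300; // topology.message.timeout.secs

        // Worst case: every task's full pending window times out in one interval.
        int maxTimeoutsPerWindow = maxSpoutPending * spoutTasks;
        System.out.printf("Up to %,d timeouts possible per %d-second window%n",
                maxTimeoutsPerWindow, messageTimeoutSecs);
    }
}

With 35 spout tasks the worst case is 35 * 2500 = 87,500 tuples in flight, so 50K
failures in a 300-second window is well within what these settings allow.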

On Wed, Jul 27, 2016 at 4:44 PM, Erik Weathers 
wrote:

> How many spout tasks do you have?  The topology.max.spout.pending setting
> is *per* task.  Maybe you have 20?  20*2500 == 50K.
>
> On Wed, Jul 27, 2016 at 1:32 PM, Kevin Peek  wrote:
>
>> Thanks for the reply.
>>
>> In either of these cases, shouldn't storm stop letting the spout emit
>> tuples once max_spout_pending is reached? In that case, the tuples already
>> in the topology (or dropped by accident, collected in a bolt, etc) will
>> take 5 minutes to time out, and the number of tuples failing in this way
>> will be limited to max_spout_pending per 5 minutes. The issue is we are
>> seeing a much higher level of spout failures.
>>
>> On Wed, Jul 27, 2016 at 3:48 PM, Igor Kuzmenko 
>> wrote:
>>
>>> We have seen such failures for two reasons:
>>>
>>> 1) The bolt doesn't ack tuples immediately, but collects a batch and at some
>>> point acks them all. In that case, when the batch is bigger than
>>> max_spout_pending, some tuples fail.
>>>
>>> 2) The bolt doesn't ack tuples at all. Make sure the bolt acks or fails every
>>> tuple, without exception.
>>>
>>> On Wed, Jul 27, 2016 at 10:22 PM, Kevin Peek 
>>> wrote:
>>>
>>>> We have a topology that is experiencing massive amounts of spout
>>>> failures without corresponding bolt failures. We have been interpreting
>>>> these as tuple timeouts, but we seem to be getting more of these failures
>>>> than we understand to be possible with timeouts.
>>>>
>>>> Our topology uses a Kafka spout and the topology is configured with:
>>>> topology.message.timeout.secs = 300
>>>> topology.max.spout.pending = 2500
>>>>
>>>> Based on these settings, I would expect the topology to experience a
>>>> maximum of 2500 tuple timeouts per 300 seconds. But from the Storm UI, we
>>>> see that after running for about 10 minutes, the topology will show about
>>>> 50K spout failures and zero bolt failures.
>>>>
>>>> Am I misunderstanding something that would allow more tuples to time
>>>> out, or is there another source of spout failures?
>>>>
>>>> Thanks in advance,
>>>> Kevin Peek
>>>>
>>>
>>>
>>
>


WorkerHook deserialization problem

2016-10-21 Thread Kevin Peek
I am running into problems with WorkerHooks on a local cluster. Even using
only a BaseWorkerHook, I get an exception. When I run the following code,
an EOFException is thrown - it seems the Worker is trying to deserialize an
empty byte[] for one of the WorkerHooks. If I comment out the line that adds
the hook, the code runs fine.

Can someone help me understand what is going wrong here, and whether this is
strictly an issue with the LocalCluster and how I am using it?


TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spoutId", new RandomNumberSpout());
builder.addWorkerHook(new BaseWorkerHook()); // commenting out this line makes the problem go away
StormTopology topology = builder.createTopology();
Config config = new Config();
config.setMessageTimeoutSecs(1);
String topologyName = "dummy-topology";

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, config, topology);
Thread.sleep(5000);
cluster.killTopology(topologyName);
Thread.sleep(1);
cluster.shutdown(); // the EOFException below is thrown during this shutdown


Produces:


java.lang.RuntimeException: java.io.EOFException

at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:507)
at clojure.core$seq__4128.invoke(core.clj:137)
at clojure.core$dorun.invoke(core.clj:3009)
at clojure.core$doall.invoke(core.clj:3025)
at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
at org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
at org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
at org.apache.storm.LocalCluster.shutdown(Unknown Source)
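
For what it's worth, the EOFException above is what plain Java deserialization
produces when it is handed an empty byte array. This standalone sketch (no Storm
dependency; the class name is just for illustration) shows the same failure mode
the worker seems to hit for the hook:

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;

public class EmptyDeserializeDemo {
    public static void main(String[] args) throws IOException {
        byte[] empty = new byte[0]; // what the worker appears to read back for the hook
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(empty))) {
            Object hook = in.readObject();
            System.out.println("Deserialized: " + hook);
        } catch (EOFException e) {
            // ObjectInputStream fails immediately: there is no stream header to read.
            System.out.println("EOFException, as seen in the worker shutdown hook path");
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}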


Re: WorkerHook deserialization problem

2016-11-01 Thread Kevin Peek
Thank you for the detailed response.

On Fri, Oct 28, 2016 at 4:15 PM, P. Taylor Goetz  wrote:

> I was able to verify this to be a bug in how worker hooks work in local
> mode.
>
> In trying to see if this affects distributed mode as well, I found a more
> serious issue that prevents workers from shutting down gracefully (and thus
> prevents shutdown hooks from running):
>
> https://issues.apache.org/jira/browse/STORM-2176
>
> So for the time being I don’t believe worker shutdown hooks work in either
> local or distributed mode. I can confirm the start portion of worker hooks
> functions properly, but not shutdown. Hopefully we will be able to fix both
> these issues in an upcoming release.
>
> -Taylor
>
>
> On Oct 21, 2016, at 9:58 AM, Kevin Peek  wrote:
>
> I am running into problems with WorkerHooks on a local cluster. Even using
> only a BaseWorkerHook, I get an Exception. When I run the following code,
> an EOFException is thrown - it seems the Worker is trying to deserialize an
> empty byte[] for one of the WorkerHooks. Comment out the line adding the
> hook and this runs fine.
>
> Can someone help me understand what is going wrong here and whether or not
> this is strictly an issue with the LocalCluster and how I am using it.
>
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spoutId", new RandomNumberSpout());
> builder.addWorkerHook(new BaseWorkerHook());
> StormTopology topology = builder.createTopology();
> Config config = new Config();
> config.setMessageTimeoutSecs(1);
> String topologyName = "dummy-topology";
>
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(topologyName, config, topology);
> Thread.sleep(5000);
> cluster.killTopology(topologyName);
> Thread.sleep(1);
> cluster.shutdown();
>
>
> Produces:
>
>
> java.lang.RuntimeException: java.io.EOFException
>
> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
> at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
> at clojure.lang.LazySeq.sval(LazySeq.java:40)
> at clojure.lang.LazySeq.seq(LazySeq.java:49)
> at clojure.lang.RT.seq(RT.java:507)
> at clojure.core$seq__4128.invoke(core.clj:137)
> at clojure.core$dorun.invoke(core.clj:3009)
> at clojure.core$doall.invoke(core.clj:3025)
> at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
> at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
> at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> at org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
> at org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
> at org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
> at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
> at org.apache.storm.LocalCluster.shutdown(Unknown Source)
>
>
>
>
>


Re: problem with shuffleGrouping

2016-11-21 Thread Kevin Peek
I played around a little bit with Stephen's test, and it seems that the
Collections.shuffle() call here is causing the problem (at least the problem
Stephen is talking about):
https://github.com/apache/storm/blob/1.0.x-branch/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java#L58

I created a ticket to address this uneven task distribution:
https://issues.apache.org/jira/browse/STORM-2210
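
For anyone following along, here is a dependency-free sketch of the round-robin
idea from Stephen's gist (names and structure are illustrative; this is not the
actual ShuffleGrouping or CustomStreamGrouping code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hand out target task ids strictly in rotation so every task gets the same
// share, even when choices are made from several executor threads.
public class RoundRobinChooser {
    private final List<Integer> targetTasks;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobinChooser(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    public List<Integer> chooseTasks() {
        int index = (int) (counter.getAndIncrement() % targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser(Arrays.asList(1, 2, 3, 4));
        for (int i = 0; i < 8; i++) {
            System.out.println(chooser.chooseTasks()); // [1], [2], [3], [4], [1], ...
        }
    }
}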

On Mon, Nov 21, 2016 at 11:20 AM, Stephen Powis 
wrote:

> So we've seen some weird distributions using ShuffleGrouping as well.  I
> noticed there's no test case for ShuffleGrouping and got curious.  Also, the
> implementation seemed overly complicated (in my head anyhow, perhaps
> there's a reason for it?), so I put together a much simpler version of
> round-robin shuffling.
>
> Gist here: https://gist.github.com/Crim/61537958df65a5e13b3844b2d5e28cde
>
> It's possible I've set up my test cases incorrectly, but it seems like when
> using multiple threads in my test, ShuffleGrouping provides wildly uneven
> distribution? In the Javadocs above each test case I've pasted the output
> that I get locally.
>
> Thoughts?
>
> On Sat, Nov 19, 2016 at 2:49 AM, Ohad Edelstein  wrote:
>
>> Did it happen to you also?
>> We are upgrading from 0.9.3 to 1.0.1;
>> in 0.9.3 we didn't have that problem.
>>
>> But once I use localOrShuffle, the messages are sent only to the same
>> machine.
>>
>> From: Chien Le 
>> Reply-To: "user@storm.apache.org" 
>> Date: Saturday, 19 November 2016 at 6:05
>> To: "user@storm.apache.org" 
>> Subject: Re: Testing serializers with multiple workers
>>
>> Ohad,
>>
>>
>> We found that we had to use localOrShuffle grouping in order to see
>> activity in the same worker as the spout.
>>
>>
>> -Chien
>>
>>
>> --
>> *From:* Ohad Edelstein 
>> *Sent:* Friday, November 18, 2016 8:38:35 AM
>> *To:* user@storm.apache.org
>> *Subject:* Re: Testing serializers with multiple workers
>>
>> Hello,
>>
>> We just finished setting up storm 1.0.1 with 3 supervisors and one nimbus
>> machine.
>> Total of 4 machines in aws.
>>
>> We see the following phenomenon:
>> let's say the spout is on host2:
>> host1 - using 100% CPU
>> host3 - using 100% CPU
>> host2 - idle (some messages are being handled by it, not many)
>> It's not a slots problem; we have an even number of bolts.
>>
>> We also tried deploying only 2 hosts, and the same thing happened: the
>> host with the spout is idle, the other host is at 100% CPU.
>>
>> We switched from shuffleGrouping to noneGrouping, and it seems to work.
>> The documentation says:
>> None grouping: This grouping specifies that you don't care how the stream
>> is grouped. Currently, none groupings are equivalent to shuffle groupings.
>> Eventually though, Storm will push down bolts with none groupings to
>> execute in the same thread as the bolt or spout they subscribe from (when
>> possible).
>>
>> We are still trying to understand what is wrong with shuffleGrouping in
>> our system,
>>
>> Any ideas?
>>
>> Thanks!
>>
>> From: Aaron Niskodé-Dossett 
>> Reply-To: "user@storm.apache.org" 
>> Date: Friday, 18 November 2016 at 17:04
>> To: "user@storm.apache.org" 
>> Subject: Re: Testing serializers with multiple workers
>>
>> Hit send too soon... that really is the option :-)
>>
>> On Fri, Nov 18, 2016 at 9:03 AM Aaron Niskodé-Dossett 
>> wrote:
>>
>>> topology.testing.always.try.serialize = true
>>>
>>> On Fri, Nov 18, 2016 at 8:57 AM Kristopher Kane 
>>> wrote:
>>>
>>> Does anyone have any techniques for testing serializers for issues that
>>> would only surface when the serializer is used in a multi-worker topology?
>>>
>>> Kris
>>>
>>>
>


Is LoadAwareShuffleGrouping The Default?

2016-11-22 Thread Kevin Peek
Hello. I have a question about the load aware shuffle grouping.

If I create a grouping like below, will I end up with a
LoadAwareShuffleGrouping by default?

topologyBuilder.setBolt("boltId", new
SomeBolt()).shuffleGrouping("otherBoltId");

I would expect this to result in a traditional ShuffleGrouping, but I think
it does not based on the following:

(1) the config option TOPOLOGY-DISABLE-LOADAWARE-MESSAGING defaults to
false.
(2) This block of code seems to create a LoadAwareShuffleGrouping
automatically if load messaging is enabled (the default).
https://github.com/revans2/incubator-storm/blob/024ff263800e4032d7b2395fbbf072e0183ebb5b/storm-core/src/clj/backtype/storm/daemon/executor.clj#L66


Is this correct? Is load aware the default behavior in Storm 1.0.x?


Re: Is LoadAwareShuffleGrouping The Default?

2016-11-22 Thread Kevin Peek
My apologies, the code I referenced above is old. I believe the correct
references are these:

https://github.com/apache/storm/blob/cd5c9e8f904205a6ca6eee9222ca954ca8b37ec3/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java#L63

https://github.com/apache/storm/blob/master/conf/defaults.yaml#L261

Based on these, I believe the LoadAwareShuffleGrouping is the default. Can
anyone more familiar with the code confirm this?
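
If it is the default and you want the traditional behaviour back, it looks like
the key in the defaults.yaml link above can be overridden per topology. A sketch,
assuming that key name (and the matching Config constant, if it exists in your
Storm version):

Config conf = new Config();
// Force plain ShuffleGrouping behaviour by disabling load-aware messaging
// for this topology; the key name is taken from the defaults.yaml link above.
conf.put("topology.disable.loadaware.messaging", true);
// If your Storm version exposes a constant for this key, this should be equivalent:
// conf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);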

On Tue, Nov 22, 2016 at 11:43 AM, Kevin Peek  wrote:

> Hello. I have a question about the load aware shuffle grouping.
>
> If I create a grouping like below, will I end up with a
> LoadAwareShuffleGrouping by default?
>
> topologyBuilder.setBolt("boltId", new SomeBolt()).shuffleGrouping("
> otherBoltId");
>
> I would expect this to result in a traditional ShuffleGrouping, but I
> think it does not based on the following:
>
> (1) the config option TOPOLOGY-DISABLE-LOADAWARE-MESSAGING defaults to
> false.
> (2) This block of code seems to create a LoadAwareShuffleGrouping
> automatically if load messaging is enabled (the default).
> https://github.com/revans2/incubator-storm/blob/
> 024ff263800e4032d7b2395fbbf072e0183ebb5b/storm-core/src/clj/
> backtype/storm/daemon/executor.clj#L66
>
>
> Is this correct? Is load aware the default behavior in Storm 1.0.x?
>


Re: Spout failures too high

2016-12-12 Thread Kevin Peek
The most common reason for spout failures but no bolt failures is tuple
timeouts. Try increasing MESSAGE_TIMEOUT_SECONDS. If you are seeing very
low execute latencies for your bolts but very high complete latency, you
can also try decreasing SPOUT_MAX_PENDING.

Hopefully this helps.
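
As a concrete starting point, something like the following (the values here are
illustrative only, not tuned for your workload):

Config config = new Config();
// Give in-flight tuples more time before the spout fails them as timeouts.
config.setMessageTimeoutSecs(480);   // up from MESSAGE_TIMEOUT_SECONDS=240
// Keep fewer tuples in flight per spout task so each one spends less time queued.
config.setMaxSpoutPending(1000);     // down from SPOUT_MAX_PENDING=5000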

On Fri, Dec 9, 2016 at 10:26 PM, pradeep s 
wrote:

> Hi,
> We are running a 5-node cluster (2x large EC2) with the below config. The
> topology is consuming messages from SQS and writing to RDS and S3. Even though
> there are no bolt failures, we are seeing many spout failures.
> Can you please help check the config? Also, I am setting the task count to
> parallelism count * 2. Is this fine?
>
> TOPOLOGY_NAME=MDP_STORM_PRD
> MARIA_BOLT_PARALLELISM=50
> S3_BOLT_PARALLELISM=50
> SQS_DELETE_BOLT_PARALLELISM=100
> SPOUT_PARALLELISM=50
> NUMBER_OF_WORKERS=5
> NUMBER_OF_ACKERS=5
> SPOUT_MAX_PENDING=5000
> MESSAGE_TIMEOUT_SECONDS=240
>
>
> Topology Code
> =
>
> Config config = new Config();
> config.setNumWorkers(numWorkers);
> config.setDebug(false);
> config.setNumAckers(numAckers);
> config.setMaxSpoutPending(maxSpoutPending);
> config.setMessageTimeoutSecs(messageTimeoutSecs);
>
>
> topologyBuilder.setSpout(spoutId,
>         new SQSMessageReaderSpout(sqsUtils.getSQSUrl(dataQueue), properties),
>         spoutParallelism).setNumTasks(spoutParallelism * TWO);
>
> topologyBuilder.setBolt(mariaBoltId, new MariaDbBolt(properties), mariaBoltParallelism)
>         .setNumTasks(mariaBoltParallelism * TWO)
>         .fieldsGrouping(spoutId, new Fields(MESSAGE_ID));
>
> topologyBuilder.setBolt(s3BoltId, new S3WriteBolt(properties, s3Properties), s3BoltParallelism)
>         .setNumTasks(s3BoltParallelism * TWO)
>         .shuffleGrouping(mariaBoltId);
>
> topologyBuilder.setBolt(sqsDeleteBoltId,
>         new SQSMessageDeleteBolt(sqsUtils.getSQSUrl(dataQueue)), sqsBoltParallelism)
>         .setNumTasks(sqsBoltParallelism * TWO)
>         .shuffleGrouping(s3BoltId);
>
> StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
>
>
> Regards
>
> Pradeep S
>


Re: Message distribution among workers

2016-12-13 Thread Kevin Peek
So, how messages are distributed depends on the type of stream grouping
you select. Probably the most common choice is ShuffleGrouping, which sends
an equal number of tuples to each bolt instance.

The various types of stream groupings are described here (scroll down to
the Stream Groupings heading):
http://storm.apache.org/releases/current/Concepts.html

Storm 1.x added the LoadAwareShuffleGrouping, which distributes tuples
randomly, but gives instances under light load a higher probability of
being chosen. It is missing from the document above, but perhaps it would
be helpful to you if you are worried about how many tuples each Worker
gets.

LoadAwareShuffleGrouping:
https://github.com/apache/storm/blob/cd5c9e8f904205a6ca6eee9222ca954ca8b37ec3/storm-core/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
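
To give a feel for the idea, here is a small sketch of weighted-random selection
by load. It is only an illustration of the concept, not Storm's actual
implementation, and all names are made up:

import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Pick a downstream task at random, but weight the choice by how lightly
// loaded each task currently is.
public class LoadWeightedChooser {
    private final Random random = new Random();

    /** load[i] is the current load of task i, in the range [0, 1]. */
    public int choose(List<Integer> tasks, double[] load) {
        double[] weight = new double[tasks.size()];
        double total = 0;
        for (int i = 0; i < weight.length; i++) {
            weight[i] = 1.0 - load[i];   // lighter load => larger weight
            total += weight[i];
        }
        double r = random.nextDouble() * total;
        for (int i = 0; i < weight.length; i++) {
            r -= weight[i];
            if (r <= 0) {
                return tasks.get(i);
            }
        }
        return tasks.get(tasks.size() - 1); // numeric edge case
    }

    public static void main(String[] args) {
        LoadWeightedChooser chooser = new LoadWeightedChooser();
        List<Integer> tasks = Arrays.asList(1, 2, 3);
        double[] load = {0.9, 0.1, 0.5};   // task 2 is mostly idle
        // Task 2 should be chosen far more often than task 1.
        System.out.println(chooser.choose(tasks, load));
    }
}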

On Tue, Dec 13, 2016 at 12:20 AM, anshu shukla 
wrote:

> Hello ,
>
>
> I am unable to find in the documentation how messages are distributed to
> the workers of the same bolt. Are they distributed equally among all the
> workers?
>
>
> This will impact performance if the threads of the same bolt are not
> uniformly distributed among multiple workers.
>
> --
> Thanks & Regards,
> Anshu Shukla
>