[jira] [Commented] (STORM-677) Maximum retries strategy may cause data loss

2015-02-19 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14327642#comment-14327642
 ] 

Nathan Marz commented on STORM-677:
---

Option 2 doesn't have to be long-term, as it should be easy to implement. I do 
not view the options as very similar: I think Option 2 will be significantly 
more robust, since getting out of a weird state as fast as possible is really 
important.

{quote}
If that itself can cause other workers to give up on a connection it could 
result in the topology never reaching a stable state.
{quote}
This is exactly why the amount of time spent attempting to make a connection 
must be related to the start timeout for a worker. 

 Maximum retries strategy may cause data loss
 

 Key: STORM-677
 URL: https://issues.apache.org/jira/browse/STORM-677
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.3, 0.10.0
Reporter: Michael Noll
Priority: Minor
  Labels: Netty

 h3. Background
 Storm currently supports the configuration setting 
 storm.messaging.netty.max_retries.  This setting is supposed to limit the 
 number of reconnection attempts a Netty client will perform in case of a 
 connection loss.
 Unfortunately users have run into situations where this behavior will result 
 in data loss:
 {quote}
 https://github.com/apache/storm/pull/429/files#r24681006
 This could be a separate JIRA, but we ran into a situation where we hit the 
 maximum number of reconnection attempts, and the exception was eaten because 
 it was thrown from a background thread and it just killed the background 
 thread. This code appears to do the same thing.
 {quote}
 The problem can be summarized by the following example:  Once a Netty client 
 hits the maximum number of connection retries, it will stop trying to 
 reconnect (as intended) but will also continue to run forever without being 
 able to send any messages to its designated remote targets.  At this point 
 data will be lost because any messages that the Netty client is supposed to 
 send will be dropped (by design).  And since the Netty client is still alive 
 and thus considered functional, Storm is not able to do anything about 
 this data loss situation.
 For a more detailed description please take a look at the discussion in 
 https://github.com/apache/storm/pull/429/files#r24742354.
 h3. Possible solutions
 (Most of this section is copy-pasted from an [earlier discussion on this 
 problem|https://github.com/apache/storm/pull/429/files#r24742354].)
 There are at least three approaches we may consider:
 # Let the Netty client die if max retries is reached, so that the Storm task 
 has the chance to re-create a client and thus break out of the client's 
 discard-messages-forever state.
 # Let the parent Storm task die if (one of its possibly many) Netty clients 
 dies, so that by restarting the task we'll also get a new Netty client.
 # Remove the max retries semantics as well as the corresponding setting from 
 Storm's configuration. Here, a Netty client will continue to reconnect to a 
 remote destination forever. The possible negative impact of these reconnects 
 (e.g. number of TCP connection attempts in a cluster) are kept in check by 
 our exponential backoff policy for such connection retries.
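 For context, the bounded exponential backoff that option 3 relies on can be 
 sketched in miniature. This is an illustrative model of what a policy like 
 StormBoundedExponentialBackoffRetry provides, not the actual class; the names 
 and parameters here are hypothetical:

```java
// Sketch of a bounded exponential backoff policy, loosely modeled on what
// StormBoundedExponentialBackoffRetry provides. Names are illustrative,
// not the actual Storm class or its fields.
public class BoundedBackoff {
    private final long baseSleepMs;
    private final long maxSleepMs;

    public BoundedBackoff(long baseSleepMs, long maxSleepMs) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
    }

    // Sleep time doubles with each retry but never exceeds the ceiling, so
    // "reconnect forever" stays cheap: attempts become rare, not unbounded.
    public long sleepMillis(int retryCount) {
        long sleep = baseSleepMs * (1L << Math.min(retryCount, 30)); // cap shift to avoid overflow
        return Math.min(sleep, maxSleepMs);
    }
}
```

 The point is that the ceiling, not a retry count, is what keeps the cluster-wide 
 cost of reconnection attempts in check.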
 My personal opinion on these three approaches:
 - I do not like (1) because I feel it introduces potentially confusing 
 semantics: we would keep having a max retries setting, but it would no longer 
 be a hard limit. It would rather become a "max retries until we recreate a 
 Netty client", and would also reset any exponential backoff strategy of the 
 previous Netty client instance (cf. StormBoundedExponentialBackoffRetry). 
 If we do want such resets (but I don't think we do at this point), then a 
 cleaner approach would be to implement the resetting inside the retry policy 
 (again, cf. StormBoundedExponentialBackoffRetry).
 - I do not like (2) because a single bad Netty client would be able to take 
 down a Storm task, which among other things would also impact any other, 
 working Netty clients of the Storm task.
 - Option (3) seems a reasonable approach, although it breaks backwards 
 compatibility with regard to Storm's configuration (because we'd now ignore 
 storm.messaging.netty.max_retries).
 Here's initial feedback from other developers:
 {quote}
 https://github.com/apache/storm/pull/429/files#r24824540
 revans2: I personally prefer option 3, no maximum number of reconnection 
 attempts. Having the client decide that it is done, before nimbus does feels 
 like it is asking for trouble.
 {quote}
 {quote}
 https://github.com/ptgoetz
 ptgoetz: I'm in favor of option 3 as well. I'm not that concerned about 
 storm.messaging.netty.max_retries being ignored. We could probably just log a 
 warning that that 

[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947
 ] 

Nathan Marz commented on STORM-561:
---

[~amontalenti] Why are you using the Clojure DSL for streamparse? I think 
you'll find things a lot easier to just create the Thrift objects directly in 
Python code. Storm is designed so that you don't even have to touch Java or any 
JVM language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, 
where we define dynamically as not needing any recompilation. Storm's Thrift 
interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other 
language. The ComponentObject struct can be specified either as a serialized 
java object, a java class name + arguments, or as a ShellComponent (for running 
spouts/bolts via subprocesses to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature 
of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the 
nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is redundant 
with what's already in Storm. The exception is Trident, but that should have 
its own issue opened for this feature.

The direction I'd like to see a patch for this issue go is making a nice 
*library* in an interpreted language (like Python) that makes a pretty wrapper 
interface around the Thrift stuff, since manipulating Thrift objects directly 
is a little verbose. In addition, it can handle packaging of any artifacts the 
topology will need (like spout and bolt implementations) into a .jar file. The 
generated Python code for manipulating the Thrift structures is packaged with 
Storm at storm-core/src/py/. 


 Add ability to create topologies dynamically
 

 Key: STORM-561
 URL: https://issues.apache.org/jira/browse/STORM-561
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Nathan Leung
Assignee: Nathan Leung
   Original Estimate: 336h
  Remaining Estimate: 336h

 It would be nice if a storm topology could be built dynamically, instead of 
 requiring a recompile to change parameters (e.g. number of workers, number of 
 tasks, layout, etc).
 I would propose the following data structures for building core storm 
 topologies.  I haven't done a design for trident yet but the intention would 
 be to add trident support when core storm support is complete (or in parallel 
 if there are other people working on it):
 {code}
 // fields value and arguments are mutually exclusive
 class Argument {
     String argumentType;       // Class used to look up arguments in method/constructor
     String implementationType; // Class used to create this argument
     String value;              // String used to construct this argument
     List<Argument> arguments;  // arguments used to build this argument
 }
 class Dependency {
     String upstreamComponent;  // name of upstream component
     String grouping;
     List<Argument> arguments;  // arguments for the grouping
 }
 class StormSpout {
     String name;
     String klazz;              // Class of this spout
     List<Argument> arguments;
     int numTasks;
     int numExecutors;
 }
 class StormBolt {
     String name;
     String klazz;              // Class of this bolt
     List<Argument> arguments;
     int numTasks;
     int numExecutors;
     List<Dependency> dependencies;
 }
 class StormTopologyRepresentation {
     String name;
     List<StormSpout> spouts;
     List<StormBolt> bolts;
     Map config;
     int numWorkers;
 }
 {code}
 Topology creation will be built on top of the data structures above.  The 
 benefits:
 * Dependency free.  Code to unmarshal from json, xml, etc, can be kept in 
 extensions, or as examples, and users can write a different unmarshaller if 
 they want to use a different text representation.
 * support arbitrary spout and bolt types
 * support for all groupings and streams, via reflection
 * ability to specify configuration map via config file
 * reification of spout / bolt / dependency arguments
 ** recursive argument reification for complex objects
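 The recursive argument reification in the last bullet could be sketched 
 roughly as follows. This is a hypothetical reduction of the proposed Argument 
 structure, not an actual Storm API; for brevity it matches constructors by the 
 exact runtime class of each child, whereas the proposal's argumentType field 
 would allow matching against interfaces and supertypes:

```java
import java.util.List;

// Illustrative sketch of recursive argument reification via reflection,
// following the Argument structure proposed above (names are hypothetical).
public class Reifier {

    public static class Argument {
        public String implementationType; // class to instantiate
        public String value;              // String ctor arg (exclusive with arguments)
        public List<Argument> arguments;  // nested args (exclusive with value)
    }

    // Recursively build an object: a leaf argument is constructed from its
    // String value; a composite argument first reifies its children, then
    // passes them to a constructor whose parameter types match.
    public static Object reify(Argument arg) throws Exception {
        Class<?> klazz = Class.forName(arg.implementationType);
        if (arg.value != null) {
            return klazz.getConstructor(String.class).newInstance(arg.value);
        }
        Object[] children = new Object[arg.arguments.size()];
        Class<?>[] types = new Class<?>[children.length];
        for (int i = 0; i < children.length; i++) {
            children[i] = reify(arg.arguments.get(i));
            // Simplification: exact-class matching; the real proposal's
            // argumentType would drive the constructor lookup instead.
            types[i] = children[i].getClass();
        }
        return klazz.getConstructor(types).newInstance(children);
    }
}
```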



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947
 ] 

Nathan Marz edited comment on STORM-561 at 12/4/14 6:36 AM:


[~amontalenti] Why are you using the Clojure DSL for streamparse? I think 
you'll find things a lot easier to just create the Thrift objects directly in 
Python code. Storm is designed so that you don't even have to touch Java or any 
JVM language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, 
where we define dynamically as not needing any recompilation. Storm's Thrift 
interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other 
language. The ComponentObject struct can be specified either as a serialized 
java object, a java class name + arguments, or as a ShellComponent (for running 
spouts/bolts via subprocesses to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature 
of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the 
nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is redundant 
with what's already in Storm. The exception is Trident, but that should have 
its own issue opened for this feature.

The direction I'd like to see a patch for this issue go is making a nice 
*library* in an interpreted language (like Python) that makes a pretty wrapper 
interface around the Thrift stuff, since manipulating Thrift objects directly 
is a little verbose. In addition, it can handle packaging of any artifacts the 
topology will need (like spout and bolt implementations) into a .jar file. The 
generated Python code for manipulating the Thrift structures is packaged with 
Storm at storm-core/src/py/. 

For reference, here's the storm.thrift file: 
https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift






[jira] [Resolved] (STORM-523) Project.clj missing

2014-10-09 Thread Nathan Marz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nathan Marz resolved STORM-523.
---
Resolution: Not a Problem

-1, Storm's build is Maven now. Maintaining two separate builds would be a 
nightmare.

 Project.clj missing
 ---

 Key: STORM-523
 URL: https://issues.apache.org/jira/browse/STORM-523
 Project: Apache Storm
  Issue Type: Improvement
Reporter: nicolas ginder
Priority: Minor

 project.clj files are missing. There are only pom.xml files; it would be good 
 to generate project.clj from pom.xml. 





[jira] [Commented] (STORM-404) Worker on one machine crashes due to a failure of another worker on another machine

2014-10-05 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14159592#comment-14159592
 ] 

Nathan Marz commented on STORM-404:
---

Do you see this problem with the ZeroMQ transport or just with Netty? The big 
change from 0.8.* to 0.9.* was making Netty the default. The behavior of the 
transport is supposed to be to just drop messages if it can't connect to the 
other worker (or buffer and drop once the reassignment is received). It sounds 
like the Netty transport might not be doing this.
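The intended buffer-or-drop behavior described above can be modeled in 
miniature. This is an illustrative sketch of the transport contract, not the 
actual backtype.storm.messaging.netty.Client; the class and limits here are 
hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Miniature model of the intended transport contract: while disconnected,
// buffer up to a limit, drop beyond it, and never throw to the caller.
// (Illustrative only, not Storm's actual Netty client.)
public class DroppingClient {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxPending;
    private boolean connected = false;
    private int dropped = 0;

    public DroppingClient(int maxPending) { this.maxPending = maxPending; }

    // Sending while disconnected buffers or drops, but never fails the
    // calling worker; throwing here is exactly the cascading failure
    // reported in this issue.
    public void send(String msg) {
        if (connected) {
            return; // would write to the channel here
        }
        if (pending.size() < maxPending) {
            pending.add(msg);
        } else {
            dropped++; // drop by design rather than throw
        }
    }

    public int pendingCount() { return pending.size(); }
    public int droppedCount() { return dropped; }
}
```

Under this contract the second worker would keep running and shedding load 
until the reassignment arrives, instead of dying with the RuntimeException in 
the stack trace below.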

 Worker on one machine crashes due to a failure of another worker on another 
 machine
 ---

 Key: STORM-404
 URL: https://issues.apache.org/jira/browse/STORM-404
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: Itai Frenkel

 I have two workers (one on each machine). The first worker (10.30.206.125) had 
 a problem starting (it could not find the Nimbus host), and the second worker 
 crashed too because it could not connect to the first worker.
 This looks like a cascading failure, which seems like a bug.
 2014-07-15 17:43:32 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [17]
 2014-07-15 17:43:33 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [18]
 2014-07-15 17:43:34 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [19]
 2014-07-15 17:43:35 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [20]
 2014-07-15 17:43:36 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [21]
 2014-07-15 17:43:37 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [22]
 2014-07-15 17:43:38 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [23]
 2014-07-15 17:43:39 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [24]
 2014-07-15 17:43:40 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [25]
 2014-07-15 17:43:41 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [26]
 2014-07-15 17:43:42 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [27]
 2014-07-15 17:43:43 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [28]
 2014-07-15 17:43:44 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [29]
 2014-07-15 17:43:45 b.s.m.n.Client [INFO] Reconnect started for 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [30]
 2014-07-15 17:43:46 b.s.m.n.Client [INFO] Closing Netty Client 
 Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700
 2014-07-15 17:43:46 b.s.m.n.Client [INFO] Waiting for pending batchs to be 
 sent with Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700..., 
 timeout: 60ms, pendings: 0
 2014-07-15 17:43:46 b.s.util [ERROR] Async loop died!
 java.lang.RuntimeException: java.lang.RuntimeException: Client is being 
 closed, and does not take requests any more
 at 
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at 
 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at 
 backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at 
 backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) 
 ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) 
 ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
 at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
 Caused by: java.lang.RuntimeException: Client is being closed, and does not 
 take requests any more
 at backtype.storm.messaging.netty.Client.send(Client.java:194) 
 ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) 
 ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
 at 
 backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322)