[jira] [Commented] (STORM-677) Maximum retries strategy may cause data loss
[ https://issues.apache.org/jira/browse/STORM-677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14327642#comment-14327642 ]

Nathan Marz commented on STORM-677:
-----------------------------------

Option 2 doesn't have to be long term, as it should be easy to implement. I do not view the options as very similar; I think Option 2 will be significantly more robust, because getting out of a weird state as fast as possible is really important.

{quote}
If that itself can cause other workers to give up on a connection, it could result in the topology never reaching a stable state.
{quote}

This is exactly why the amount of time spent attempting to make a connection must be related to the start timeout for a worker.

Maximum retries strategy may cause data loss
--------------------------------------------

                 Key: STORM-677
                 URL: https://issues.apache.org/jira/browse/STORM-677
             Project: Apache Storm
          Issue Type: Bug
    Affects Versions: 0.9.3, 0.10.0
            Reporter: Michael Noll
            Priority: Minor
              Labels: Netty

h3. Background

Storm currently supports the configuration setting storm.messaging.netty.max_retries. This setting is supposed to limit the number of reconnection attempts a Netty client will perform in case of a connection loss.

Unfortunately users have run into situations where this behavior will result in data loss:

{quote}
https://github.com/apache/storm/pull/429/files#r24681006

This could be a separate JIRA, but we ran into a situation where we hit the maximum number of reconnection attempts, and the exception was eaten because it was thrown from a background thread and it just killed the background thread. This code appears to do the same thing.
{quote}

The problem can be summarized by the following example: Once a Netty client hits the maximum number of connection retries, it will stop trying to reconnect (as intended) but will also continue to run forever without being able to send any messages to its designated remote targets.
At this point data will be lost, because any messages that the Netty client is supposed to send will be dropped (by design). And since the Netty client is still alive and thus considered functional, Storm is not able to do anything about this data loss situation.

For a more detailed description please take a look at the discussion in https://github.com/apache/storm/pull/429/files#r24742354.

h3. Possible solutions

(Most of this section is copy-pasted from an [earlier discussion on this problem|https://github.com/apache/storm/pull/429/files#r24742354].)

There are at least three approaches we may consider:

# Let the Netty client die if max retries is reached, so that the Storm task has the chance to re-create a client and thus break out of the client's discard-messages-forever state.
# Let the parent Storm task die if (one of its possibly many) Netty clients dies, so that by restarting the task we'll also get a new Netty client.
# Remove the max retries semantics as well as the corresponding setting from Storm's configuration. Here, a Netty client will continue to reconnect to a remote destination forever. The possible negative impact of these reconnects (e.g. the number of TCP connection attempts in a cluster) is kept in check by our exponential backoff policy for such connection retries.

My personal opinion on these three approaches:

- I do not like (1) because I feel it introduces potentially confusing semantics: we keep having a max retries setting, but it is not really a hard limit anymore. It rather becomes "max retries until we recreate a Netty client", and would also reset any exponential backoff strategy of the previous Netty client instance (cf. StormBoundedExponentialBackoffRetry). If we do want such resets (but I don't think we do at this point), then a cleaner approach would be to implement the resetting inside the retry policy (again, cf. StormBoundedExponentialBackoffRetry).
- I do not like (2) because a single bad Netty client would be able to take down a Storm task, which among other things would also impact any other, working Netty clients of that Storm task.
- Option (3) seems a reasonable approach, although it breaks backwards compatibility with regard to Storm's configuration (because we'd now ignore storm.messaging.netty.max_retries).

Here's initial feedback from other developers:

{quote}
https://github.com/apache/storm/pull/429/files#r24824540

revans2: I personally prefer option 3, no maximum number of reconnection attempts. Having the client decide that it is done before nimbus does feels like it is asking for trouble.
{quote}

{quote}
https://github.com/ptgoetz

ptgoetz: I'm in favor of option 3 as well. I'm not that concerned about storm.messaging.netty.max_retries being ignored. We could probably just log a warning that that
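For readers unfamiliar with the backoff policy referenced above: under option (3), reconnecting forever stays cheap only because each retry waits longer than the last, up to a fixed ceiling. A minimal sketch of such a bounded exponential backoff policy in Python, inspired by (but not copied from) StormBoundedExponentialBackoffRetry; the parameter names are illustrative, not Storm's actual configuration keys:

```python
class BoundedExponentialBackoff:
    """Sketch of a bounded exponential backoff retry policy.

    The sleep time doubles with each retry attempt until it hits a
    fixed ceiling.  With no cap on the number of retries (option 3),
    the client reconnects forever, but the reconnect *rate* stays
    bounded by max_sleep_ms.
    """

    def __init__(self, base_sleep_ms, max_sleep_ms):
        self.base_sleep_ms = base_sleep_ms
        self.max_sleep_ms = max_sleep_ms

    def sleep_time_ms(self, retry_count):
        # Grow as base * 2^retry_count, clamped to the ceiling.
        backoff = self.base_sleep_ms * (2 ** retry_count)
        return min(backoff, self.max_sleep_ms)
```

For example, with base_sleep_ms=100 and max_sleep_ms=1000, the first retries sleep 100, 200, 400, 800 ms, and every retry after that sleeps the full 1000 ms.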
[jira] [Commented] (STORM-561) Add ability to create topologies dynamically
[ https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947 ]

Nathan Marz commented on STORM-561:
-----------------------------------

[~amontalenti] Why are you using the Clojure DSL for streamparse? I think you'll find it a lot easier to just create the Thrift objects directly in Python code. Storm is designed so that you don't even have to touch Java or any JVM language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, where we define "dynamically" as not needing any recompilation. Storm's Thrift interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other language. The ComponentObject struct can be specified either as a serialized Java object, a Java class name + arguments, or as a ShellComponent (for running spouts/bolts via subprocesses to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is redundant with what's already in Storm. The exception is Trident, but that should have its own issue opened for this feature.

The direction I'd like to see a patch for this issue go is making a nice *library* in an interpreted language (like Python) that makes a pretty wrapper interface around the Thrift stuff, since manipulating Thrift objects directly is a little verbose. In addition, it can handle packaging of any artifacts the topology will need (like spout and bolt implementations) into a .jar file. The generated Python code for manipulating the Thrift structures is packaged with Storm at storm-core/src/py/.
Add ability to create topologies dynamically
--------------------------------------------

                 Key: STORM-561
                 URL: https://issues.apache.org/jira/browse/STORM-561
             Project: Apache Storm
          Issue Type: Improvement
            Reporter: Nathan Leung
            Assignee: Nathan Leung
   Original Estimate: 336h
  Remaining Estimate: 336h

It would be nice if a storm topology could be built dynamically, instead of requiring a recompile to change parameters (e.g. number of workers, number of tasks, layout, etc). I would propose the following data structures for building core storm topologies. I haven't done a design for trident yet, but the intention would be to add trident support when core storm support is complete (or in parallel if there are other people working on it):

{code}
// fields value and arguments are mutually exclusive
class Argument {
    String argumentType;         // Class used to look up arguments in method/constructor
    String implementationType;   // Class used to create this argument
    String value;                // String used to construct this argument
    List<Argument> arguments;    // arguments used to build this argument
}

class Dependency {
    String upstreamComponent;    // name of upstream component
    String grouping;
    List<Argument> arguments;    // arguments for the grouping
}

class StormSpout {
    String name;
    String klazz;                // Class of this spout
    List<Argument> arguments;
    int numTasks;
    int numExecutors;
}

class StormBolt {
    String name;
    String klazz;                // Class of this bolt
    List<Argument> arguments;
    int numTasks;
    int numExecutors;
    List<Dependency> dependencies;
}

class StormTopologyRepresentation {
    String name;
    List<StormSpout> spouts;
    List<StormBolt> bolts;
    Map config;
    int numWorkers;
}
{code}

Topology creation will be built on top of the data structures above. The benefits:

* Dependency free. Code to unmarshal from json, xml, etc, can be kept in extensions, or as examples, and users can write a different unmarshaller if they want to use a different text representation.
* Support of arbitrary spout and bolt types.
* Support of all groupings and streams, via reflection.
* Ability to specify the configuration map via a config file.
* Reification of spout / bolt / dependency arguments.
** Recursive argument reification for complex objects.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
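To illustrate the "dependency free" point above: the proposed representation could be unmarshaled from plain JSON with nothing but the standard library. A rough sketch, assuming the field names from the data structures above (name, klazz, numTasks, dependencies, upstreamComponent); the JSON layout and the validation performed are illustrative assumptions, not a specified format:

```python
import json

def parse_topology(text):
    """Unmarshal a StormTopologyRepresentation-like dict from JSON.

    Mirrors the data structures proposed above and performs only the
    structural check needed to catch an obviously malformed spec:
    every bolt dependency must name a declared upstream component.
    """
    topo = json.loads(text)
    declared = {c["name"] for c in topo["spouts"] + topo["bolts"]}
    for bolt in topo["bolts"]:
        for dep in bolt.get("dependencies", []):
            if dep["upstreamComponent"] not in declared:
                raise ValueError("unknown upstream: " + dep["upstreamComponent"])
    return topo

# Hypothetical word-count spec using the proposed field names.
spec = """{
  "name": "word-count",
  "numWorkers": 2,
  "config": {},
  "spouts": [{"name": "sentences", "klazz": "SentenceSpout",
              "arguments": [], "numTasks": 1, "numExecutors": 1}],
  "bolts": [{"name": "split", "klazz": "SplitBolt",
             "arguments": [], "numTasks": 2, "numExecutors": 2,
             "dependencies": [{"upstreamComponent": "sentences",
                               "grouping": "shuffle", "arguments": []}]}]
}"""
```

A different text representation (XML, YAML) only needs a different unmarshaller producing the same dict shape, which is the extensibility benefit claimed above.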
[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically
[ https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947 ]

Nathan Marz edited comment on STORM-561 at 12/4/14 6:36 AM. The comment text is unchanged from the comment above, with one reference added:

For reference, here's the storm.thrift file: https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift
[jira] [Resolved] (STORM-523) Project.clj missing
[ https://issues.apache.org/jira/browse/STORM-523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nathan Marz resolved STORM-523.
-------------------------------
    Resolution: Not a Problem

-1, Storm's build is Maven now. Maintaining two separate builds would be a nightmare.

Project.clj missing
-------------------

                 Key: STORM-523
                 URL: https://issues.apache.org/jira/browse/STORM-523
             Project: Apache Storm
          Issue Type: Improvement
            Reporter: nicolas ginder
            Priority: Minor

project.clj files are missing. There are only pom.xml files; it would be good to generate project.clj from pom.xml.
[jira] [Commented] (STORM-404) Worker on one machine crashes due to a failure of another worker on another machine
[ https://issues.apache.org/jira/browse/STORM-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14159592#comment-14159592 ]

Nathan Marz commented on STORM-404:
-----------------------------------

Do you see this problem with the ZeroMQ transport or just with Netty? The big change from 0.8.* to 0.9.* was making Netty the default. The behavior of the transport is supposed to be to just drop messages if it can't connect to the other worker (or buffer and drop once the reassignment is received). It sounds like the Netty transport might not be doing this.

Worker on one machine crashes due to a failure of another worker on another machine
-----------------------------------------------------------------------------------

                 Key: STORM-404
                 URL: https://issues.apache.org/jira/browse/STORM-404
             Project: Apache Storm
          Issue Type: Bug
    Affects Versions: 0.9.2-incubating
            Reporter: Itai Frenkel

I have two workers (one on each machine). The first worker (10.30.206.125) had a problem starting (it could not find the Nimbus host); however, the second worker crashed too, since it could not connect to the first worker. This looks like a cascading failure, which seems like a bug.

{code}
2014-07-15 17:43:32 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [17]
2014-07-15 17:43:33 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [18]
2014-07-15 17:43:34 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [19]
2014-07-15 17:43:35 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [20]
2014-07-15 17:43:36 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [21]
2014-07-15 17:43:37 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [22]
2014-07-15 17:43:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [23]
2014-07-15 17:43:39 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [24]
2014-07-15 17:43:40 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [25]
2014-07-15 17:43:41 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [26]
2014-07-15 17:43:42 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [27]
2014-07-15 17:43:43 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [28]
2014-07-15 17:43:44 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [29]
2014-07-15 17:43:45 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700... [30]
2014-07-15 17:43:46 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700
2014-07-15 17:43:46 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-ip-10-30-206-125.ec2.internal/10.30.206.125:6700..., timeout: 60ms, pendings: 0
2014-07-15 17:43:46 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
	at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322)
{code}
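The intended transport behavior described in the STORM-404 comment above, buffer while a reconnect is pending and drop once a reassignment says the target moved, can be sketched without any Storm or Netty dependency. The names here (PendingBuffer, on_reassignment) are hypothetical, chosen only to illustrate why this policy avoids the cascading crash in the stack trace:

```python
from collections import deque

class PendingBuffer:
    """Sketch of buffer-then-drop handling for an unreachable worker.

    While a reconnect is in progress, outgoing messages are buffered up
    to a cap (oldest dropped first); once a reassignment reports that
    the target worker moved, the whole buffer is discarded instead of
    raising an exception that would kill the sending worker's loop.
    """

    def __init__(self, max_pending):
        self.pending = deque()
        self.max_pending = max_pending
        self.dropped = 0

    def send(self, msg):
        # Buffer while disconnected; evict the oldest beyond the cap.
        if len(self.pending) >= self.max_pending:
            self.pending.popleft()
            self.dropped += 1
        self.pending.append(msg)

    def on_reassignment(self):
        # Target worker was reassigned: discard everything queued for it.
        self.dropped += len(self.pending)
        self.pending.clear()
```

The key contrast with the stack trace above is that dropping is a counted, non-fatal event here, whereas the 0.9.2 Netty client threw "Client is being closed" into the transfer loop and took the whole worker down.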