Apache Storm Graduation to a TLP
I’m pleased to announce that Apache Storm has graduated to a Top-Level Project (TLP), and I’d like to thank everyone in the Storm community for your contributions and help in achieving this important milestone. As part of the graduation process, a number of infrastructure changes have taken place:

New website URL: http://storm.apache.org

New git repo URLs: https://git-wip-us.apache.org/repos/asf/storm.git (for committer push) g...@github.com:apache/storm.git -or- https://github.com/apache/storm.git (for github pull requests)

Mailing lists: If you are already subscribed, your subscription has been migrated. New messages should be sent to the new address: [list]@storm.apache.org This includes any subscribe/unsubscribe requests. Note: The mail-archives.apache.org site will not reflect these changes until October 1.

Most of these changes have already occurred and are seamless. Please update your git remotes and address books accordingly. - Taylor
Re: Test timeout
We should change the fact that it is hard-coded. I'd rather see it look for a system property (with a default value). That way it can be overridden easily, for example in resource-constrained environments (e.g. Jenkins). Feel free to open a JIRA ticket for this. I'd be happy to do it as well. -Taylor On Sep 8, 2014, at 4:31 PM, Devika Nair s.devikan...@gmail.com wrote: I can see that the test timeout is set to 5000 ms in https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/testing.clj. Is there any way to override this value? On Mon, Sep 8, 2014 at 1:39 PM, Devika Nair s.devikan...@gmail.com wrote: Hi all, I am trying to follow the example at https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java and am writing tests for my topology. However, my tests (similar to testBasicTopology()) fail due to timeout when I add a bolt (the prepare for this bolt takes quite long). Is there any way to increase the timeout? I have been looking around, but I am unable to find any. If I send my spout data to TestWordCounter it works.
THIS WORKS:

public void run(ILocalCluster cluster) {
    // build the test topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(1, new MySpout(true), 1);
    builder.setBolt(2, new TestWordCounter(), 1).shuffleGrouping(1);

THIS DOES NOT WORK:

public void run(ILocalCluster cluster) {
    // build the test topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(1, new MySpout(true), 1);
    builder.setBolt(2, new MyBolt(), 1).shuffleGrouping(1);

Exception:

java.lang.AssertionError: Test timed out (5000ms)
    at backtype.storm.testing$complete_topology.doInvoke(testing.clj:454) ~[storm-core-0.9.0.1.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:826) ~[clojure-1.4.0.jar:na]
    at backtype.storm.testing4j$_completeTopology.invoke(testing4j.clj:45) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.Testing.completeTopology(Unknown Source) [storm-core-0.9.0.1.jar:na]
    at analytics.AAMTopologyTest$1.run(AAMTopologyTest.java:162) ~[test-classes/:na]
    at backtype.storm.testing4j$_withSimulatedTimeLocalCluster$fn__7112.invoke(testing4j.clj:75) [storm-core-0.9.0.1.jar:na]
    at backtype.storm.testing4j$_withSimulatedTimeLocalCluster.invoke(testing4j.clj:75) [storm-core-0.9.0.1.jar:na]
    at backtype.storm.Testing.withSimulatedTimeLocalCluster(Unknown Source) [storm-core-0.9.0.1.jar:na]
    at analytics.AAMTopologyTest.testBasicTopology(AAMTopologyTest.java:90) [test-classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_65]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_65]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_65]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_65]
    at junit.framework.TestCase.runTest(TestCase.java:168) [junit-4.8.2.jar:na]
    at junit.framework.TestCase.runBare(TestCase.java:134) [junit-4.8.2.jar:na]
    at junit.framework.TestResult$1.protect(TestResult.java:110) [junit-4.8.2.jar:na]

Thanks, Devika
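The change Taylor suggests, replacing the hard-coded 5000 ms with a system property that has a default value, could look something like the following sketch. The property name `storm.test.timeout.ms` is hypothetical, not an actual Storm setting:

```java
// Sketch of a configurable test timeout: read a system property and fall
// back to the current hard-coded default of 5000 ms. The property name
// "storm.test.timeout.ms" is a placeholder, not a real Storm config key.
public class TestTimeoutSketch {
    static final int DEFAULT_TIMEOUT_MS = 5000;

    static int testTimeoutMs() {
        // Integer.getInteger returns the default when the system property
        // is unset or is not a parsable integer.
        return Integer.getInteger("storm.test.timeout.ms", DEFAULT_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        System.out.println(testTimeoutMs());
    }
}
```

With such a property in place, a slow environment could run tests with something like `-Dstorm.test.timeout.ms=20000` on the JVM command line without any code change.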
Re: cannot run ready project
What IDE are you using? On Sep 3, 2014, at 5:26 PM, researcher cs prog.researc...@gmail.com wrote: any help .. ? On 9/2/14, researcher cs prog.researc...@gmail.com wrote: I imported a ready project and when I ran it I got this:

Resource Path Location Type The project was not built since its build path is incomplete. Cannot find the class file for storm.trident.state.State. Fix the build path then try building this project first-stories-twitter-master Unknown Java Problem The type storm.trident.state.State cannot be resolved. It is indirectly referenced from required .class files RecentTweetsDB.java /first-stories-twitter-master/src/main/java/trident/state

Can I find help on this?
Re: Please fix the code samples in the documentation
Hi Andras, There are efforts underway to update and improve the Storm documentation. Contributions are always welcome if you'd like to get involved. -Taylor On Sep 2, 2014, at 7:22 PM, Andras Hatvani andras.hatv...@andrashatvani.com wrote: The Clojure DSL samples are merely a subset, but relevant, of course. Andras On 03 Sep 2014, at 00:28, Derek Dagit der...@yahoo-inc.com wrote: I think this has been pointed out before. It is being tracked: https://issues.apache.org/jira/browse/STORM-385 -- Derek On 9/2/14, 15:34, Andras Hatvani wrote: Hi, To the Storm developers: Please fix the code samples in the documentation, because currently every single one is unformatted, without syntax highlighting, and collapsed onto one line. Thanks in advance, Andras
[DISCUSS] Apache Storm Release 0.9.3/0.10.0
I’d like to gather community feedback for the next two releases of Apache Storm.

0.9.3-incubating will be our next release. Please indicate (by JIRA ticket ID) which bug fixes and/or new features you would like to be considered for inclusion in the next release. If there is not an existing ticket for a particular issue or feature, please consider adding one.

For the next and subsequent releases, we will be using a slightly different approach than we did in the past. Instead of voting right away on a build, we will make one or more “unofficial” release candidate builds available prior to voting on an official release. This will give the Apache Storm community more time to discuss, evaluate, identify and fix potential issues before the official release, and should enable us to ensure the final release is as bug-free as possible.

Apache Storm 0.10.0 (STORM-216)

As some of you are aware, the engineering team at Yahoo! has done a lot of work to bring security and multi-tenancy to Storm, and has contributed that work back to the community. Over the past few months we have been in the process of enhancing and syncing that work with the master branch in a separate branch labeled “security.” That work is now nearing completion, and I would like us to consider merging it into master after the 0.9.3 release. Since the security work includes a large number of changes and enhancements, I propose we bump the version number to 0.10.0 for the first release to include those features. More information about the security branch can be found in this pull request [1], as well as the SECURITY.md file in the security branch [2]. I also discussed it in a blog post [3] on the Hortonworks website. Please feel free to direct any comments or questions about the security branch to the mailing list. Similar to the process we’ll follow for 0.9.3, we plan to make several unofficial “development” builds available for those who would like to help with testing the new security features.
-Taylor

[1] https://github.com/apache/incubator-storm/pull/121
[2] https://github.com/apache/incubator-storm/blob/security/SECURITY.md
[3] http://hortonworks.com/blog/the-future-of-apache-storm/
Re: supervisor not listening on port 6700?
This has been resolved in the master branch. -Taylor

On Aug 27, 2014, at 12:10 PM, Harsha st...@harsha.io wrote: Looks like a build/release issue with storm 0.9.2. We might need to update the package; there shouldn't be two versions of netty in the lib dir. Can you please file a JIRA for this? Thanks, Harsha On Wed, Aug 27, 2014, at 08:54 AM, Naga Vij wrote: Got it; thank you! BTW, I have worked around this issue thus: I noticed two netty jars in the lib dir - netty-3.2.2.Final.jar and netty-3.6.3.Final.jar - eliminated both of them, and placed netty-3.9.4.Final.jar there instead. The core worker processes are steady now. On Wed, Aug 27, 2014 at 8:08 AM, Harsha st...@harsha.io wrote: You need to do the following steps:

git clone https://github.com/apache/incubator-storm.git
git checkout v0.9.2-incubating -b 0.9.2-incubating

On Wed, Aug 27, 2014, at 08:03 AM, Naga Vij wrote: Is the Git URL right? I just tried and got:

git clone https://github.com/apache/incubator-storm/tree/v0.9.2-incubating
Cloning into 'v0.9.2-incubating'...
fatal: repository 'https://github.com/apache/incubator-storm/tree/v0.9.2-incubating/' not found

On Wed, Aug 27, 2014 at 7:41 AM, Harsha st...@harsha.io wrote: Storm 0.9.2 is tagged in the github repo: https://github.com/apache/incubator-storm/tree/v0.9.2-incubating. -Harsha On Tue, Aug 26, 2014, at 10:26 PM, Naga Vij wrote: Does anyone know what the git branch name is for 0.9.2? On Tue, Aug 26, 2014 at 10:24 PM, Naga Vij nvbuc...@gmail.com wrote: When it gets into the `still hasn't started` state, I have noticed this in the UI -

java.lang.RuntimeException: java.net.ConnectException: Connection refused
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.

and am wondering how to overcome this. On Tue, Aug 26, 2014 at 10:04 PM, Naga Vij nvbuc...@gmail.com wrote: I left the supervisor running with the `still hasn't started` state in one window, and tried starting the worker in another window.
That triggered an attempt to start another worker (with another distinct id) in the first window (the supervisor window), which in turn went into the `still hasn't started` state. On Tue, Aug 26, 2014 at 7:50 PM, Vikas Agarwal vi...@infoobjects.com wrote: I have almost the same versions of storm (0.9.1) and kafka, and my topologies were also facing the same issue. When I ran the worker command directly, I came to know that somehow the hostname was wrong in the configuration passed to the workers. So I fixed that in the storm config and my topology worked after that. However, now it is again stuck with the same `still hasn't started` error message, and in my case the error from running the worker command is `Address already in use` for the supervisor port. So, what is the error when you directly run the worker command? On Tue, Aug 26, 2014 at 9:39 PM, Naga Vij nvbuc...@gmail.com wrote: I fail to understand why that should happen, as testing with LocalCluster goes through fine. I did a clean fresh start to figure out what could be happening, and here are my observations:

- fresh clean start: cleanup in zk (rmr /storm), and /bin/rm -fr {storm's tmp dir}
- used local pseudo cluster on my mac
- nimbus process started fine
- supervisor process started fine
- ensured the topology works fine with (the embedded) LocalCluster
- topology was then submitted to the local pseudo cluster on my mac; that's when I see `still hasn't started` messages in the supervisor terminal window

When submitting the topology to the local pseudo cluster, I had to add jars to overcome these:

Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
Caused by: java.lang.ClassNotFoundException: scala.Product

The above were overcome by adding these to the lib dir:

storm-kafka-0.9.2-incubating.jar
kafka_2.10-0.8.1.1.jar
scala-library-2.10.1.jar

I have tried the command in the log as well; it hasn't helped. What am I missing?
On Mon, Aug 25, 2014 at 11:41 PM, Vikas Agarwal vi...@infoobjects.com wrote: dd7c588e-5fa0-4c4b-96ed-de0d420001e9 still hasn't started This is the clue. One of your topologies is failing to start. You must see the worker command before these logs in the same log file. Just try to run that directly in a console and it will show the exact error. On Tue, Aug 26, 2014 at 11:45 AM, Naga Vij nvbuc...@gmail.com wrote: Hello, I am trying out a Storm 0.9.2-incubating pseudo cluster (on just one box) on these two systems -

cat /etc/redhat-release
CentOS release 6.3 (Final)

and

sw_vers
ProductName: Mac OS X
ProductVersion: 10.9.2
BuildVersion: 13C64

After starting the supervisor, I notice it is not listening on the configured port (6700) - nc -zv
Re: How do I implement this topology in Storm?
The logic for fields grouping is stateless. It does a hash mod on the field values. Essentially: selectedTask = fields.hashCode() % numTasks - Taylor On Aug 8, 2014, at 1:39 PM, Jonathan Yom-Tov jon.yom...@gmail.com wrote: It's an option, but if I understand correctly the field grouping will make sure each word goes to the same bolt every time; I'm guessing this requires some sort of central authority to coordinate. How does that happen? On Fri, Aug 8, 2014 at 8:27 PM, Nathan Leung ncle...@gmail.com wrote: Why not use fields grouping like the example so that you don't have to do any coordination across bolts? On Aug 8, 2014 12:25 PM, Jonathan Yom-Tov jon.yom...@gmail.com wrote: I want to implement a topology that is similar to the RollingTopWords topology in the Storm examples. The idea is to count the frequency of words emitted. Basically, the spouts emit words at random, the first-level bolts count the frequency and pass them on. The twist is that I want the bolts to pass on the frequency of a word only if its frequency in one of the bolts exceeded a threshold. So, for example, if the word Nathan passed the threshold of 5 occurrences within a time window on one bolt then all bolts would start passing Nathan's frequency onwards. What I thought of doing is having another layer of bolts which would have the list of words which have passed a threshold. They would then receive the words and frequencies from the previous layer of bolts and pass them on only if they appear in the list. Obviously, this list would have to be synchronized across the whole layer of bolts. Is this a good idea? What would be the best way of implementing it? -- Got a cool idea for a web startup? How about I build it for you? Check out http://chapter64.com/
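Taylor's point, that fields grouping needs no central authority because the target task is a pure function of the field values, can be illustrated with a small sketch. This is a simplification for illustration; Storm's actual implementation differs in detail:

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of a stateless fields grouping: every worker
// computes the same target task from the grouping-field values alone,
// so identical field values always route to the same task without any
// coordination between workers.
public class FieldsGroupingSketch {
    static int selectTask(List<Object> fieldValues, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even when the
        // hash code is negative.
        return Math.floorMod(fieldValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        int t1 = selectTask(Arrays.asList("Nathan"), numTasks);
        int t2 = selectTask(Arrays.asList("Nathan"), numTasks);
        // The same word maps to the same task on every invocation,
        // regardless of which worker performs the computation.
        System.out.println(t1 == t2);
    }
}
```

Because the mapping is deterministic, a fields-grouped word count needs no shared state: the bolt instance that owns the word "Nathan" sees every "Nathan" tuple.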
Re: kafka-spout running error
You are running in local mode, so storm will start an in-process zookeeper for its own use (usually on port 2000). In distributed mode, Storm will connect to the zookeeper quorum specified in your storm.yaml. In local mode, you would only need the external zookeeper for kafka and the kafka spout. When configuring the kafka spout, point it to the zookeeper used by kafka. - Taylor

On Aug 6, 2014, at 3:34 PM, Sa Li sa.in.v...@gmail.com wrote: Hi, Kushan You are completely right, I noticed this after you mentioned it. Apparently I am able to consume the messages with kafka-console-consumer.sh, which listens on 2181, but storm goes to 2000 instead.

1319 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /tmp/f41ad971-9f6b-433f-9dc9-9797afcc2e46
1425 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf {dev.zookeeper.path /tmp/dev-storm-zookeeper,

I spent the whole morning walking through my configuration. This is the zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
clientPort=2181
# specify all zookeeper servers
# The first port is used by followers to connect to the leader
# The second one is used for leader election
#server.1=zookeeper1:2888:3888
#server.2=zookeeper2:2888:3888
#server.3=zookeeper3:2888:3888
# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One reason
# for changing the size of the blocks is to reduce the block size if snapshots
# are taken more often. (Also, see snapCount).
#preAllocSize=65536
# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.
# ZooKeeper logs transactions to a transaction log. After snapCount
# transactions are written to a log file a snapshot is started and a new
# transaction log file is started. The default snapCount is 10,000.
#snapCount=1000
# If this option is defined, requests will be logged to a trace file named
# traceFile.year.month.day.
#traceFile=
# Leader accepts client connections. Default value is yes. The leader machine
# coordinates updates. For higher update throughput at the slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.
leaderServes=yes
# Enable regular purging of old data and transaction logs every 24 hours
autopurge.purgeInterval=24
autopurge.snapRetainCount=5

The only thing that I thought to change was to make a multi-server setup, uncommenting server.1, server.2, server.3, but that didn't help.
And this is the storm.yaml sitting in ~/.storm:

storm.zookeeper.servers:
    - 10.100.70.128
#     - server2
storm.zookeeper.port: 2181
nimbus.host: 10.100.70.128
nimbus.childopts: -Xmx1024m
storm.local.dir: /app/storm
java.library.path: /usr/lib/jvm/java-7-openjdk-amd64
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
#
# These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
drpc.servers:
    - 10.100.70.128
#     - server2
drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: -Xmx768m
## Metrics Consumers
# topology.metrics.consumer.register:
#     - class: backtype.storm.metrics.LoggingMetricsConsumer
#       parallelism.hint: 1
#     - class: org.mycompany.MyMetricsConsumer
#       parallelism.hint: 1
#       argument:
#         - endpoint: metrics-collector.mycompany.org

I really couldn't figure out the trick to configure the zK and storm cluster, and why zookeeper listens on 2000, which is really a weird thing. thanks Alec On Wed, Aug 6, 2014 at 6:48 AM, Kushan Maskey kushan.mas...@mmillerassociates.com wrote: I see that your zookeeper is listening on port 2000. Is that how you have configured the zookeeper? -- Kushan Maskey 817.403.7500 On Tue, Aug 5, 2014 at 11:56 AM, Sa Li sa.in.v...@gmail.com wrote: Thank you very
Re: kafka-spout running error
You have two different versions of zookeeper on the classpath (or in your topology jar). You need to find out where the conflicting zookeeper dependency is sneaking in and exclude it. If you are using maven, 'mvn dependency:tree' and exclusions will help. -Taylor

On Aug 6, 2014, at 6:14 PM, Sa Li sa.in.v...@gmail.com wrote: Thanks, Taylor, that makes sense. I checked my kafka config, the host.name=10.100.70.128, and correspondingly changed the spout config to:

BrokerHosts zk = new ZkHosts("10.100.70.128");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");

It used to be localhost; actually localhost=10.100.70.128, so the spout listens on 10.100.70.128 and collects from topictest, but still the same error:

3237 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V

thanks Alec

On Wed, Aug 6, 2014 at 1:27 PM, P. Taylor Goetz ptgo...@gmail.com wrote: You are running in local mode, so storm will start an in-process zookeeper for its own use (usually on port 2000). In distributed mode, Storm will connect to the zookeeper quorum specified in your storm.yaml. In local mode, you would only need the external zookeeper for kafka and the kafka spout. When configuring the kafka spout, point it to the zookeeper used by kafka. - Taylor On Aug 6, 2014, at 3:34 PM, Sa Li sa.in.v...@gmail.com wrote: Hi, Kushan You are completely right, I noticed this after you mentioned it. Apparently I am able to consume the messages with kafka-console-consumer.sh, which listens on 2181, but storm goes to 2000 instead.
[quoted zoo.cfg and storm.yaml configuration snipped]
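Taylor's `mvn dependency:tree` suggestion pairs with a Maven dependency exclusion. A sketch of what that could look like in the pom, assuming the stray ZooKeeper version is pulled in transitively; the outer dependency shown here is a placeholder, so substitute whatever artifact `dependency:tree` actually reports as the culprit:

```xml
<!-- Hypothetical example: exclude a transitive zookeeper so that only
     the version Storm expects remains on the classpath. Replace the
     outer dependency with the one dependency:tree identifies. -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>some-library</artifactId>
    <version>1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

After adding the exclusion, re-running `mvn dependency:tree` should show a single zookeeper artifact in the resolved tree.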
Re: kafka-spout running error
You are only sleeping for 100 milliseconds before shutting down the local cluster, which is probably not long enough for the topology to come up and start processing messages. Try increasing the sleep time to something like 10 seconds. You can also reduce startup time with the following JVM flag: -Djava.net.preferIPv4Stack=true - Taylor

On Aug 5, 2014, at 1:16 PM, Sa Li sa.in.v...@gmail.com wrote: Sorry, the stormTopology:

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

On Aug 5, 2014, at 9:56 AM, Sa Li sa.in.v...@gmail.com wrote: Thank you very much, Marcelo, it indeed worked; now I can run my code without getting errors. However, another thing keeps bothering me. The following is my code:

public static class PrintStream implements Filter {

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}

public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("localhost");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
    topology.newStream("kafka", spout)
            .each(new Fields("str"), new PrintStream());
    return topology.build();
}

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxSpoutPending(1);
    conf.setMaxTaskParallelism(3);
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka", conf, buildTopology(drpc));
    Thread.sleep(100);
    cluster.shutdown();
}

What I expect is quite simple: print out the messages I collect from a kafka producer playback process which is running separately. The topic is listed as:

root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: topictest  partition: 0  leader: 1  replicas: 1,3,2  isr: 1,3,2
topic: topictest  partition: 1  leader: 2  replicas: 2,1,3  isr: 2,1,3
topic: topictest  partition: 2  leader: 3  replicas: 3,2,1  isr: 3,2,1
topic: topictest  partition: 3  leader: 1  replicas: 1,2,3  isr: 1,2,3
topic: topictest  partition: 4  leader: 2  replicas: 2,3,1  isr: 2,3,1

When I run the code, this is what I see on the screen; there seems to be no error, but no messages are printed either:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= -cp
Re: @ptgoetz storm-jms and ActiveMQ
Hi Alberto, With a spout parallelism greater than 1, you will lose ordering. The only way to preserve some semblance of order would be to use a spout parallelism of 1, AND use a fields grouping of some kind to partition the stream. What about your use case requires preservation of message order? There might be an alternative angle of attack… - Taylor

On Aug 4, 2014, at 12:29 PM, Alberto São Marcos amar...@bikeemotion.com wrote: Hi, I'm currently using Storm 0.9.x, ActiveMQ and the @ptgoetz storm-jms spout. I have multiple producers sending messages to the same queue. My topologies run multiple Workers/Executors/Tasks. From what I understand: 1) One ActiveMQ consumer (storm-jms spout instance) takes messages from the queue preserving the order (FIFO); 2) Multiple ActiveMQ consumers/spouts on the same queue will compete for messages and order is lost; 3) Storm guarantees that bolts consume tuples in the order they are emitted by the topology spout. With the storm-jms spout in a parallel scenario, is there any message-ordering preservation guarantee at all? In what order are tuple batches emitted to the stream by those multiple storm-jms spout instances? Even if I used an Exclusive Consumer approach, can I assume the order of tuple processing is preserved? Does having multiple ActiveMQ consumers in the form of spout instances ultimately translate to an unpredictable tuple-processing order by the topology bolts? If so, can I solve this problem by switching ActiveMQ for another solution? Thanks in advance. Alberto
Re: Storm workers not starting because of netty reconnect : [INFO] Reconnect started for Netty-Client
I would double check to make sure hostname resolution is working properly on all hosts in the cluster, and that there are not any firewall rules that would prevent connections on the supervisor ports. I would also remove any Netty configuration overrides from storm.yaml to allow the defaults to take effect; only override the defaults when you need to. - Taylor

On Aug 4, 2014, at 2:37 PM, Rushabh Shah rushabh.s...@integral.com wrote: Hi, I have a topology that was deployed on a storm cluster and was running fine until I started facing the following issue. I can see in the supervisor logs that the supervisor is trying to launch the topology on a worker, but it is not able to start it.

2014-08-04 18:27:33 b.s.d.supervisor [INFO] Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id SALABPOSITION-5-1-2-1406938773, :executors ([3 3] [5 5] [7 7] [9 9] [11 11] [1 1])} for this supervisor 48753f4c-e0fd-48f3-a149-1f52491da5b9 on port 6702 with id f620ab27-61fd-4b87-b017-dea1e811074b
2014-08-04 18:27:33 b.s.d.supervisor [INFO] Launching worker with command: '/integral/opt/jdk16/bin/java' '-server' '-Xmx768m' '-Djava.net.preferIPv4Stack=true' '-Djava.net.preferIPv4Stack=true' '-Xmanagement:ssl=false,authenticate=false,port=7099' '-Xmx8192m' '-Djava.library.path=/app/storm/supervisor/stormdist/SALABPOSITION-5-1-2-1406938773/resources/Linux-amd64:/app/storm/supervisor/stormdist/SALABPOSITION-5-1-2-1406938773/resources:/usr/local/lib:/opt/local/lib:/usr/lib' '-Dlogfile.name=worker-6702.log' '-Dstorm.home=/integral/opt/apache-storm-0.9.2-incubating' '-Dlogback.configurationFile=/integral/opt/apache-storm-0.9.2-incubating/logback/cluster.xml' '-Dstorm.id=SALABPOSITION-5-1-2-1406938773' '-Dworker.id=f620ab27-61fd-4b87-b017-dea1e811074b' '-Dworker.port=6702' '-cp'
Re: storm-0.9.2-incubating topology submission fails
I take it you are using the Nimbus client directly to submit the topology, rather than using the `storm jar` command. If that’s the case, you need to configure the transport plugin:

Map<String, Object> conf = new HashMap<String, Object>();
conf.put("storm.thrift.transport", "backtype.storm.security.auth.SimpleTransportPlugin");
NimbusClient nc = NimbusClient.getConfiguredClient(conf);

- Taylor

On Aug 4, 2014, at 8:46 AM, Richards Peter hbkricha...@gmail.com wrote: Hi, I am trying to migrate to storm-0.9.2-incubating from storm-0.8.2. With storm-0.8.2 I submit the topology as per the following post: https://groups.google.com/forum/?fromgroups#!searchin/storm-user/Can$20we$20submit$20the$20topology$20jar$20without$20using$20storm$20command$3F/storm-user/5JjZQyviRqk/B_r1Ybew4aYJ However when I use the same code with storm-0.9.2, I get the following error when I submit the topology. Could anyone tell me what I am missing?

java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.security.auth.AuthUtils.GetTransportPlugin(AuthUtils.java:78) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.security.auth.ThriftClient.<init>(ThriftClient.java:47) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:47) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:43) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:36) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.StormSubmitter.submitJar(StormSubmitter.java:235) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.StormSubmitter.submitJar(StormSubmitter.java:220) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]

Regards, Richards Peter.
Re: how can I make the Nimbus highly available ?
HA for Nimbus is planned, and there has already been some work in that area: https://issues.apache.org/jira/browse/STORM-166 Nimbus is not a true SPOF in that if it goes down, your topologies will continue to function and process data. With Nimbus down, what you lose is the ability to submit/manage topologies and reassign tasks. Without STORM-166, you would have to manually intervene if your nimbus host exploded. If it is a big concern you may want a standby machine and something in your ops pocket to quickly change DNS for your nimbus host. - Taylor On Aug 4, 2014, at 10:55 PM, yang hui loveck...@163.com wrote: Hi all: how can I make the Nimbus highly available? I am just confused about this. On the website, I just read the below: This means you can kill -9 Nimbus or the Supervisors and they’ll start back up like nothing happened. This design leads to Storm clusters being incredibly stable. But if my master node crashes with an unexpected hardware failure, I can't recover it in a short time. So how can I make Nimbus a highly available service?
Re: Good way to test when topology in local cluster is fully active
My guess is that the slowdown you are seeing is a result of the new version of ZooKeeper and how it handles IPv4/6. Try adding the following JVM parameter when running your tests: -Djava.net.preferIPv4Stack=true -Taylor On Aug 4, 2014, at 8:49 PM, Corey Nolet cjno...@gmail.com wrote: I'm testing some sliding window algorithms with tuples emitted from a mock spout based on a timer but the amount of time it takes the topology to fully start up and activate seems to vary from computer to computer. Specifically, I just updated from 0.8.2 to 0.9.2-incubating and all of my tests are breaking because the time to activate the topology is taking longer (because of Netty possibly?). I'd like to make my tests more resilient to things like this. Is there something I can look at in LocalCluster where I could do while(!notActive) { Thread.sleep(50) } ? This is what my test looks like currently: StormTopology topology = buildTopology(...); Config conf = new Config(); conf.setNumWorkers(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(getTopologyName(), conf, topology); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } cluster.shutdown(); assertEquals(4, MockSinkBolt.getEvents().size()); Thanks!
Re: High CPU utilization after storm node failover
+ dev@storm Vinyasa/Srinath, Anything you can share to make this reproducible would be very helpful. I would love to see a network partition simulation framework for Storm along the lines of what Kyle Kingsbury has done with Jepsen [1]. It basically sets up a virtual cluster then simulates network partitions by manipulating iptables. Jepsen [2] is written in clojure and Kyle is a strong proponent. I think it is worth a look. -Taylor [1] http://aphyr.com/posts/281-call-me-maybe-carly-rae-jepsen-and-the-perils-of-network-partitions [2] https://github.com/aphyr/jepsen On Aug 5, 2014, at 8:39 PM, Srinath C srinat...@gmail.com wrote: I have seen this behaviour too using 0.9.2-incubating. The failover works better when there is a redundant node available. Maybe 1 slot per node is the best approach. Eager to know if there are any steps to further diagnose. On Wed, Aug 6, 2014 at 5:43 AM, Vinay Pothnis vinay.poth...@gmail.com wrote: [Storm Version: 0.9.2-incubating] Hello, I am trying to test failover scenarios with my storm cluster. The following are the details of the cluster: * 4 nodes * Each node with 2 slots * Topology with around 600 spouts and bolts * Num. Workers for the topology = 4 I am running a test that generating a constant load. The cluster is able to handle this load fairly well and the CPU utilization at this point is below 50% on all the nodes. 1 slot is occupied on each of the nodes. I then bring down one of the nodes (kill the supervisor and the worker processes on a node). After this, another worker is created on one of the remaining nodes. But the CPU utilization jumps up to 100%. At this point, nimbus cannot communicate with the supervisor on the node and keeps killing and restarting workers. The CPU utilization remains pegged at 100% as long as the load is on. If I stop the tests and restart the test after a while, the same set up with just 3 nodes works perfectly fine with less CPU utilization. 
Any pointers to how to figure out the reason for the high CPU utilization during the failover? Thanks Vinay
Re: use normal storm bolt in trident topology
You cannot use regular storm bolts in a trident topology. Regular spouts can be used with trident, but not bolts. - Taylor On Jul 23, 2014, at 1:16 PM, A.M. costco.s...@yahoo.com wrote: Is it recommended to use regular storm bolts in a trident topology? or is it at all possible? Thanks for your advice in advance. -Costco
Re: Need help to use storm with mysql.
What version of Hadoop are you using? Storm-hdfs requires Hadoop 2.x. - Taylor On Jul 18, 2014, at 6:07 AM, amjad khan amjadkhan987...@gmail.com wrote: Thanks for your help parth When I try to run the topology to write the data to hdfs it throws an exception Class Not Found: org.apache.hadoop.client.hdfs.HDFSDataOutputStream$SyncFlags Can anyone tell me what are the jars needed to execute the code to write data to hdfs. Please tell me all the required jars. On Wed, Jul 16, 2014 at 10:46 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: You can use https://github.com/ptgoetz/storm-hdfs It supports writing to HDFS with both Storm bolts and trident states. Thanks Parth On Jul 16, 2014, at 10:41 AM, amjad khan amjadkhan987...@gmail.com wrote: Can anyone provide the code for a bolt to write its data to hdfs. Kindly tell me the jars required to run that bolt. On Mon, Jul 14, 2014 at 2:33 PM, Max Evers mcev...@gmail.com wrote: Can you expand on your use case? What is the query selecting on? Is the column you are querying on indexed? Do you really need to look at the entire 20 gb every 20ms? On Jul 14, 2014 6:39 AM, amjad khan amjadkhan987...@gmail.com wrote: I made a storm topology where the spout was fetching data from mysql using a select query. The select query was fired every 30 msec, but because the size of the table is more than 20 GB the select query takes more than 10 sec to execute, therefore this is not working. I need to know what are the possible alternatives for this situation. Kindly reply as soon as possible. Thanks,
Re: Does Storm support JDK7 ??
Source and target are set to 1.6 to ensure backward compatibility for those who, for whatever reason, need to use Java 1.6. 1. Yes, Storm is compatible with JDK 7. 2. No. 3. I imagine this will happen at some point. But there’s nothing to stop you from running Storm with JDK 7. 4. No restriction. For now we do not rely on any JDK 7-specific features, so providing backward compatibility with 1.6 is a no-brainer. 5. Storm binary releases are compatible with JDK 6 and 7, both Oracle and OpenJDK. There’s no need to recompile. - Taylor On Jul 14, 2014, at 8:11 AM, Veder Lope add...@gmail.com wrote: Storm is a great project!!! We are trying to build a project where Storm holds a crucial role in our architecture. As I see in pom.xml (in maven-compiler-plugin), source and target are set to Java 1.6. 1) Is Storm compatible with JDK7? 2) I know we can download Storm (built for JDK6) and run it using JDK7, but there are a few incompatibilities* between JDK7 and JDK6. Will these incompatibilities affect Storm or not? 3) Do you plan to move to JDK7? 4) What is the restriction that holds us back to JDK6? (we are now stuck with JDK6 compile and runtime because of Storm) 5) Can we just build Storm with JDK7 (alter both source and target in pom.xml) and then use JDK7 for runtime or not? Have you seen any errors down this road? *incompatibilities: Check this: http://www.oracle.com/technetwork/java/javase/compatibility-417013.html#incompatibilities Regards, Adrianos Dadis.
Re: [0.9.2-incubating] Kafka Spout setting to latest offset
I'm +1 as well. On Jul 9, 2014, at 4:03 PM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: I'm also +1 on this. The old spout behaviour was perfectly fine. I guess maxOffsetBehind was added as a protection against fetching unavailable Kafka offsets, but it doesn't really make sense to me in my Trident transactional topology, where I can't afford to lose any data. I would rather have my spout stop processing data in this case than skip some offsets because of an arbitrary maxOffsetBehind config value. Others' opinions may vary, but I think setting this to Long.MAX_VALUE would make a much better default, as it would be closer to the old spout behaviour. On Wednesday, July 9, 2014, Curtis Allen curtis.n.al...@gmail.com wrote: Hello, I’ve recently upgraded to storm and storm-kafka 0.9.2-incubating, replacing the https://github.com/wurstmeister/storm-kafka-0.8-plus spout I was using previously. I have a large kafka log that I needed processed. I started my topology with storm.kafka.SpoutConfig spoutConfig = new SpoutConfig(...); spoutConfig.forceFromStart = true; I then needed to make some tweaks in my application code and restarted the topology with spoutConfig.forceFromStart = false, expecting to pick up where I left off in my kafka log. Instead, the kafka spout started from the latest offset.
Upon investigation I found this log message in my storm worker logs 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 15266940; old topology_id: ef3f1f89-f64c-4947-b6eb-0c7fb9adb9ea - new topology_id: 5747dba6-c947-4c4f-af4a-4f50a84817bf 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 15266940 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Commit offset 22092614 is more than 10 behind, resetting to startOffsetTime=-2 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Starting Kafka prd-use1c-pr-08-kafka-kamq-0004:4 from offset 22092614 Digging into the storm-kafka spout I found this line https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L95 To fix this problem I ended up setting my spout config like so: spoutConf.maxOffsetBehind = Long.MAX_VALUE; Now finally to my question. Why would the kafka spout skip to the latest offset if the current offset is more than 10 behind by default? This seems like a bad default value; the spout literally skipped over months of data without any warning. Are the core contributors open to accepting a pull request that would set the default to Long.MAX_VALUE? Thanks, Curtis Allen -- Danijel Schiavuzzi E: dani...@schiavuzzi.com W: www.schiavuzzi.com T: +385989035562 Skype: danijels7
Re: wurstmiester kafkaspout on kafka version 0.8.1.1
Yes. That is fixed in the kafka spout that's now part of the latest 0.9.2 release. http://storm.incubator.apache.org/2014/06/25/storm092-released.html -Taylor On Jun 30, 2014, at 5:27 PM, Kashyap Mhaisekar kashya...@gmail.com wrote: Hi, I recently got kafka version 0.8.0 to work with the wurstmeister kafkaspout. I plan to upgrade kafka to the latest stable version 0.8.1.1. After the upgrade, I find that the KafkaSpout does not store the offsets into zookeeper. Has anyone encountered this? Regards, Kashyap
Re: build-time vs run-time serialization
You should really only be concerned with serialization when it comes to the values you use in tuples. If you are using all primitives, then you shouldn't have anything to worry about. If you are passing custom objects around in tuples, then you should consider registering a custom Kryo serializer for those objects, and keep them immutable if possible. In terms of instance variables of components, if they are not serializable, then they should be marked as transient and initialized in the prepare() method of the component. So if you have heavy resources like a database connection, guava cache, etc., initialize those in the prepare() method and keep everything as stateless and idempotent as possible for your use case. Keep in mind that the Topology object created when you call the build() method is going to be serialized, including all the components in the topology graph. A Storm topology is really only a static, serializable data structure (Thrift) containing instructions telling Storm how to deploy and run it in a cluster. Only when it is deployed (and serialized in the process) and initialized (i.e. prepare() and other life cycle methods are called on components) does it really do anything in terms of message processing. Hope this helps. -Taylor On Jun 25, 2014, at 8:38 PM, Cody A. Ray cody.a@gmail.com wrote: I have a serialization question. As long as your tuple values are all primitive/built-in types (strings, byte arrays, longs, etc), is Java serialization only used at (re)build time (i.e., when the nimbus distributes the code of your topology to the supervisors, and when they create worker jvms)? Another way of asking: will using a heavy library (e.g., guava or apache commons) in your trident functions (but not tuple types) incur a runtime overhead (other than when you lose a worker/supervisor)? -Cody -- Cody A. Ray, LEED AP cody.a@gmail.com 215.501.7891
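The transient-field advice can be demonstrated with plain Java serialization. This is a minimal, self-contained sketch (not a real Storm bolt; the class and field names are invented): the transient resource does not survive serialization, which is exactly why non-serializable resources should be re-created in prepare().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a Storm component: serializable config travels with the
// topology, while the heavy resource is transient and rebuilt in prepare().
class MyBolt implements Serializable {
    private final String table = "counts";      // serialized with the topology
    private transient StringBuilder connection; // stands in for a DB connection

    void prepare() { connection = new StringBuilder("connected to " + table); }
    boolean ready() { return connection != null; }
}

public class TransientDemo {
    public static void main(String[] args) throws Exception {
        MyBolt bolt = new MyBolt();
        bolt.prepare();

        // Simulate topology submission: Java-serialize and deserialize the bolt.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(bolt);
        oos.flush();
        MyBolt copy = (MyBolt) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The transient field was not serialized, so the copy is not ready...
        System.out.println("after deserialization ready=" + copy.ready());
        // ...until the worker-side life cycle method re-creates it.
        copy.prepare();
        System.out.println("after prepare ready=" + copy.ready());
    }
}
```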
Re: Storm 0.9.2-incubating not on maven central, KafkaSpout release?
It has not been officially released yet. The PPMC voted to approve it, but it still needs to be approved by the IPMC. That vote is currently underway. Only when that succeeds can we release the artifacts from staging. -Taylor On Jun 20, 2014, at 10:29 AM, Toni Menzel toni.men...@rebaze.com wrote: Hi there, 1. i see storm 0.9.2-incubator has been released last friday? Any reason why its not on maven central yet [1] ? 2. Right now i am using the (external) kafka spout which right now has been built locally from master. Will it be released with the general distribution soon'ish? Thanks guys, Toni [1] http://search.maven.org/#artifactdetails%7Corg.apache.storm%7Cstorm-core%7C0.9.1-incubating%7Cjar Toni Menzel | rebaze GmbH toni.men...@rebaze.com | +49 171 65 202 84 http://www.rebaze.com | twitter @rebazeio | LinkedIn Profile
Re: v0.9.2-incubating and .ser files
Hi Andrew, Thanks for pointing this out. I agree with your point about bit rot. However, we had to remove the 0mq transport due to license incompatibilities with Apache, so any kind of release test suite would have to be maintained outside of Apache, since it would likely pull in LGPL-licensed dependencies. So if something like what you’re suggesting could be accomplished in the storm-0mq project, that would be the best option. I’m open to pull requests, help, contributions, etc. to storm-0mq. It just can’t be part of Apache. I’ll test out your changes to storm-0mq to see if I can reproduce the issue you’re seeing. As Nathan mentioned, any additional information (thread dumps, etc.) you could provide would help. Thanks (and sorry for the inconvenience), Taylor On Jun 19, 2014, at 6:09 PM, Andrew Montalenti and...@parsely.com wrote: Another interesting 0.9.2 issue I came across: the IConnection interface has changed, meaning any pluggable transports no longer work without a code change. I implemented changes to storm-0mq to make it compatible with this interface change in my fork here: https://github.com/Parsely/storm-0mq/compare/ptgoetz:master...master I tested that and it nominally works in distributed mode with two independent workers in my cluster. I don't know what the performance impact of the interface change is. I get that zmq is no longer part of storm core, but maintaining a stable interface for pluggable components like this transport is probably something that should be in the release test suite. Otherwise bitrot will take its toll. I am glad to volunteer help with this. My team is now debugging an issue where Storm stops asking our spout for next tuples after a while of running the topology, causing the topology to basically freeze with no errors in the logs. At first blush, it seems like a regression from 0.9.1. But we'll have more detailed info once we isolate some variables soon.
On Jun 18, 2014 4:32 PM, Andrew Montalenti and...@parsely.com wrote: I built the v0.9.2-incubating rc-3 locally and, once verifying that it worked for our topology, pushed it into our cluster. So far, so good. One thing for the community to be aware of: if you try to upgrade an existing v0.9.1-incubating or 0.8 cluster to v0.9.2-incubating, you may hit exceptions upon nimbus/supervisor startup about stormcode.ser/stormconf.ser. The issue is that the new cluster will try to re-submit the topologies that were already running before the upgrade. These will fail because Storm's Clojure version has been upgraded from 1.4 to 1.5, thus the serialization format IDs have changed. This would be true basically if any class serial IDs change that happen to be in these .ser files (stormconf.ser and stormcode.ser, as defined in Storm's internal config). The solution is to clear out the storm data directories on your worker nodes/nimbus nodes and restart the cluster. I have some open source tooling that submits topologies to the nimbus using StormSubmitter. This upgrade also made me realize that due to the use of serialized Java files, it is very important that the StormSubmitter class used for submitting and the running Storm cluster be precisely the same version / classpath. I describe this more in the GH issue here: https://github.com/Parsely/streamparse/issues/27 I wonder if maybe it's worth it to consider using a less finicky serialization format within Storm itself. Would that change be welcome as a pull request? It would make it easier to script Storm clusters without consideration for client/server Storm version mismatches, which I presume was the original reasoning behind putting Storm functionality behind a Thrift API anyway. And it would prevent crashed topologies during minor Storm version upgrades.
Re: v0.9.2-incubating and .ser files
Okay. Keep me posted. I still plan on looking at and testing your patch to storm-0mq, but probably won't get to that until early next week. -Taylor On Jun 19, 2014, at 7:43 PM, Andrew Montalenti and...@parsely.com wrote: FYI, the issue happened with both zmq and netty transports. We will investigate more tomorrow. We think the issue only happens with more than one supervisor and multiple workers.
Re: Storm 0.9.2-incubating release
A release candidate has been cut, and is currently being voted on by the PPMC. If that vote passes, it will move on to vote by the IPMC. If both votes pass, it will be released. Each vote stays open for at least 72 hours. If approved, there is also a 72 hour waiting period to allow download mirrors to catch up before the release announcement. So there is a minimum of 9 days between initial vote and release announcement. Release discussions take place on the d...@storm.incubator.apache.org mailing list if you want to follow them. -Taylor On Jun 17, 2014, at 7:36 AM, Mazoyer, Frantz frantz.mazo...@hp.com wrote: Hi Everyone, Does anyone know when Storm 0.9.2-incubating is scheduled for release? What’s left to do until it is? Thanks a lot! Kind regards, Frantz
[RESULT] [VOTE] Storm Logo Contest - Final Round
The Apache Storm logo contest is now complete and was a great success. We received votes from 7 PPMC members as well as 55 votes from the greater Storm community. Thank you to everyone who participated by voting. Congratulations to Jennifer Lee, whose 3rd entry received the most points from both the PPMC as well as the Storm community. Final vote tallies are listed below. We will now begin the process of finalizing the logo and making it official. Congratulations Jennifer, and thanks again to everyone who participated! Results:

| Entry             | PPMC | Community |
|:------------------|-----:|----------:|
| 6 - Alec Bartos   |    2 |        41 |
| 9 - Jennifer Lee  |    7 |       111 |
| 10 - Jennifer Lee |   26 |       123 |

-Taylor On Jun 9, 2014, at 2:38 PM, P. Taylor Goetz ptgo...@gmail.com wrote: This is a call to vote on selecting the winning Storm logo from the 3 finalists. The three candidates are: * [No. 6 - Alec Bartos](http://storm.incubator.apache.org/2014/04/23/logo-abartos.html) * [No. 9 - Jennifer Lee](http://storm.incubator.apache.org/2014/04/29/logo-jlee1.html) * [No. 10 - Jennifer Lee](http://storm.incubator.apache.org/2014/04/29/logo-jlee2.html) VOTING Each person can cast a single vote. A vote consists of 5 points that can be divided among multiple entries. To vote, list the entry number, followed by the number of points assigned. For example: #1 - 2 pts. #2 - 1 pt. #3 - 2 pts. Votes cast by PPMC members are considered binding, but voting is open to anyone. In the event of a tie vote from the PPMC, votes from the community will be used to break the tie. This vote will be open until Monday, June 16 11:59 PM UTC. - Taylor
Re: [VOTE] Storm Logo Contest - Final Round
#10 - 5 pts. On Jun 9, 2014, at 2:38 PM, P. Taylor Goetz ptgo...@gmail.com wrote: This is a call to vote on selecting the winning Storm logo from the 3 finalists.
Re: Apache Storm vs Apache Spark
The way I usually describe the difference is that Spark is a batch processing framework that also does micro-batching (Spark Streaming), while Storm is a stream processing framework that also does micro-batching (Trident). So architecturally they are very different, but have some similarity on the functional side. With micro-batching you can achieve higher throughput at the cost of increased latency. With Spark this is unavoidable. With Storm you can use the core API (spouts and bolts) to do one-at-a-time processing to avoid the inherent latency overhead imposed by micro-batching. With Trident, you get state management out of the box, and sliding windows are supported as well. In terms of adoption and production deployments, Storm has been around longer and there are a LOT of production deployments. I’m not aware of that many production Spark deployments, but I’d expect that to change over time. In terms of performance, I can’t really point to any valid comparisons. When I say “valid” I mean open and independently verifiable. There is one study that I’m aware of that claims Spark streaming is insanely faster than Storm. The problem with that study is that none of the code or configurations used are publicly available (that I’m aware of). So without a way to independently verify those claims, I’d dismiss it as marketing fluff (the same goes for the IBM InfoStreams comparison). Storm is very tunable when it comes to performance, allowing it to be tuned to the use case at hand. However, it is also easy to cripple performance with the wrong config. I can personally verify that it is possible to process 1.2+ million (relatively small) messages per second with a 10-15 node cluster — and that includes writing to HBase, and other components (I don’t have the hardware specs handy, but can probably dig them up). - Taylor On Jun 9, 2014, at 4:04 PM, Rajiv Onat ora...@gmail.com wrote: Thanks. 
Not sure why you say it is different; from a stream processing use case perspective both seem to accomplish the same thing, while the implementations may take different approaches. If I want to aggregate and do stats in Storm, I would have to microbatch the tuples at some level, e.g. count of orders in the last 1 minute; in Storm I have to write code for sliding windows and state management, while Spark seems to provide operators to accomplish that. Tuple-level operations such as enrichment, filters, etc. seem also doable in both. On Mon, Jun 9, 2014 at 12:24 PM, Ted Dunning ted.dunn...@gmail.com wrote: They are different. Storm allows right-now processing of tuples. Spark streaming requires micro batching (which may be a really short time). Spark streaming allows checkpointing of partial results in the stream supported by the framework. Storm says you should roll your own or use trident. Applications that fit one like a glove are likely to bind a bit on the other. On Mon, Jun 9, 2014 at 12:16 PM, Rajiv Onat ora...@gmail.com wrote: I'm trying to figure out whether these are competitive technologies for stream processing or complementary? From the initial read, from a stream processing capabilities perspective both provide a framework for scaling, while Spark has window constructs; Apache Spark has Spark Streaming and promises one platform for batch, interactive and stream processing. Any comments or thoughts?
Re: JVM instances and Workers
Separate. Supervisors spawn worker JVMs when topologies are deployed. Workers are also dedicated to one topology. -Taylor On Jun 9, 2014, at 5:44 PM, Nima Movafaghrad nima.movafagh...@oracle.com wrote: Good afternoon Storm community, I have a quick question. Do worker processes run in separate JVMs or do they share the same one as the Supervisors? Thanks, Nima
Re: standard DRPC server deployment
I generally put one on the same node as nimbus, and then monitor it to make sure it doesn't get overloaded. If it starts to see heavy load, then consider moving it to a dedicated node or multiple nodes + load balancing. Ultimately it comes down to your use case and usage. Monitor usage and adjust accordingly. -Taylor On Jun 9, 2014, at 7:48 PM, Andrew Neilson arsneil...@gmail.com wrote: Hi, I'm just curious about how others have deployed DRPC servers in their storm cluster? Do they need dedicated hosts or can they comfortably run on the same hosts as supervisors? Thanks for any insights you may provide, Andrew
Re: Trident, ZooKeeper and Kafka
Not really. It’s not really intuitive. By setting forceFromStart = true, you are saying that you want to start from a specific position, rather than the last offset stored in ZooKeeper. Then you specify the position with -1, -2, or a specific point in time (in milliseconds). - Taylor On May 29, 2014, at 11:33 AM, Raphael Hsieh raffihs...@gmail.com wrote: By setting forceFromStart to true, aren't I telling it to start from the beginning or earliest time then ? On Thu, May 29, 2014 at 12:59 AM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: You must set both forceFromStart to true and startOffsetTime to -1 or -2. On Thu, May 29, 2014 at 12:23 AM, Raphael Hsieh raffihs...@gmail.com wrote: I'm doing both tridentKafkaConfig.forceFromStart = false; as well as tridentKafkaConfig.startOffsetTime = -1; Neither are working for me. Looking at my nimbus UI, I still get a large spike in processed data, before it levels off and seems to not process anything. On Wed, May 28, 2014 at 3:17 PM, Shaikh Riyaz shaikh@gmail.com wrote: I think you can use kafkaConfig.forceFromStart = false; We have implemented this and its working fine. Regards, Riyaz On Thu, May 29, 2014 at 1:02 AM, Raphael Hsieh raffihs...@gmail.com wrote: This is still not working for me. I've set the offset to -1 and it is still backfilling data. Is there any documentation on the start offsets that I could take a look at ? Or even documentation on kafka.api.OffsetRequest.LatestTime() ? On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh raffihs...@gmail.com wrote: would the Trident version of this be tridentKafkaConfig.startOffsetTime ? On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: By default, the Kafka spout resumes consuming where it last left off. That offset is stored in ZooKeeper. You can set forceStartOffset to -2 to start consuming from the earliest available offset, or -1 to start consuming from the latest available offset. 
On Wednesday, May 28, 2014, Raphael Hsieh raffihs...@gmail.com wrote: If I don't tell trident to start consuming data from the beginning of the Kafka stream, where does it start from? If I were to do: tridentKafkaConfig.forceFromStart = true; Then it will tell the spout to start consuming from the start of the stream. If that is not set, then where does it start consuming from? and How might I go about telling it to start consuming from the very end of the stream? If a disaster were to happen and all my hosts died, when I start my cluster back up, it might start consuming from where it left off. I would rather manually process that old data, and have my storm system start processing the live data. Thanks -- Raphael Hsieh -- Danijel Schiavuzzi E: dani...@schiavuzzi.com W: www.schiavuzzi.com T: +385989035562 Skype: danijels7 -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014 -- Regards, Riyaz
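The offset semantics Taylor describes can be sketched as plain logic. The following is a hypothetical model of how the storm-kafka spout picks its starting position, not the spout's actual code; the names `forceFromStart` and `startOffsetTime` match the KafkaConfig fields discussed in the thread, and the -1/-2 constants mirror `kafka.api.OffsetRequest.LatestTime()`/`EarliestTime()`:

```java
// Hypothetical model of storm-kafka start-offset selection (illustration only).
public class OffsetChoice {
    static final long EARLIEST = -2L; // oldest retained offset
    static final long LATEST   = -1L; // tail of the topic

    /**
     * @param forceFromStart  if true, ignore the offset stored in ZooKeeper
     * @param startOffsetTime -2, -1, or a timestamp in milliseconds
     * @param zkOffset        last committed offset from ZooKeeper, or null if none
     */
    static String startingPoint(boolean forceFromStart, long startOffsetTime, Long zkOffset) {
        if (!forceFromStart && zkOffset != null) {
            return "resume at ZooKeeper offset " + zkOffset;
        }
        if (startOffsetTime == EARLIEST) return "earliest available offset";
        if (startOffsetTime == LATEST)   return "latest available offset";
        return "offset closest to timestamp " + startOffsetTime;
    }
}
```

So, to skip the backlog after a disaster as Raphael wants, both settings matter: `forceFromStart = true` (don't resume from ZooKeeper) together with `startOffsetTime = -1` (start at the tail).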
Re: Workers constantly restarted due to session timeout
Can you share your storm config and version? On May 29, 2014, at 12:45 PM, Michael Dev michael_...@outlook.com wrote: Derek, We are currently running with -Xmx60G and only about 20-30G of that has been observed to be used. I'm still observing workers restarted every 2 minutes. What timeout is relevant to increase for the heartbeats in question? Is it a config on the Zookeeper side we can increase to make our topology more resilient to these restarts? Michael Date: Fri, 23 May 2014 15:50:50 -0500 From: der...@yahoo-inc.com To: user@storm.incubator.apache.org Subject: Re: Workers constantly restarted due to session timeout 2) Is this expected behavior for Storm to be unable to keep up with heartbeat threads under high CPU or is our theory incorrect? Check your JVM max heap size (-Xmx). If you use too much, the JVM will garbage-collect, and that will stop everything--including the thread whose job it is to do the heartbeating. -- Derek On 5/23/14, 15:38, Michael Dev wrote: Hi all, We are seeing our workers constantly being killed by Storm with the following logs: worker: 2014-05-23 20:15:08 INFO ClientCxn:1157 - Client session timed out, have not heard from the server in 28105ms for sessionid 0x14619bf2f4e0109, closing socket and attempting reconnect supervisor: 2014-05-23 20:17:30 INFO supervisor:0 - Shutting down and clearing state for id 94349373-74ec-484b-a9f8-a5076e17d474. Current supervisor time: 1400876250. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{{:time-secs 1400876249, :storm-id test-46-1400863199, :executors #{[-1 -1]}, :port 6700} Eventually Storm decides to just kill the worker and restart it as you see in the supervisor log. We theorize this is the Zookeeper heartbeat thread and it is being choked out due to very high CPU load on the machine (near 100%). 
I have increased the connection timeouts in the storm.yaml config file yet Storm seems to continue to use some unknown value for the above client session timeout messages: storm.zookeeper.connection.timeout: 30 storm.zookeeper.session.timeout: 30 1) What timeout config is appropriate for the above timeout message? 2) Is this expected behavior for Storm to be unable to keep up with heartbeat threads under high CPU or is our theory incorrect? Thanks, Michael
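One likely reason the values above are being ignored: these settings are interpreted in milliseconds (the defaults are 20000 and 15000), so `30` means 30 ms. A sketch of what a 30-second intent would look like in storm.yaml:

```yaml
# Both values are in milliseconds (defaults: 20000 / 15000),
# so a value of 30 means 30 ms -- almost certainly not what was intended.
storm.zookeeper.session.timeout: 30000
storm.zookeeper.connection.timeout: 30000
```

Note also that the ZooKeeper server bounds the negotiated session timeout by its own tickTime (roughly 2x to 20x tickTime), so raising the client-side value alone may not be enough.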
New Committer/PPMC Member: Michael G. Noll
The Podling Project Management Committee (PPMC) for Apache Storm has asked Michael G. Noll to become a committer/PPMC member and we are pleased to announce that he has accepted. Michael has contributed to Storm in many ways, including code patches, community support, and high quality documentation and blog posts. We are very excited to have him on board. Michael’s blog can be found here: http://www.michael-noll.com Please join me in welcoming Michael to the team. - Taylor
Re: logging 'failed' tuples in mastercoord-bg0
Silent replays are usually a sign of batches timing out. By default storm uses a timeout value of thirty seconds. Try upping that value and setting TOPOLOGY_MAX_SPOUT_PENDING to a very low value like 1. In trident that controls how many batches can be in-flight at a time. -Taylor On May 28, 2014, at 5:24 PM, Raphael Hsieh raffihs...@gmail.com wrote: Is there a way to add logging to the master coordinator? For some reason my spout is failing batches and not giving me any error messages. The log files on the host don't provide any error messages and I'm not sure where the logic for this resides in Storm-Trident. Is there a particular string other than 'failed' that I can grep for? Thanks -- Raphael Hsieh
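The two knobs Taylor mentions correspond to well-known string keys in the topology configuration. A minimal sketch using a plain map (in a real topology you would put these on `backtype.storm.Config`, which is itself a HashMap; the specific values 120 and 1 are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class TridentTuning {
    static Map<String, Object> tuningConf() {
        Map<String, Object> conf = new HashMap<>();
        // Raise the tuple/batch timeout from the 30-second default...
        conf.put("topology.message.timeout.secs", 120);
        // ...and allow only one Trident batch in flight at a time.
        conf.put("topology.max.spout.pending", 1);
        return conf;
    }
}
```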
Re: All tuples are going to same worker
Fields grouping uses a mod hash function to determine which task to send a tuple to. It sounds like there's not enough variety in the field values you are grouping on, so they are all getting sent to the same task. Without seeing your code and data I can't tell for sure. -Taylor On May 28, 2014, at 5:56 PM, Shaikh Riyaz shaikh@gmail.com wrote: Hi All, We are running a Storm cluster with the following servers: one Nimbus, and six supervisors with 2 workers each running on ports 6700 and 6701. All tuples are going to only one supervisor and only to one worker (6701) running on that supervisor. We have one KafkaSpout and 6 bolts processing the data. We are using field grouping to pass tuples from one bolt to another. Each tuple is saving some data to HBase. One of the executors has emitted 609180 tuples and the remaining executors have emitted 200 tuples as a whole. We have configured our spout and tuples with parallelism hint 5. Please let me know what might be wrong with the configuration. Thanks in advance. -- Regards, Riyaz
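The task-selection behavior Taylor describes can be sketched as follows. This is a simplified model of fields grouping, not Storm's exact implementation: the selected field values are hashed and the result is taken modulo the number of target tasks, so identical values always land on the same task:

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {
    /** Pick a target task index for the grouped field values. */
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // Identical field values always hash to the same task, so a skewed
        // key distribution sends almost everything to one executor.
        return Math.abs(Arrays.deepHashCode(groupingValues.toArray())) % numTasks;
    }
}
```

If nearly every tuple carries the same value for the grouped field, `chooseTask` returns the same index nearly every time, which matches the one-hot-worker behavior Riyaz is seeing.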
[RESULT] [VOTE] Storm Logo Contest - Round 1
Round one of the Apache Storm logo contest is now complete and was a great success. We received votes from 7 PPMC members as well as 46 votes from the greater Storm community. We would like to extend a very special thanks to all those who took the time and effort to create and submit a logo proposal. We would also like to thank everyone who voted.

## Results ##

The Storm PPMC and community votes were closely aligned, with the community vote resolving a PPMC tie for the 3rd finalist as shown in the result table below. The three finalists entering the final round are:

* [No. 6 - Alec Bartos](http://storm.incubator.apache.org/2014/04/23/logo-abartos.html)
* [No. 9 - Jennifer Lee](http://storm.incubator.apache.org/2014/04/29/logo-jlee1.html)
* [No. 10 - Jennifer Lee](http://storm.incubator.apache.org/2014/04/29/logo-jlee2.html)

Please join me in congratulating Alec and Jennifer! Stay tuned for the final vote, which will be announced shortly.

| Entry | PPMC | Community |
|:------|-----:|----------:|
| 1 - Patricia Forrest | 2 | 23 |
| 2 - Samuel Quiñones | 3 | 6 |
| 3 - Shaan Shiv Suleman | 0 | 7 |
| 4 - Richard Brownlie-Marshall | 0 | 6 |
| 5 - Ziba Sayari | 0 | 9 |
| 6 - Alec Bartos | 3 | 32 |
| 7 - Calum Boustead | 0 | 0 |
| 8 - Stefano Asili | 0 | 2 |
| 9 - Jennifer Lee | 9 | 63 |
| 10 - Jennifer Lee | 18 | 64 |
| 11 - Jennifer Lee | 0 | 18 |

A spreadsheet containing the complete vote result can be found here: http://people.apache.org/~ptgoetz/storm_logo_contest.xlsx - Taylor On May 15, 2014, at 12:28 PM, P. Taylor Goetz ptgo...@gmail.com wrote: This is a call to vote on selecting the top 3 Storm logos from the 11 entries received. This is the first of two rounds of voting. In the first round the top 3 entries will be selected to move onto the second round where the winner will be selected. The entries can be viewed on the storm website here: http://storm.incubator.apache.org/blog.html VOTING Each person can cast a single vote. A vote consists of 5 points that can be divided among multiple entries. 
To vote, list the entry number, followed by the number of points assigned. For example: #1 - 2 pts. #2 - 1 pt. #3 - 2 pts. Votes cast by PPMC members are considered binding, but voting is open to anyone. This vote will be open until Thursday, May 22 11:59 PM UTC. - Taylor
Re: Storm failover, is it possible to know the last succeeded bolt when fail method gets called in spout?
In short, no. Not today. Storm's acking mechanism is largely stateless (one long can track an entire tuple tree), which is one of the reasons it is so efficient. But the acking mechanism is also based on Storm's core primitives, so it is entirely possible. There is a JIRA for adding additional metadata to the spout fail method (specific number escapes me ATM). We should consider adding this as a related issue. -Taylor On May 27, 2014, at 8:58 PM, Xueming Li james.xueming...@gmail.com wrote: Hi guys, For a Storm topology, if a tuple failed to be processed (either timed out or fail method got called on the tuple explicitly), I understand that fail method of the spout will be called with the tuple message id. Other than that, can Storm provide any info as to the last bolt that had processed the tuple successfully? Thanks in advance! Thanks, James
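The "one long can track an entire tuple tree" trick works by XOR: each tuple in the tree contributes its 64-bit id once when it is anchored and once when it is acked, and the tree is complete exactly when the running XOR returns to zero. A self-contained sketch of the idea (not Storm's actual acker code, which also uses random tuple ids and a timeout):

```java
public class AckerSketch {
    private long ackVal = 0L;  // the single long tracking the whole tuple tree

    void anchor(long tupleId) { ackVal ^= tupleId; } // tuple emitted into the tree
    void ack(long tupleId)    { ackVal ^= tupleId; } // tuple fully processed

    boolean treeComplete()    { return ackVal == 0L; }
}
```

Because each id is XORed in twice and cancels out regardless of order, the acker learns *that* the tree completed (or timed out) while retaining no per-bolt state. That is precisely why Storm cannot report which bolt last processed a failed tuple.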
Re: Sample project using Storm and Kafka (using Scala 2.10)
Come to think about it, this is a non-issue. This is just java code that happens to depend on a library written in scala. There isn’t anything that needs to be cross compiled. So it comes down to dependency management and documentation. So the scala and kafka dependencies would be set to “provided” scope, and it would be up to the user to pull in the kafka/scala dependency versions they want. Correct me if I’m missing anything here. - Taylor On May 22, 2014, at 6:08 AM, Marco zentrop...@yahoo.co.uk wrote: Taylor, cross-compatibility is a Scala thingie and THEREFORE it reflects onto Kafka and THEREFORE onto storm-kafka (or whatever-kafka). Kafka (if I understand correctly https://github.com/apache/kafka/blob/0.8.1/build.gradle#l131) manages this with Gradle (which I'm personally not much a fan of). I don't think it'd be much of an overhead having that in storm-kafka (and so effectively have storm-kafka_2_8_0, storm-kafka_2_9_1, storm-kafka_2_9_2, storm-kafka_2_10_1). Maven profiles can do that (as Micheal proved). Plus Storm seems to have put a lot of interest towards polyglottism (mixing Clojure and Java for example). Also, I agree with all the points János made. Having it on Maven Central would be THE perfect starting point for any (Scala) developer willing to give it a spin. ps: Scala 2.11 is just out -Marco Il Giovedì 22 Maggio 2014 3:43, P. Taylor Goetz ptgo...@gmail.com ha scritto: János, I sense and appreciate your frustration. The initial goal with bringing the storm-kafka project under the Apache Storm project umbrella is to provide a definitive source for the code and eliminate fragmentation, which has been an issue in the past. This is a good first step for many reasons, and will hopefully improve greatly in future iterations. Just to be clear, Apache releases source code, not binaries. Any binaries built against an Apache project's source are provided as a convenience. 
The cross-compilation ballet you describe is a feature of scala [1], not anything Storm or maven related. Yes, we can and will improve on the build and binary release process. But for now the goal is to provide a definitive source, and make sure users can build that source as needed -- which I think we have done. -Taylor [1] a not-so-subtle dig against scala's build process, not the language. Hopefully this will get sorted out someday. On May 21, 2014, at 7:48 PM, János Háber janos.ha...@finesolution.hu wrote: Dear Taylor, I love your work, but: - I don't want to build myself - Dependent libraries (like tormenta-kafka) need a cross compiled version of storm-kafka, without this they need to clone the project, change the group id, handle every changes by hand, and publish to central repository. - I need to have own maven repository to store the cross compiled version (which need to be public if somebody want to use my application) and maintain the changes - I think hand made library is the best way to make a project to unstable, because if I clone the project I need to clone tormenta-kafka project too and handle myself both version changes, solve the compatibility issues, etc... I know how can I compile to 2.10 by hand, but all other project (example kafka, which is apache project too) has cross compiled version in CENTRAL maven repository, if a project required a cross-compiled scala library - my oppinion - the project need to provide cross-compiled version too, no exception. b0c1 János Háber Fine Solution Ltd On Wed, May 21, 2014 at 11:48 PM, P. Taylor Goetz ptgo...@gmail.com wrote: If you build yourself, you can do the following: mvn install -DscalaVersion=2.10.3 -DkafkaArtifact=kafka_2.10 - Taylor On May 21, 2014, at 5:32 PM, János Háber janos.ha...@finesolution.hu wrote: https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/pom.xml#L34 And 2.10 not backward compatible with 2.9... 
that's why scala applications/libraries is cross compiled with multiple scala version. (that's why SBT will handle this natively, but I think in apache sbt is not allowed so you need to create multiple maven project (hard way) or switch to gradle (like kafka) to produce multiple version) János Háber Fine Solution Ltd On Wed, May 21, 2014 at 11:28 PM, János Háber janos.ha...@finesolution.hu wrote: On Wed, May 21, 2014 at 11:27 PM, P. Taylor Goetz ptgo...@gmail.com wrote: ill include the Kafka spout (for Kafka 0.8.x). Yeah, but the current maven build compile to 2.9 scala not 2.10... János Háber Fine Solution Ltd
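The "provided scope" arrangement Taylor describes would look roughly like this in a downstream project's pom.xml: storm-kafka leaves the Scala/Kafka choice open, and the user pins the Kafka artifact for their Scala version. This is a sketch; the version numbers shown are illustrative, not prescribed:

```xml
<!-- storm-kafka does not force a Scala version on you; you supply the
     Kafka artifact matching the Scala version your project uses. -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>0.9.2-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.1.1</version>
</dependency>
```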
Re: Sometimes topology crashed with internal exception
Are you using the OutputCollector to emit in a separate thread (i.e. outside of the execute() method)? As the wiki states, this will cause the problem you are seeing. -Taylor On May 21, 2014, at 7:24 AM, Irek Khasyanov qua...@gmail.com wrote: Hello. I have a strange problem with my topology, sometimes everything crashed with exception:

java.lang.RuntimeException: java.lang.NullPointerException
 at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
 at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
 at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
 at backtype.storm.disruptor$consume_loop_STAR_$fn__1577.invoke(disruptor.clj:89)
 at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
 at clojure.lang.AFn.run(AFn.java:24)
 at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
 at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:41)
 at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217$fn__4221.invoke(worker.clj:123)
 at backtype.storm.util$fast_list_map.invoke(util.clj:832)
 at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217.invoke(worker.clj:123)
 at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3746.invoke(executor.clj:255)
 at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58)
 at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
 ... 6 more

First of all, I'm using storm 0.9.1, kafka 0.8.1, storm-kafka-0.8-plus 0.4.0, zookeeper 3.3.6, oracle java 1.6. I have topology: kafka spout - database bolt - database bolt. Second database bolt commits to another table, with data emitted from first bolt. I saw wiki page https://github.com/nathanmarz/storm/wiki/Troubleshooting#nullpointerexception-from-deep-inside-storm but I can't say I'm doing anything wrong. 
I'm ack'ing right after the execute() call, for example:

@Override
public void execute(Tuple tuple) {
    this.collector.ack(tuple);
    this.queue.offer(tuple);
}

And then the queue in a different thread will be committed to the database. Everything works well for varying lengths of time; the topology can crash after a few minutes or after 10 hours. I didn't see anything wrong in ZK or kafka logs. What can be the problem? Where should I look? -- With best regards, Irek Khasyanov.
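The usual fix for this class of problem is to invert the direction of the queue: background threads never touch the OutputCollector; instead they put results on a thread-safe queue that only the topology's executor thread drains (from execute(), often driven by tick tuples). A self-contained sketch with stand-in types (this is an illustration of the pattern, not Storm API code; `Result` and the `List<String>` return stand in for real tuples and `collector.emit(...)`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in for the results a background worker produces.
class Result {
    final String value;
    Result(String v) { value = v; }
}

public class SafeEmitBolt {
    // Background threads only ever write to this queue...
    private final ConcurrentLinkedQueue<Result> pending = new ConcurrentLinkedQueue<>();

    // ...and the topology's executor thread is the only one that "emits".
    List<String> execute() {
        List<String> emitted = new ArrayList<>();
        Result r;
        while ((r = pending.poll()) != null) {
            emitted.add(r.value);   // in a real bolt: collector.emit(...) / collector.ack(...)
        }
        return emitted;
    }

    // Called from a background (e.g. database-writer) thread.
    void backgroundComplete(Result r) { pending.offer(r); }
}
```

This keeps all OutputCollector calls on the thread Storm expects, which avoids the serializer NPE described above.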
Re: Sample project using Storm and Kafka (using Scala 2.10)
storm-kafka is now part of the Storm project. When 0.9.2 is released (shortly) it will include the Kafka spout (for Kafka 0.8.x). - Taylor On May 21, 2014, at 4:32 PM, Marco zentrop...@yahoo.co.uk wrote: Hi, I'm having some troubles understanding how to bootstrap a Kafka + Storm project. Specifically I don't get if storm-kafka is indeed included in the Storm release or not. Also, I'm currently using Scala 2.10 but it seems that the Kafka main release is only available in its Scala 2.9.2 version. Thanks for your help -Marco
Re: Sample project using Storm and Kafka (using Scala 2.10)
If you build yourself, you can do the following: mvn install -DscalaVersion=2.10.3 -DkafkaArtifact=kafka_2.10 - Taylor On May 21, 2014, at 5:32 PM, János Háber janos.ha...@finesolution.hu wrote: https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/pom.xml#L34 And 2.10 not backward compatible with 2.9... that's why scala applications/libraries is cross compiled with multiple scala version. (that's why SBT will handle this natively, but I think in apache sbt is not allowed so you need to create multiple maven project (hard way) or switch to gradle (like kafka) to produce multiple version) János Háber Fine Solution Ltd On Wed, May 21, 2014 at 11:28 PM, János Háber janos.ha...@finesolution.hu wrote: On Wed, May 21, 2014 at 11:27 PM, P. Taylor Goetz ptgo...@gmail.com wrote: ill include the Kafka spout (for Kafka 0.8.x). Yeah, but the current maven build compile to 2.9 scala not 2.10... János Háber Fine Solution Ltd
Re: Sample project using Storm and Kafka (using Scala 2.10)
János, I sense and appreciate your frustration. The initial goal with bringing the storm-kafka project under the Apache Storm project umbrella is to provide a definitive source for the code and eliminate fragmentation, which has been an issue in the past. This is a good first step for many reasons, and will hopefully improve greatly in future iterations. Just to be clear, Apache releases source code, not binaries. Any binaries built against an Apache project's source are provided as a convenience. The cross-compilation ballet you describe is a feature of scala [1], not anything Storm or maven related. Yes, we can and will improve on the build and binary release process. But for now the goal is to provide a definitive source, and make sure users can build that source as needed -- which I think we have done. -Taylor [1] a not-so-subtle dig against scala's build process, not the language. Hopefully this will get sorted out someday. On May 21, 2014, at 7:48 PM, János Háber janos.ha...@finesolution.hu wrote: Dear Taylor, I love your work, but: - I don't want to build myself - Dependent libraries (like tormenta-kafka) need a cross compiled version of storm-kafka, without this they need to clone the project, change the group id, handle every changes by hand, and publish to central repository. - I need to have own maven repository to store the cross compiled version (which need to be public if somebody want to use my application) and maintain the changes - I think hand made library is the best way to make a project to unstable, because if I clone the project I need to clone tormenta-kafka project too and handle myself both version changes, solve the compatibility issues, etc... 
I know how can I compile to 2.10 by hand, but all other project (example kafka, which is apache project too) has cross compiled version in CENTRAL maven repository, if a project required a cross-compiled scala library - my oppinion - the project need to provide cross-compiled version too, no exception. b0c1 János Háber Fine Solution Ltd On Wed, May 21, 2014 at 11:48 PM, P. Taylor Goetz ptgo...@gmail.com wrote: If you build yourself, you can do the following: mvn install -DscalaVersion=2.10.3 -DkafkaArtifact=kafka_2.10 - Taylor On May 21, 2014, at 5:32 PM, János Háber janos.ha...@finesolution.hu wrote: https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/pom.xml#L34 And 2.10 not backward compatible with 2.9... that's why scala applications/libraries is cross compiled with multiple scala version. (that's why SBT will handle this natively, but I think in apache sbt is not allowed so you need to create multiple maven project (hard way) or switch to gradle (like kafka) to produce multiple version) János Háber Fine Solution Ltd On Wed, May 21, 2014 at 11:28 PM, János Háber janos.ha...@finesolution.hu wrote: On Wed, May 21, 2014 at 11:27 PM, P. Taylor Goetz ptgo...@gmail.com wrote: ill include the Kafka spout (for Kafka 0.8.x). Yeah, but the current maven build compile to 2.9 scala not 2.10... János Háber Fine Solution Ltd
Re: Which daemon tool for Storm nodes?
Exactly what Michael said... Any good process monitoring tool will work. The only issue I've had with supervisord is not with the software, but rather the name conflict with the Storm supervisor process -- it can confuse the hell out of some people. -Taylor On May 21, 2014, at 9:29 PM, Michael Rose mich...@fullcontact.com wrote: We use upstart. Supervisord would also work. Just anything to keep an eye on it and restart it if it dies (a very rare occurrence). Michael Rose (@Xorlev) Senior Platform Engineer, FullContact mich...@fullcontact.com On Wed, May 21, 2014 at 7:09 PM, Connie Yang cybercon...@gmail.com wrote: Hi, Is there a recommended supervisor or daemon tool for Storm nodes? I'm using Supervisor for Storm and Zookeeper. I heard some concern in using Supervisor, but I haven't researched into the matter. Any comments on this? Thanks, Connie
Re: Weirdness running topology on multiple nodes
Hi Justin, Can you share your storm.yaml config file? Do you have any firewall software running on any of the machines in your cluster? - Taylor On May 7, 2014, at 11:11 AM, Justin Workman justinjwork...@gmail.com wrote: We have spent the better part of 2 weeks now trying to get a pretty basic topology running across multiple nodes. I am sure I am missing something simple but for the life of me I cannot figure it out. Here is the situation: I have 1 nimbus server and 5 supervisor servers, with Zookeeper running on the nimbus server and 2 supervisor nodes. These hosts are all virtual machines with 4 CPUs and 8GB RAM, running in an OpenStack deployment. If all of the guests are running on the same physical hypervisor then the topology starts up just fine and runs without any issues. However, if we take the guests and spread them out over multiple hypervisors ( in the same OpenStack cluster ), the topology never really completely starts up. Things start to run, some messages are pulled off the spout, but nothing ever makes it all the way through the topology and nothing is ever ack'd. In the worker logs we get messages about reconnecting and eventually a Remote host unreachable error, and Async Loop Died. This used to result in a NumberFormat exception; reducing the netty retries from 30 to 10 resolved the NumberFormat error, and now we get the following: 2014-05-07 09:00:51 b.s.m.n.Client [INFO] Reconnect ... [9] 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... [10] 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... [9] 2014-05-07 09:00:52 b.s.m.n.Client [WARN] Remote address is not reachable. We will close this client. 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... [9] 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... [10] 2014-05-07 09:00:52 b.s.m.n.Client [WARN] Remote address is not reachable. We will close this client. 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... 
[10] 2014-05-07 09:00:52 b.s.m.n.Client [WARN] Remote address is not reachable. We will close this client. 2014-05-07 09:00:52 b.s.m.n.Client [INFO] Reconnect ... [10] 2014-05-07 09:00:52 b.s.m.n.Client [WARN] Remote address is not reachable. We will close this client. 2014-05-07 09:00:53 b.s.util [ERROR] Async loop died! java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] at backtype.storm.disruptor$consume_loop_STAR_$fn__1577.invoke(disruptor.clj:89) ~[na:na] at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) ~[na:na] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26] Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more at backtype.storm.messaging.netty.Client.send(Client.java:125) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating] at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4398$fn__4399.invoke(worker.clj:319) ~[na:na] at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4398.invoke(worker.clj:308) ~[na:na] at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58) ~[na:na] And in the supervisor logs we see errors about the workers timing out and not starting up all the way, we also see executor timeouts in the nimbus logs. But we do not see any errors in the Zookeeper logs and the Zookeeper stats look fine. 
There do not appear to be any real network issues. I can run a continuous flood ping between the hosts, with varying packet sizes, with minimal latency and no dropped packets. I have also attempted to add all hosts to the local hosts files on each machine without any difference. I have also played with adjusting the different heartbeat timeouts and intervals without any luck, and I have also deployed this same setup to a 5 node cluster on physical hardware ( 24 cores 64GB ram and a lot of local disks ), and we had the same issue. The topology would start, but no data ever made it through the topology. The only way I have ever been able to get the topology to work is under OpenStack when all guests are on the same physical hypervisor. I think I am just missing something very obvious, but I am going in circles at this point and could use some additional suggestions. Thanks Justin
[VOTE] Storm Logo Contest - Round 1
This is a call to vote on selecting the top 3 Storm logos from the 11 entries received. This is the first of two rounds of voting. In the first round the top 3 entries will be selected to move onto the second round where the winner will be selected. The entries can be viewed on the storm website here: http://storm.incubator.apache.org/blog.html VOTING Each person can cast a single vote. A vote consists of 5 points that can be divided among multiple entries. To vote, list the entry number, followed by the number of points assigned. For example: #1 - 2 pts. #2 - 1 pt. #3 - 2 pts. Votes cast by PPMC members are considered binding, but voting is open to anyone. This vote will be open until Thursday, May 22 11:59 PM UTC. - Taylor
Re: How resilient is Storm w/o supervision
I don’t think you need root to run supervisord: http://supervisord.org/running.html If you’re just testing something out, and don’t mind your cluster going down, then running without supervision is okay. But I would NEVER suggest someone run Storm’s daemons without supervision in a production environment. - Taylor On May 2, 2014, at 2:29 PM, Albert Chu ch...@llnl.gov wrote: I'm attempting to run Storm on a platform that I don't have root on. So I won't be able to run it under Redhat's supervisord that's already installed. How resilient are the Storm daemons by themselves? Are they reasonably resilient or are they programmed to not handle even relatively simple errors? I should probably say, this probably wouldn't be run in a production environment. Just trying to understand if the documentation writers are saying, you should really do this for production or it won't work if you don't do this. Thanks, Al -- Albert Chu ch...@llnl.gov Computer Scientist High Performance Systems Division Lawrence Livermore National Laboratory
Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
I have seen this as well and thought I was going nuts. In my testing I could reliably reproduce it in local mode against 0.9.1-incubating. What I noticed in my testing: - It only occurred when a custom logback.xml was on the class path. - More specifically, it only happened when there was an “appender-ref” child element in the “root”. Commenting out the “appender-ref” element made it go away. - It only happened with Trident topologies. It did not happen with regular storm topologies. It is clearly a classloader issue, but I have no idea what the root cause is. My theory is that it is related to the fact that in 0.9.1 we had both the clojure source code as well as the AOT-compiled clojure classes in the storm-core jar. In 0.9.2-SNAPSHOT (latest master branch), we no longer package the clojure source code alongside the AOT-compiled classes. In 0.9.2, with the patch for STORM-290 applied, this problem goes away. Not absolutely sure why, but I’m glad it does. ;) - Taylor On Apr 21, 2014, at 9:09 PM, Adam Lewis m...@adamlewis.com wrote: The tests are in the test tree (which I don't think eclipse cares about, but should get treated properly on the command line). The scope wasn't provided, it was defaulted to runtime...and I've now corrected that and it doesn't help. Strangely, if I explicitly exclude logback-classic within the storm dependency, it does work as long as I also add a dependency on slf4j-simple...so I am still seeing weird logger induced classpath issues...so this is what is working for me:

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <exclusions>
    <exclusion>
      <artifactId>logback-classic</artifactId>
      <groupId>ch.qos.logback</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
</dependency>

WEIRD! On Mon, Apr 21, 2014 at 8:54 PM, Jon Logan jmlo...@buffalo.edu wrote: Are your maven scopes right? The scope of the Storm dependency should be provided -- not runtime. 
Also be sure that your main method / unit test is under your test/ classpath, not your main/ classpath. On Mon, Apr 21, 2014 at 8:49 PM, Adam Lewis m...@adamlewis.com wrote: Are there other things that could cause this error? Since upgrading to 0.9.1-incubating, I've hit it twice. The first time I resolved it (in one project) by fixing an issue where two slf4j bindings were on the classpath together (strange, but it worked)...now I'm hitting the problem again in a different project and can't figure out what is causing the problem. This is for a test which is submitting a topology to a LocalCluster; the full trace follows (happens launching JUnit from Eclipse and from Maven command line)

java.lang.RuntimeException: java.lang.ClassNotFoundException: backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
 at backtype.storm.utils.Utils.deserialize(Utils.java:88)
 at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89)
 at backtype.storm.daemon.nimbus$start_storm.invoke(nimbus.clj:724)
 at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopologyWithOpts(nimbus.clj:962)
 at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopology(nimbus.clj:971)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
 at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
 at backtype.storm.testing$submit_local_topology.invoke(testing.clj:253)
 at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:34)
 at backtype.storm.LocalCluster.submitTopology(Unknown Source)
 at com.acuitysds.trident.TestTimeseriesAssembly.testBasicTopology(TestTimeseriesAssembly.java:108)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
[ANNOUNCE] Storm Logo Contest Now Open
The storm logo contest is now open. If you or someone you know has great design skills, then this is the chance to help establish Storm’s brand with an awesome new logo. The contest is open now, and will run through 4/30/2014. More details are available on the storm website: http://bit.ly/1jwMPjy In order to attract interest from professional designers it would be great if Storm community members could use their social networks to help get the word out. - Taylor signature.asc Description: Message signed with OpenPGP using GPGMail
Re: Topology is stuck
Hey Jason, Do you know a way to reliably reproduce this? If so can you share the steps? -Taylor On Apr 9, 2014, at 5:52 PM, Jason Jackson jasonj...@gmail.com wrote: Fyi we're using Summingbird in production, not Trident. However Summingbird does not give you exactly-once semantics; it does give you a higher level of abstraction than the Storm API though. On Wed, Apr 9, 2014 at 2:50 PM, Jason Jackson jasonj...@gmail.com wrote: I have one theory that because reads in zookeeper are eventually consistent, this is a necessary condition for the bug to manifest. So one way to test this hypothesis is to run a zookeeper ensemble with 1 node, or a zookeeper ensemble configured for 5 nodes but with 2 of them taken offline, so that every write operation only succeeds if every member of the ensemble sees the write. This should produce strongly consistent reads. If you run this test, let me know what the results are. (Clearly this isn't a good production setup though, as you're trading lower availability for greater consistency, but the results could help narrow down the bug.) On Wed, Apr 9, 2014 at 2:43 PM, Jason Jackson jasonj...@gmail.com wrote: Yah it's probably a bug in trident. It would be amazing if someone figured out the fix for this. I spent about 6 hours looking into it, but couldn't figure out why it was occurring. Beyond fixing this, one thing you could do to buy yourself time is disable batch retries in trident. There's no option for this in the API, but it's like a 1 or 2 line change to the code. Obviously you lose exactly-once semantics, but at least you would have a system that never falls behind real-time. On Wed, Apr 9, 2014 at 1:10 AM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: Thanks Jason. However, I don't think that was the case in my stuck topology, otherwise I'd have seen exceptions (thrown by my Trident functions) in the worker logs. 
On Wed, Apr 9, 2014 at 3:02 AM, Jason Jackson jasonj...@gmail.com wrote: An example of corrupted input that causes a batch to fail would be for example if you expected a schema to your data that you read off kafka, or some queue, and for whatever reason the data didn't conform to your schema and the function that you implement that you pass to stream.each throws an exception when this unexpected situation occurs. This would cause the batch to be retried, but it's deterministically failing, so the batch will be retried forever. On Mon, Apr 7, 2014 at 10:37 AM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: Hi Jason, Could you be more specific -- what do you mean by corrupted input? Do you mean that there's a bug in Trident itself that causes the tuples in a batch to somehow become corrupted? Thanks a lot! Danijel On Monday, April 7, 2014, Jason Jackson jasonj...@gmail.com wrote: This could happen if you have corrupted input that always causes a batch to fail and be retried. I have seen this behaviour before and I didn't see corrupted input. It might be a bug in trident, I'm not sure. If you figure it out please update this thread and/or submit a patch. On Mon, Mar 31, 2014 at 7:39 AM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: To (partially) answer my own question -- I still have no idea on the cause of the stuck topology, but re-submitting the topology helps -- after re-submitting my topology is now running normally. On Wed, Mar 26, 2014 at 6:04 PM, Danijel Schiavuzzi dani...@schiavuzzi.com wrote: Also, I did have multiple cases of my IBackingMap workers dying (because of RuntimeExceptions) but successfully restarting afterwards (I throw RuntimeExceptions in the BackingMap implementation as my strategy in rare SQL database deadlock situations to force a worker restart and to fail+retry the batch). 
From the logs, one such IBackingMap worker death (and subsequent restart) resulted in the Kafka spout re-emitting the pending tuple: 2014-03-22 16:26:43 s.k.t.TridentKafkaEmitter [INFO] re-emitting batch, attempt 29698959:736 This is of course the normal behavior of a transactional topology, but this is the first time I've encountered a case of a batch retrying indefinitely. This is especially suspicious since the topology had been running fine for 20 days straight, re-emitting batches and restarting IBackingMap workers quite a number of times. I can see in my IBackingMap backing SQL database that the batch with the exact txid value 29698959 has been committed -- but I suspect that could come from another BackingMap, since there are two BackingMap instances running (parallelismHint 2). However, I have no idea why the batch is being retried indefinitely now, nor why it hasn't been successfully acked by Trident. Any suggestions on the area (topology component) to focus my research on? Thanks, On Wed, Mar 26, 2014 at 5:32 PM, Danijel Schiavuzzi
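One way to avoid the failure mode Jason describes (a function that throws on unexpected input, so the same batch deterministically fails and is retried forever) is to parse defensively and drop malformed records instead of throwing. A minimal, Storm-independent sketch of the idea; the class and method names and the integer "schema" are hypothetical, and inside a real Trident function you would simply not emit when validation fails:

```java
// Hypothetical sketch: validate/parse a record defensively so a malformed
// record is logged and dropped once, rather than throwing an exception that
// forces Trident to replay the whole batch indefinitely.
public class SafeParse {
    /**
     * Returns the parsed value, or null when the record does not match the
     * expected schema. A Trident each() function would emit nothing (and
     * perhaps count the drop in a metric) when this returns null.
     */
    static Integer parseOrNull(String record) {
        if (record == null) {
            return null;
        }
        try {
            return Integer.valueOf(record.trim());
        } catch (NumberFormatException e) {
            return null; // log-and-drop instead of exception-and-retry
        }
    }
}
```

The trade-off is explicit: bad records are skipped rather than blocking the stream, which is usually preferable to a topology that silently stops making progress.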
Re: Is Storm a right tool for processing of thousands of small tasks?
Have you considered using DRPC [1]? [1] http://storm.incubator.apache.org/documentation/Distributed-RPC.html On Mar 19, 2014, at 11:21 AM, Eugene Dzhurinsky jdeve...@gmail.com wrote: Hello! I'm evaluating Storm for a project which involves processing many distinct small tasks in the following way: - a user supplies some data source - a spout is attached to the source and produces chunks of data for the topology - bolts process the chunks of data and transform them somehow (in general reducing the number of chunks, so the number of records in the sink is much smaller than the number of records out of the spout) - when all records are processed, the results are accumulated and sent back to the user. As far as I understand, a topology is supposed to be kept running forever, so I don't really see an easy way to distinguish the records of one task from the records of another. Should a new topology be started for each new task of a user? Thank you in advance! Links to any appropriate articles are very welcome :) -- Eugene N Dzhurinsky signature.asc Description: Message signed with OpenPGP using GPGMail
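For context on why DRPC fits here: the core of the problem is correlating results back to the request that produced them, and DRPC handles that by tagging every tuple in a request's stream with a request id. The bookkeeping it performs on your behalf looks roughly like this Storm-independent sketch (class and method names are mine, purely illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: accumulate per-request results so that records from
// concurrent user tasks flowing through one long-running topology don't mix.
// DRPC does this kind of correlation internally via a per-request id.
public class RequestAccumulator {
    private final Map<String, List<String>> resultsByRequest = new HashMap<>();

    /** Record one processed chunk for the given request. */
    void add(String requestId, String result) {
        resultsByRequest.computeIfAbsent(requestId, k -> new ArrayList<>()).add(result);
    }

    /** Called when all chunks of a request are done; returns and clears its results. */
    List<String> complete(String requestId) {
        return resultsByRequest.remove(requestId);
    }
}
```

With DRPC you get this correlation, plus the "send the accumulated result back to the caller" step, without writing it yourself; a new topology per task is not needed.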
Re: nimbus.thrift.max_buffer_size
It uploads the file in small (1024*5 bytes) chunks. Does this happen every time (i.e. is it reproducible)? What is the size of your topology jar? Can you post the server-side message (I want to see the length it reported). - Taylor On Mar 18, 2014, at 3:40 PM, Adam Lewis superca...@gmail.com wrote: Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new thrift max buffer size (nicely logged on the server side, although the client just gets a broken pipe stack trace from thrift) with an approx 6 MB message(!). Increasing the configured limit solves the problem, but I would have thought the 1MB default should be enough. Does the storm submitter encode the entire topology as a single thrift message? I'm really surprised that the message is coming out so large; my topology isn't exactly small, but it only has about 20 bolts... does anyone have any suggestions on how to determine why the message is so large? Is this within the realm of what others have seen, or am I doing something wrong? Thanks, Adam signature.asc Description: Message signed with OpenPGP using GPGMail
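The chunked upload Taylor mentions can be pictured with a small, Storm-independent sketch. The method name is hypothetical; in the real client each write below would correspond to one chunk-upload thrift call, so the jar never has to fit in a single thrift frame:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of the chunked-upload loop: the client streams the
// topology jar to Nimbus in 1024*5-byte pieces rather than one big message.
public class ChunkedUpload {
    static final int CHUNK_SIZE = 1024 * 5; // the chunk size mentioned above

    /**
     * Copies in to out one chunk at a time; each write stands in for one
     * chunk-upload RPC. Returns the number of chunks sent.
     */
    static int uploadInChunks(InputStream in, OutputStream out) {
        try {
            byte[] buf = new byte[CHUNK_SIZE];
            int chunks = 0;
            for (int n = in.read(buf); n > 0; n = in.read(buf)) {
                out.write(buf, 0, n);
                chunks++;
            }
            return chunks;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This is why a large jar uploads fine while a large serialized topology (sent as one message) trips the buffer limit.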
Re: nimbus.thrift.max_buffer_size
Cool. I'm going back to the public list to share the knowledge. If your trident topology is compiling down to 35 bolts, then it sounds like you have a lot of partitioning operations. Is there any way you can reduce that? That's going to introduce a decent amount of network transfer. And I would definitely try to figure out what's making the serialized topology so big. In that department, the prepare() method is your friend in terms of initializing state. -Taylor On Mar 18, 2014, at 7:37 PM, Adam Lewis gm...@adamlewis.com wrote: Sure, sharing with the list is a great idea. I already checked the log excerpts I sent you to ensure they are clean of anything proprietary, so all good there. I did a little more digging and it seems there is definitely something... interesting... happening in the compiling process. My uncompiled topology is pretty small memory-wise (according to the profiler), but once it gets built into the generated storm bolts, it becomes 35 bolts, each of which serializes to about 200kb... I was struck by how small the variance is amongst the serialized sizes, as if something big-ish is getting duplicated into each bolt. Some more experimentation revealed that part of the problem may be that I have multiple DRPC spouts in a single topology. I noticed super-linear growth in serialized topology size for each additional DRPC spout I create. I'm thinking I will break each DRPC spout into its own topology, which I've needed to do anyway since right now they seem to block each other from processing even though I don't need requests to separate DRPCs to be wholly ordered in the way I imagine trident would try to do. On Tue, Mar 18, 2014 at 7:24 PM, P. Taylor Goetz ptgo...@gmail.com wrote: Glad I could help. Trident does a lot in terms of compiling down to storm's primitive structures. The relevant classes are Stream and TridentTopology in the storm.trident package (if my memory serves me correctly, I'm not at a computer...). 
It's not for the faint of heart, but once you start to wrap your head around it, it's pretty cool. Do you mind if I share this thread with the user group? Some of the information could be beneficial to others. At the very least I think we should document what you found... And the Apache line is always: if it didn't happen on the list, it didn't happen. -Taylor On Mar 18, 2014, at 6:04 PM, Adam Lewis gm...@adamlewis.com wrote: Hi Taylor, The submitter isn't multithreaded or anything funky... it is basically a typical submitter from the docs that just happens to call submit multiple times. I did just check the java-serialized topology and it is indeed the culprit (6662137 bytes, 17k larger than the thrift message). I can now look for any obvious bugs in my topology component's non-transient state, but I'm wondering if there is a more systematic way? These topologies are all trident-built, so the indirection that occurs between the trident DSL API and the final storm topology is a bit opaque to me. The generated thrift classes make this difficult to penetrate from the other direction. Do you have any suggestion on where I can look to better understand the overall building process? Thanks a lot for your help. Adam On Tue, Mar 18, 2014 at 5:40 PM, P. Taylor Goetz ptgo...@gmail.com wrote: Just to give you a little background… the thrift max buffer size was introduced to prevent a situation where a wayward connection (SSH, telnet, security port scanner, etc.) to the nimbus thrift port would cause nimbus to hang. When you submit a topology, three things get sent over thrift: 1. the topology jar file (big — so it gets uploaded in small chunks) 2. the serialized Config object (typically a relatively small bit of JSON) 3. the serialized Topology itself (this all depends…) My initial guess is that #3 is potentially really big (as you mention). You could test this by serializing the topology to disk to see how big it is. 
My second guess is that something about your submitter app might be overflowing thrift's server-side buffer. Is it multi-threaded and submitting topos in parallel? If you submit that topology with the storm jar command, do you still get the error? - Taylor On Mar 18, 2014, at 5:17 PM, Adam Lewis gm...@adamlewis.com wrote: I noticed that StormSubmitter has a code path for only doing a single upload of the jar file if it has already been uploaded to master (StormSubmitter.java:142), but the only case where that will actually happen is if you submit multiple topologies during the lifetime of the VM started with the storm jar command (since submittedJar is just a private static field). My main() class (passed to the storm jar command) simply delegates to a series of classes which build StormTopology instances, and then calls StormSubmitter.submitTopology(..) in a loop in turn with each
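Taylor's suggestion to serialize the topology and check its size can be done generically with plain Java serialization (which is essentially what Storm's Utils.serialize did in this era). The helper name below is mine; it works for any Serializable object, so it can also be pointed at individual bolt instances to find which component carries the duplicated state:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch: measure how big an object is under plain Java serialization,
// e.g. a built topology or a single bolt, before submitting it to Nimbus.
public class SerializedSize {
    /** Returns the number of bytes obj occupies when Java-serialized. */
    static int sizeOf(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }
}
```

Measuring each bolt separately is a systematic way to spot the "~200kb per bolt, low variance" pattern Adam describes: if every bolt serializes to roughly the same large size, something shared is being copied into each one instead of being created in prepare().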
Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size
Hi Robert, let me know if you experience the same issue with 0.9.0.1. One thing that caught my eye was this: 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections. In 0.9.1 that can indicate that something other than the thrift client (like ssh, telnet, or a security scanner) is accessing nimbus’ thrift port. More details here (https://github.com/apache/incubator-storm/pull/3). The fact that you can run a different topology class from the same jar file suggests that it is not a problem with the upload process. Is it possible that something in your topology (like a cassandra client, etc.) is misconfigured to point to nimbus’ host and port? - Taylor On Mar 11, 2014, at 7:48 PM, Robert Lee lee.robert...@gmail.com wrote: I will downgrade to storm-0.9.0.1 and see if the error persists in that version as well. On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee lee.robert...@gmail.com wrote: Yes -- more details: Storm version: 0.9.1-incubating installed using a variant of your storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant). Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node. Persisting to a local cassandra cluster. Here's an example topology I'm running. This topology works both in local and distributed mode. A variant of this topology (more persisting and more complicated functions on the kafka stream) works in local mode but gives the thrift error reported above when submitting. 
public class SentenceAggregationTopology {
    private final BrokerHosts brokerHosts;

    public SentenceAggregationTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        return buildTopology(null);
    }

    public StormTopology buildTopology(LocalDRPC drpc) {
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
        KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count");
        TridentTopology topology = new TridentTopology();
        TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle()
                .each(new Fields("str"), new WordSplit(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
                        new Count(), new Fields("aggregates_words"))
                .parallelismHint(2);
        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        final int TIME_INTERVAL_IN_MILLIS = 1000;
        String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, TIME_INTERVAL_IN_MILLIS);
        config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
        if (args != null && args.length > 2) {
            String name = args[2];
            config.setNumWorkers(4);
            config.setMaxTaskParallelism(4);
            StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setDebug(true);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(TIME_INTERVAL_IN_MILLIS);
            }
        }
    }
}

On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz ptgo...@gmail.com wrote: Hi Robert, Can you provide additional details, like what storm version you are using, etc.? -Taylor On Mar 11, 2014, at 6:57 PM, Robert Lee lee.robert...@gmail.com wrote: After submitting my topology via the storm jar command: 562 [main] INFO backtype.storm.StormSubmitter - Uploading
Re: [DISCUSS] Pulling Contrib Modules into Apache
Right now people seem to be cobbling together their test code by figuring out how the 1-year-old code in [6] actually works, and copy-pasting other people's test code from GitHub. -- As I said above, these are my personal $.02. I admit that my comments go a bit beyond the original question of bringing in contrib modules -- I think implicitly the discussion about the contrib modules is also about what you need to provide a better and more well-rounded experience, i.e. the question of whether to have batteries included or not. (As you may suspect, I'm leaning towards including at least the most important batteries, though what's really important at the project level is of course up for debate.) On my side I'd be happy to help with those areas where I am able to contribute, whether that's code and examples (like storm-starter) or tutorials/docs (I already wrote e.g. [7] and [8]). Again, thanks Taylor for starting this discussion. No matter the actual outcome I'm sure the state of the project will be improved. Best, Michael [1] https://github.com/elasticsearch/logstash [2] http://mesosphere.io/learn/run-storm-on-mesos/#step-7 [3] https://github.com/miguno/puppet-storm [4] https://github.com/apache/incubator-aurora/blob/master/docs/vagrant.md [5] http://storm.incubator.apache.org/documentation/FAQ.html [6] https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java [7] https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology [8] http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/ On 02/26/2014 08:21 PM, P. Taylor Goetz wrote: Thanks for the feedback Bobby. To clarify, I’m mainly talking about spout/bolt/trident state implementations that integrate storm with *Technology X*, where *Technology X* is not a fundamental part of storm. 
Examples would be technologies that are part of or related to the Hadoop/Big Data ecosystem and enable the Lambda Architecture, e.g.: Kafka, HDFS, HBase, Cassandra, etc. The idea behind having one or more Storm committers act as a “sponsor” is to make sure new additions are done carefully and with good reason. To add a new module, it would require committer/PPMC consensus, and assignment of one or more sponsors. Part of a sponsor’s job would be to ensure that a module is maintained, which would require enough familiarity with the code to support it long term. If a new module was proposed, but no committers were willing to act as a sponsor, it would not be added. It would be the Committers’/PPMC’s responsibility to make sure things didn’t get out of hand, and to do something about it if they do. Here’s an old Hadoop JIRA thread [1] discussing the addition of Hive as a contrib module, similar to what happened with HBase as Bobby pointed out. Some interesting points are brought up. The difference here is that both HBase and Hive were pretty big codebases relative to Hadoop. With spout/bolt/state implementations I doubt we’d see anything on that scale. - Taylor [1] https://issues.apache.org/jira/browse/HADOOP-3601 On Feb 26, 2014, at 12:35 PM, Bobby Evans ev...@yahoo-inc.com wrote: I can see a lot of value in having a distribution of storm that comes with batteries included, everything is tested together and you know it works. But I don’t see much long term developer benefit in building them all together. If there is strong coupling between storm and these external projects so that they break when storm changes, then we need to understand the coupling and decide if we want to reduce that coupling by stabilizing APIs, improving version numbering and release process, etc.; or if the functionality is something that should be offered as a base service in storm. 
I can see politically the value of giving these other projects a home in Apache, and making them sub-projects is the simplest route to that. I’d love to have storm on yarn inside Apache. I just don’t want to go overboard with it. There was a time when HBase was a “contrib” module under Hadoop along with a lot of other things, and the Apache board came and told Hadoop to break it up. Bringing storm-kafka into storm does not sound like it will solve much from a developer’s perspective, because there is at least as much coupling with kafka as there is with storm. I can see how it is a huge amount of overhead and pain to set up a new project just for a few hundred lines of code, and as such I am in favor of pulling in closely related projects, especially those that are spouts and state implementations. I just want to be sure that we do it carefully, with a good reason, and with enough people who are familiar with the code to support it long term. If it starts to look like we are pulling in too many projects perhaps we should look at something more like the bigtop project https
Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size
Hi Robert, Can you provide additional details, like what storm version you are using, etc.? -Taylor On Mar 11, 2014, at 6:57 PM, Robert Lee lee.robert...@gmail.com wrote: After submitting my topology via the storm jar command:

562 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
2307 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
2307 [main] INFO backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000}
Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
    at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
    at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
    at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
    at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
    at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
    at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
    ... 2 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
    ... 7 more

storm@nimbus:~$ tail /var/log/storm/nimbus.log
2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.

Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb on the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
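If the oversized frame really is a legitimate (if bloated) topology message rather than a stray connection hitting the port, the nimbus-side limit can be raised in storm.yaml as a stopgap while the root cause is investigated. The config key below is the real one; the value is only an illustration (the 0.9.1-incubating default is 1048576 bytes):

```yaml
# storm.yaml on the nimbus host -- value is illustrative, not a recommendation
nimbus.thrift.max_buffer_size: 4194304   # 4 MB, up from the 1 MB default
```

Note that a 2 MB serialized topology usually points at non-transient state being baked into spouts/bolts; moving heavy initialization into prepare() is the better long-term fix.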
Re: Error when trying to use multilang in a project built from scratch (not storm-starter)
I don't have access to a windows machine at the moment, but does this help? http://support.microsoft.com/kb/832434 On Mar 10, 2014, at 4:51 PM, Chris James chris.james.cont...@gmail.com wrote: Reposting since I posted this before at a poor time and got no response. I'm trying out a storm project built from scratch in Java, but with a Python bolt. I have everything running with all Java spouts/bolts just fine, but when I try to incorporate a python bolt I am running into issues. I have my project separated into /storm/ for topologies, /storm/bolts/ for bolts, /storm/spouts/ for spouts, and /storm/multilang/ for the multilang wrappers. Right now the only thing in /storm/multilang/ is storm.py, copied and pasted from the storm-starter project. In my bolts folder, I have a dummy bolt set up that just prints the tuple. I've virtually mimicked the storm-starter WordCountTopology example for using a python bolt, so I think the code is OK and the configuration is the issue. So my question is simple. What configuration steps do I have to set up so that my topology knows where to look to find storm.py when I run super("python", "dummypythonbolt.py")? I noticed an error in the stack trace claiming that it could not run python (python is definitely on my path and I use it every day), and that it is looking in a resources folder that does not exist. Here is the line in question: Caused by: java.io.IOException: Cannot run program python (in directory C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources): CreateProcess error=267, The directory name is invalid A more extensive stack trace is here: http://pastebin.com/6yx97m0M So once again: what is the configuration step that I am missing to allow my topology to see storm.py and be able to run multilang spouts/bolts in my topology? Thanks!
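For anyone hitting the same CreateProcess error: the trace shows Storm launching the subprocess inside a resources directory extracted from the topology jar, so the error=267 ("directory name is invalid") typically means the jar contains no top-level resources/ entry at all. The multilang scripts therefore need to end up under resources/ inside the jar. Assuming a standard Maven layout (paths below are illustrative; storm-starter achieves the same effect by wiring a dedicated multilang/resources folder into its build), that means:

```text
src/main/resources/resources/storm.py              -> jar entry: resources/storm.py
src/main/resources/resources/dummypythonbolt.py    -> jar entry: resources/dummypythonbolt.py
```

After rebuilding, `jar tf your-topology.jar | grep resources/` is a quick way to confirm the entries are present before submitting.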
Re: [DISCUSS] Pulling Contrib Modules into Apache
in my Storm blog posts, for instance, and others like Mesosphere.io are even using storm-starter for their commercial offerings [2]). [Have Storm up and running faster than you can brew an espresso] Third, for the same reason (get people up and running in a few minutes), I do like that other people in this thread have been bringing up projects like storm-deploy. For the same reason I open sourced puppet-storm [3] (and puppet-kafka, for that matter) a few days ago, and I'll soon open source another Vagrant/Puppet based tool that provides you with 1-click local and remote deployments of Storm and Kafka clusters. That's way better IMHO than having to follow long articles or blog posts to deploy your first cluster. And there are a number of other people that have been rolling their own variants. Now don't get me wrong -- I don't mention this to pitch any of those tools. My intention is to say that it would be greatly helpful to have /something/ like this for Storm, for the same reason that it's nice to have LocalCluster for unit testing. I have been demo'ing both Storm and Kafka by launching clusters with a simple command line, which always gets people excited. If they can then rely on existing examples (see above) to also /run/ an analysis on their cluster, then they have a beautiful start. Oh, and btw: Apache Aurora (with Mesos) has such a Vagrant-based VM cluster setup, too [4], so that people can run the Aurora tutorial on their machines in a few minutes. [Storm and YARN] Fourth, and for similar reasons as #2 and #3, bringing in storm-yarn would be nice. It ties into being able to run LocalCluster as well as to run Storm in local or remote VMs -- but now alongside your existing Hadoop/YARN infrastructure. For those preferring Mesos, Storm-on-Mesos will surely be similarly attractive. On a related note, bringing the Storm docs up to speed with the quality of the Storm code would also be great. 
I have seen that since Storm moved to Incubator several new sections have been added such as the FAQ [5] (btw: nice!). Similarly, there should be better examples and docs for users on how to write unit tests for Storm. Right now people seem to be cobbling together their test code by figuring out how the 1-year-old code in [6] actually works, and copy-pasting other people's test code from GitHub. -- As I said above, these are my personal $.02. I admit that my comments go a bit beyond the original question of bringing in contrib modules -- I think implicitly the discussion about the contrib modules is also about what you need to provide a better and more well-rounded experience, i.e. the question of whether to have batteries included or not. (As you may suspect, I'm leaning towards including at least the most important batteries, though what's really important at the project level is of course up for debate.) On my side I'd be happy to help with those areas where I am able to contribute, whether that's code and examples (like storm-starter) or tutorials/docs (I already wrote e.g. [7] and [8]). Again, thanks Taylor for starting this discussion. No matter the actual outcome I'm sure the state of the project will be improved. Best, Michael [1] https://github.com/elasticsearch/logstash [2] http://mesosphere.io/learn/run-storm-on-mesos/#step-7 [3] https://github.com/miguno/puppet-storm [4] https://github.com/apache/incubator-aurora/blob/master/docs/vagrant.md [5] http://storm.incubator.apache.org/documentation/FAQ.html [6] https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java [7] https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology [8] http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/ On 02/26/2014 08:21 PM, P. Taylor Goetz wrote: Thanks for the feedback Bobby. 
To clarify, I’m mainly talking about spout/bolt/trident state implementations that integrate storm with *Technology X*, where *Technology X* is not a fundamental part of storm. Examples would be technologies that are part of or related to the Hadoop/Big Data ecosystem and enable the Lambda Architecture, e.g.: Kafka, HDFS, HBase, Cassandra, etc. The idea behind having one or more Storm committers act as a “sponsor” is to make sure new additions are done carefully and with good reason. To add a new module, it would require committer/PPMC consensus, and assignment of one or more sponsors. Part of a sponsor’s job would be to ensure that a module is maintained, which would require enough familiarity with the code to support it long term. If a new module was proposed, but no committers were willing to act as a sponsor, it would not be added. It would be the Committers’/PPMC’s responsibility to make sure things didn’t get out of hand, and to do something about it if they do. Here’s an old Hadoop JIRA thread [1] discussing
Re: [DISCUSS] Pulling Contrib Modules into Apache
and governance added to the contributed modules. Look at the benefit a tool like npm brings to the node community. I like the idea of sponsorship, vetting and a community vote. I, as I am sure many others would be, am willing to offer support and time to working through how to set this up and helping with the implementation if it is decided to pursue some solution. I hope these views are taken in the spirit they are made: to make this incredible system even better along with the surrounding ecosystem. Thanks, Brian On Tue, Feb 25, 2014 at 9:36 PM, P. Taylor Goetz ptgo...@gmail.com wrote: Just to be clear (and play a little Devil’s advocate :) ), I’m not suggesting that whatever a “contrib” project/module/subproject might become, be a clearinghouse for anything Storm-related. I see it as something that is well-vetted by the Storm community, subject to PPMC review, vote, etc. Entry would require community review, PPMC review, and in some cases ASF IP clearance/legal review. Anything added would require some level of commitment from the PPMC/committers to provide some level of support. In other words, nothing “willy-nilly”. One option could be that any module added require (X > 0) number of committers to volunteer as “sponsors” for the module, and commit to maintaining it. That being said, I don’t see storm-kafka being any different from anything else that provides integration points for Storm. -Taylor On Feb 25, 2014, at 7:53 PM, Nathan Marz nat...@nathanmarz.com wrote: I'm only +1 for pulling in storm-kafka and updating it. Other projects put these contrib modules in a contrib folder and keep them managed as completely separate codebases. As it's not actually a module necessary for Storm, there's an argument there for doing it that way rather than via the multi-module route.
On Tue, Feb 25, 2014 at 4:39 PM, Milinda Pathirage mpath...@umail.iu.edu wrote: Hi Taylor, I'm +1 for pulling these external libraries into the Apache codebase. This will certainly benefit the Storm community. I would also like to contribute to this process. Thanks Milinda On Tue, Feb 25, 2014 at 5:28 PM, P. Taylor Goetz ptgo...@gmail.com wrote: A while back I opened STORM-206 [1] to capture ideas for pulling in contrib modules to the Apache codebase. In the past, we had the storm-contrib github project [2] which subsequently got broken up into individual projects hosted on the stormprocessor github group [3] and elsewhere. The problem with this approach is that in certain cases it led to code rot (modules not being updated in step with Storm's API), fragmentation (multiple similar modules with the same name), and confusion. A good example of this is the storm-kafka module [4], since it is a widely used component. Because storm-contrib wasn't being tagged in github, a lot of users had trouble reconciling which versions of Storm it was compatible with. Some users built off specific commit hashes, some forked, and a few even pushed custom builds to repositories such as clojars. With kafka 0.8 now available, there are two main storm-kafka projects, the original (compatible with kafka 0.7) and an updated fork [5] (compatible with kafka 0.8). My intention is not to find fault in any way, but rather to point out the resulting pain, and work toward a better solution. I think it would be beneficial to the Storm user community to have certain commonly used modules like storm-kafka brought into the Apache Storm project. Another benefit worth considering is the licensing/legal oversight that the ASF provides, which is important to many users. If this is something we want to do, then the big question becomes what sort of governance process needs to be established to ensure that such things are properly maintained.
Some random thoughts, questions, etc. that jump to mind include: What to call these things: contrib modules, connectors, integration modules, etc.? Build integration: I imagine they would be a multi-module submodule of the main maven build. Probably turned off by default and enabled by a maven profile. Governance: Have one or more committer volunteers responsible for maintenance, merging patches, etc.? Proposal process for pulling in new modules? I look forward to hearing others' opinions. - Taylor [1] https://issues.apache.org/jira/browse/STORM-206 [2] https://github.com/nathanmarz/storm-contrib [3] https://github.com/stormprocessor [4] https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka [5] https://github.com/wurstmeister/storm-kafka-0.8-plus -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org
[DISCUSS] Pulling Contrib Modules into Apache
A while back I opened STORM-206 [1] to capture ideas for pulling in “contrib” modules to the Apache codebase. In the past, we had the storm-contrib github project [2] which subsequently got broken up into individual projects hosted on the stormprocessor github group [3] and elsewhere. The problem with this approach is that in certain cases it led to code rot (modules not being updated in step with Storm’s API), fragmentation (multiple similar modules with the same name), and confusion. A good example of this is the storm-kafka module [4], since it is a widely used component. Because storm-contrib wasn’t being tagged in github, a lot of users had trouble reconciling which versions of Storm it was compatible with. Some users built off specific commit hashes, some forked, and a few even pushed custom builds to repositories such as clojars. With kafka 0.8 now available, there are two main storm-kafka projects, the original (compatible with kafka 0.7) and an updated fork [5] (compatible with kafka 0.8). My intention is not to find fault in any way, but rather to point out the resulting pain, and work toward a better solution. I think it would be beneficial to the Storm user community to have certain commonly used modules like storm-kafka brought into the Apache Storm project. Another benefit worth considering is the licensing/legal oversight that the ASF provides, which is important to many users. If this is something we want to do, then the big question becomes what sort of governance process needs to be established to ensure that such things are properly maintained. Some random thoughts, questions, etc. that jump to mind include: What to call these things: “contrib modules”, “connectors”, “integration modules”, etc.? Build integration: I imagine they would be a multi-module submodule of the main maven build. Probably turned off by default and enabled by a maven profile. Governance: Have one or more committer volunteers responsible for maintenance, merging patches, etc.?
Proposal process for pulling new modules? I look forward to hearing others’ opinions. - Taylor [1] https://issues.apache.org/jira/browse/STORM-206 [2] https://github.com/nathanmarz/storm-contrib [3] https://github.com/stormprocessor [4] https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka [5] https://github.com/wurstmeister/storm-kafka-0.8-plus signature.asc Description: Message signed with OpenPGP using GPGMail
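[Editor's sketch] The profile-gated build integration floated in this proposal could look roughly like the following fragment of the parent pom.xml. The profile id and module paths here are hypothetical, not an agreed-upon convention; the point is only that contrib modules stay out of the default reactor and are opted into with `mvn package -P external`:

```xml
<!-- Hypothetical sketch: contrib modules live under external/ and are
     excluded from the default build; "mvn package -P external" opts in. -->
<profiles>
  <profile>
    <id>external</id>
    <modules>
      <module>external/storm-kafka</module>
      <module>external/storm-hbase</module>
    </modules>
  </profile>
</profiles>
```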
Re: [DISCUSS] Pulling Contrib Modules into Apache
Just to be clear (and play a little Devil’s advocate :) ), I’m not suggesting that whatever a “contrib” project/module/subproject might become, be a clearinghouse for anything Storm-related. I see it as something that is well-vetted by the Storm community, subject to PPMC review, vote, etc. Entry would require community review, PPMC review, and in some cases ASF IP clearance/legal review. Anything added would require some level of commitment from the PPMC/committers to provide some level of support. In other words, nothing “willy-nilly”. One option could be that any module added require (X > 0) number of committers to volunteer as “sponsors” for the module, and commit to maintaining it. That being said, I don’t see storm-kafka being any different from anything else that provides integration points for Storm. -Taylor On Feb 25, 2014, at 7:53 PM, Nathan Marz nat...@nathanmarz.com wrote: I'm only +1 for pulling in storm-kafka and updating it. Other projects put these contrib modules in a contrib folder and keep them managed as completely separate codebases. As it's not actually a module necessary for Storm, there's an argument there for doing it that way rather than via the multi-module route. On Tue, Feb 25, 2014 at 4:39 PM, Milinda Pathirage mpath...@umail.iu.edu wrote: Hi Taylor, I'm +1 for pulling these external libraries into the Apache codebase. This will certainly benefit the Storm community. I would also like to contribute to this process. Thanks Milinda On Tue, Feb 25, 2014 at 5:28 PM, P. Taylor Goetz ptgo...@gmail.com wrote: A while back I opened STORM-206 [1] to capture ideas for pulling in contrib modules to the Apache codebase. In the past, we had the storm-contrib github project [2] which subsequently got broken up into individual projects hosted on the stormprocessor github group [3] and elsewhere.
The problem with this approach is that in certain cases it led to code rot (modules not being updated in step with Storm's API), fragmentation (multiple similar modules with the same name), and confusion. A good example of this is the storm-kafka module [4], since it is a widely used component. Because storm-contrib wasn't being tagged in github, a lot of users had trouble reconciling which versions of Storm it was compatible with. Some users built off specific commit hashes, some forked, and a few even pushed custom builds to repositories such as clojars. With kafka 0.8 now available, there are two main storm-kafka projects, the original (compatible with kafka 0.7) and an updated fork [5] (compatible with kafka 0.8). My intention is not to find fault in any way, but rather to point out the resulting pain, and work toward a better solution. I think it would be beneficial to the Storm user community to have certain commonly used modules like storm-kafka brought into the Apache Storm project. Another benefit worth considering is the licensing/legal oversight that the ASF provides, which is important to many users. If this is something we want to do, then the big question becomes what sort of governance process needs to be established to ensure that such things are properly maintained. Some random thoughts, questions, etc. that jump to mind include: What to call these things: contrib modules, connectors, integration modules, etc.? Build integration: I imagine they would be a multi-module submodule of the main maven build. Probably turned off by default and enabled by a maven profile. Governance: Have one or more committer volunteers responsible for maintenance, merging patches, etc.? Proposal process for pulling in new modules? I look forward to hearing others' opinions.
- Taylor [1] https://issues.apache.org/jira/browse/STORM-206 [2] https://github.com/nathanmarz/storm-contrib [3] https://github.com/stormprocessor [4] https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka [5] https://github.com/wurstmeister/storm-kafka-0.8-plus -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org -- Twitter: @nathanmarz http://nathanmarz.com
Re: storm version 0.9.0.1 and zeromq 2.2.0 compatibility
Hi Andrew, There’s not a way that I know of to use multiple versions of zeromq with Storm’s zeromq transport. I think your best option is to use Storm’s Netty transport to avoid zeromq version conflicts. - Taylor On Feb 19, 2014, at 11:06 AM, Andrew Milkowski amgm2...@gmail.com wrote: thanks so much bijoy, will need to do netty, because one of my spouts needs to read from a certain version of zeromq! I have no choice but to use the 2.2 zeromq version... unless there is a way to have storm executing one version 2.1.7 and the spout another such as 2.2.2 thanks for the netty tip, trying it now, but if you know the above, appreciate it! On Wed, Feb 19, 2014 at 10:57 AM, bijoy deb bijoy.comput...@gmail.com wrote: Hi, Try using zeromq 2.1.7 with the given Storm version. Storm seems to have compatibility issues with later versions of zeromq. Also, to avoid this version compatibility problem, you can switch to using the Netty transport instead of zeromq by setting certain parameters in the storm.yaml file. See the Storm 0.9 documentation page on GitHub. Thanks Bijoy On Wed, Feb 19, 2014 at 9:21 PM, Andrew Milkowski amgm2...@gmail.com wrote: I have tried running zeromq-2.2.0 with storm version 0.9.0.1 java.library.path: /opt/local/src/zeromq/zeromq-2.2.0/dist/lib:/opt/local/src/git/jzmq/dist/lib but ran into the sigfault below (are these versions compatible?) thanks!
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code) C [libzmq.so.1+0x1ac60] zmq::signaler_t::signaler_t()+0x1e0 Java frames: (J=compiled Java code, j=interpreted, Vv=VM code) j org.zeromq.ZMQ$Socket.setLongSockopt(IJ)V+0 j org.zeromq.ZMQ$Socket.setHWM(J)V+16 j zilch.mq$set_hwm.invoke(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;+25 j backtype.storm.messaging.zmq.ZMQContext.connect(Ljava/lang/String;Ljava/lang/String;I)Lbacktype/storm/messaging/IConnection;+82 j backtype.storm.daemon.worker$mk_refresh_connections$this__5827$iter__5834__5838$fn__5839.invoke()Ljava/lang/Object;+474 J clojure.lang.LazySeq.sval()Ljava/lang/Object; j clojure.lang.LazySeq.seq()Lclojure/lang/ISeq;+1 J clojure.lang.RT.seq(Ljava/lang/Object;)Lclojure/lang/ISeq; J clojure.core$seq.invoke(Ljava/lang/Object;)Ljava/lang/Object; j clojure.core$dorun.invoke(Ljava/lang/Object;)Ljava/lang/Object;+10 j clojure.core$doall.invoke(Ljava/lang/Object;)Ljava/lang/Object;+10 j backtype.storm.daemon.worker$mk_refresh_connections$this__5827.invoke(Ljava/lang/Object;)Ljava/lang/Object;+512 j backtype.storm.daemon.worker$fn__5882$exec_fn__1229__auto5883.invoke(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;+62
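[Editor's sketch] The Netty transport switch suggested in this thread is a storm.yaml change along these lines for Storm 0.9.x; the numeric values below are illustrative defaults, not tuned recommendations:

```yaml
# storm.yaml: replace the zeromq transport with Netty (Storm 0.9.x)
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
```

With the Netty transport in place, the worker processes no longer load the native zeromq/jzmq libraries at all, which sidesteps the version conflict entirely.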
Re: How to consume from last offset when topology restarts(STORM-KAFKA)
If you turn off forceStartOffsetTime, it should resume from the last offset stored in zookeeper. - Taylor On Feb 16, 2014, at 12:35 PM, Chitra Raveendran chitra.raveend...@flutura.com wrote: Hi So according to this logic I should set the timestamp parameter to the value when the topology was stopped? But how do we identify the exact instance when the topology went down, so that storm could start consuming from then? Is it based on approximation? Or is there some concrete way to find the exact instance when the topology was down? Is there any other parameter which is based on last offset and not time? Regards Chitra On Feb 16, 2014 11:00 PM, Vinoth Kumar Kannan vinot...@gmail.com wrote: The forceStartOffsetTime value can be -2, -1, or a timestamp in milliseconds: -1 to read from the latest offset of the topic, -2 to read from the beginning, and a timestamp to read from a specific time. On Sun, Feb 16, 2014 at 6:15 PM, Chitra Raveendran chitra.raveend...@flutura.com wrote: Any body ?? Any answers !!! I'm sure someone would have a work around ! Please help :) On Feb 15, 2014 12:11 AM, Andrey Yegorov andrey.yego...@gmail.com wrote: I have exactly the same question. I am using the kafka spout from https://github.com/wurstmeister/storm-kafka-0.8-plus.git with the kafka 0.8 release and an ordinary (non-trident) storm topology. How can I guarantee processing of messages sent while the topology was down or while e.g. the storm cluster was down for maintenance? -- Andrey Yegorov On Wed, Feb 12, 2014 at 8:05 AM, Danijel Schiavuzzi dschi...@gmail.com wrote: Hi Chitra, Which Kafka spout version are you exactly using, and what spout type -- Trident or the ordinary Storm spout? I ask because, unfortunately, there are multiple Kafka spout versions around the web. According to my research, your best bet is the one in storm-contrib in case you use Kafka version 0.7, and storm-kafka-0.8-plus in case you use Kafka 0.8.
Best regards, Danijel Schiavuzzi www.schiavuzzi.com On Wed, Feb 12, 2014 at 8:42 AM, Chitra Raveendran chitra.raveend...@flutura.com wrote: Hi I have a topology in production which uses the default kafka spout, I have set this parameter spoutConfig.forceStartOffsetTime(-1); This parameter -1 helps me in such a way that it consumes from the latest message, and doesn't start reading data from kafka right from the beginning (that would be unnecessary and redundant in my usecase). But in production, whenever a new release goes in, I stop and start the topology, which would take a few seconds to minutes. I have been losing out on some data during the time that the topology is down. How can I avoid this? I have tried running without the forceStartOffsetTime parameter, but that did not work. What am I doing wrong, and how can I continue reading from the last offset? Thanks, Chitra -- Danijel Schiavuzzi -- With Regards, Vinoth Kumar K
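[Editor's sketch] Putting the thread's advice together: set forceStartOffsetTime only on the very first deployment, then leave it unset so the spout resumes from the offset committed to ZooKeeper. A sketch, assuming the storm-kafka SpoutConfig API quoted in the thread; the host list, topic, ZooKeeper root, and spout id are placeholder values:

```java
// Hypothetical names throughout; only forceStartOffsetTime(-1) comes from the thread.
SpoutConfig spoutConfig = new SpoutConfig(kafkaHosts, "events", "/kafkastorm", "event-spout");

// First-ever deployment only: jump to the latest offset instead of replaying the topic.
// spoutConfig.forceStartOffsetTime(-1);

// On redeploys, leave forceStartOffsetTime unset so the spout resumes from the
// last offset it committed to ZooKeeper under /kafkastorm/event-spout, which
// covers messages produced while the topology was down.
```

The key point is that forcing the offset time on every restart is what discards the messages produced during the downtime window.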
Re: Storm 0.9.0.1 and Zookeeper 3.4.5 hung issue.
Look for project.clj in storm-core. -Taylor On Feb 14, 2014, at 6:12 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) sagarwal...@bloomberg.net wrote: After adding the changes, I am building the storm 0.9.0.1 source code using Leiningen. Can anyone point out to me where lein picks up the classpath? I am trying to change from zookeeper 3.3.3 to 3.4.5, but am unable to find how to change it. - Original Message - From: der...@yahoo-inc.com To: SAURABH AGARWAL (BLOOMBERG/ 731 LEXIN), user@storm.incubator.apache.org At: Feb 14 2014 13:17:04 Some changes to storm code are necessary for this. See https://github.com/apache/incubator-storm/pull/29/files -- Derek On 2/14/14, 11:50, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) wrote: Thanks Bijoy for the reply. We can't downgrade to 3.3.3 as our system has a zookeeper 3.4.5 server running, and we would like to keep the same version of the zookeeper client to avoid any incompatibility issues. The error we are getting with 3.4.5 is: Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.server.NIOServerCnxn$Factory After looking at the zookeeper code, the static Factory class within the NIOServerCnxn class has been removed in the 3.4.5 version. Zookeeper version 3.3.3 is 3 years old. Shouldn't Storm's code be updated to run with the latest zookeeper version? Should I create a jira for this? - Original Message - From: user@storm.incubator.apache.org To: SAURABH AGARWAL (BLOOMBERG/ 731 LEXIN), user@storm.incubator.apache.org At: Feb 14 2014 11:45:50 Hi, We had also downgraded zookeeper from 3.4.5 to 3.3.3 due to issues with Storm. But we are not facing any issues related to Kafka after the downgrade. We are using Storm 0.9.0-rc2 and Kafka 0.8.0. Thanks Bijoy On Fri, Feb 14, 2014 at 9:57 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) sagarwal...@bloomberg.net wrote: Hi, Storm 0.9.0.1 client linked with the zookeeper 3.4.5 library hung on zookeeper initialize. Is it a known issue?
453 [main] INFO org.apache.zookeeper.server.ZooKeeperServer - init - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 4 datadir /tmp/7b520ac7-ff87-4eb6-9fc5-3a16deec0272/version-2 snapdir /tmp/7b520ac7-ff87-4eb6-9fc5-3a16deec0272/version-2 The client works fine with zookeeper 3.3.3. As we are using storm with kafka: kafka does not work with zookeeper 3.3.3 but works with 3.4.5. Any help is appreciated... Thanks, Saurabh.
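[Editor's sketch] Per Taylor's pointer, the dependency to change lives in storm-core/project.clj. A rough sketch of the Leiningen edit; the exact coordinates and surrounding entries are assumed, not copied from the repository:

```clojure
;; storm-core/project.clj (sketch): bump the ZooKeeper client dependency.
;; Coordinates are assumed; check the actual :dependencies vector in the repo.
:dependencies [;; ...other deps unchanged...
               [org.apache.zookeeper/zookeeper "3.4.5"] ; was "3.3.3"
               ;; ...
               ]
```

Note that, as Derek's linked pull request shows, bumping the version alone is not enough: Storm code referencing removed ZooKeeper classes (such as NIOServerCnxn$Factory) also needs to change.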
Re: Is there a way to add a custom jar (or directory of jars) to the Storm classpath without copying the jar(s) to Storm lib folder
Bijoy, Out of curiosity, why can’t you bundle your dependencies with the topology jar? If you have to, you can put your topology dependencies in $STORM_HOME/lib, but you should be careful with replacing Storm’s core dependencies since it could lead to unpredictable results. - Taylor On Feb 7, 2014, at 8:48 AM, bijoy deb bijoy.comput...@gmail.com wrote: Hi, Is there any way to add a specific directory to the Storm classpath rather than copying the required jars to the Storm lib directory? For me, bundling the required jars with the actual topology jar is not an option. Can I add a directory (of jars) into the Storm lib rather than copying the actual jar files there? Thanks Bijoy
Re: Svend's blog - several questions
Thanks Svend. Good explanation. Adrian, The storm-cassandra documentation could be better in terms of explaining how to use the MapState implementation, but there’s a unit test that demonstrates basic usage: https://github.com/hmsonline/storm-cassandra/blob/master/src/test/java/com/hmsonline/storm/cassandra/bolt/CassandraMapStateTest.java Basically, you just need to point it to a keyspace + column family where the state data will be stored. - Taylor On Feb 6, 2014, at 3:25 AM, Svend Vanderveken svend.vanderve...@gmail.com wrote: The logic of a map state is to keep a state somewhere; you can think of a Storm state as a big Map of key values: the keys come from the groupBy and the values are the result of the aggregations. Conceptually, when your topology is talking to a State, you can imagine it's actually talking to a big HashMap (only there's a DB behind it for persistence + opaque logic for error handling). Most of the time, I try not to have any other part of my product that actually depends on the location or structure of the data stored in the DB, so I do not really need to be super specific about the storage structure: that is up to the IBackingMap implementation I am delegating to. Read or write access to the DB is done via the Storm primitive, not by accessing the DB directly. Don't forget there's also the stateQuery primitive you can use to read your stored state from another place. There are ways to configure column families and column names; have a look at the super clear storm-cassandra doc to see how to do that with this implementation: https://github.com/hmsonline/storm-cassandra My blog post of last year is indeed illustrating a full implementation including an in-house IBackingMap implementation; I think that approach is sometimes needed when we want fine-grained control over things. I should have made more clear that this is not necessarily the default approach to take. I hope this makes sense now.
S On Wed, Feb 5, 2014 at 11:15 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Thank you Svend and Adam. Svend I’m your reader and that tutorial is very useful. I’ve been spending lots of time looking at the code and that blog post. BTW I initially thought you were adding the nulls incorrectly in Q3 below, but now I see you’re doing it correctly. I have a follow up question: Why do you say that “we do not implement multiget/multiput, we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend.” I thought that I had to rewrite an IBackingMap implementation to correspond to the tuples and schema I have in my database. I use Cassandra. I started with com.hmsonline.storm.cassandra.trident.CassandraState or trident.cassandra.CassandraState (they both implement IBackingMap) and I replaced multiGet and multiPut to match my db schema. (well, I’m trying to do it) You are saying I can use CassandraState as it is? :D If so how would it even know what table my data should go into? It allows you to set the column family and a few other things where state will be saved (keyspace, column family, replication, rowKey). By state I think it means something like txID (transaction ID). Do you by any chance know what this state that CassandraState is saving is? So as you can tell I have no idea how to use CassandraState. Thanks again! -A From: Svend Vanderveken [mailto:svend.vanderve...@gmail.com] Sent: February-05-14 2:56 PM To: user@storm.incubator.apache.org Subject: Re: Svend's blog - several questions On Wed, Feb 5, 2014 at 6:22 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I've read Svend's blog [http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/] multiple times and I have a few questions. So you are my reader!
Great :D (you can post your questions on the blog itself, I'm more likely to spot it there) Because we did a groupBy on one tuple field, each List contains here one single String: the correlationId. Note that the list we return must have exactly the same size as the list of keys, so that Storm knows what period corresponds to what key. So for any key that does not exist in DB, we simply put a null in the resulting list. Q1: Do the db keys come only from groupBy? Yes, the key values arriving in the multiget are the field values by which we are grouping: do groupBy(new Fields("color")) and you get things like "blue", "green", "flowerly romantic red"... Q2: Can you do groupBy multiple keys, like .groupBy("name").groupBy("id")? Yes, the syntax is like this: groupBy(new Fields("name", "id")). That's the reason the keys in the multiget are List<Object> and not simply Object. We receive them in the
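[Editor's sketch] The multiGet/multiPut contract discussed in this thread (the result list has exactly the same length as the key list, with null at the index of each missing key) can be illustrated with a small self-contained stand-in. This mirrors the shape of Trident's IBackingMap but is not the real interface; the class and its use of Long values are made up for illustration:

```java
import java.util.*;

// Toy in-memory stand-in for an IBackingMap-style state store.
// Contract: multiGet returns one entry per key, in key order, null if absent.
public class InMemoryBackingMap {
    private final Map<List<Object>, Long> store = new HashMap<>();

    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> vals = new ArrayList<>();
        for (List<Object> key : keys) {
            vals.add(store.get(key)); // null lands at the same index as its key
        }
        return vals;
    }

    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }

    public static void main(String[] args) {
        InMemoryBackingMap map = new InMemoryBackingMap();
        // Keys are List<Object> because a groupBy can span several fields.
        map.multiPut(Arrays.asList(Arrays.<Object>asList("blue")), Arrays.asList(7L));
        List<Long> got = map.multiGet(Arrays.asList(
                Arrays.<Object>asList("blue"), Arrays.<Object>asList("green")));
        System.out.println(got); // prints [7, null]
    }
}
```

This also answers the null-ordering worry raised about the blog post: nulls are inserted at the position of the missing key, not appended at the end, so result(i) always corresponds to key(i).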
Re: http-client version conflict
I tend to agree. The move to maven makes package relocation very easy, but we should be careful not to abuse it if that’s a direction we decide to take. IMHO, users having to modify the contents of $STORM_HOME/lib is really bad and should only be done as a last resort. If someone asks for help with an issue after they’ve modified the contents of $STORM_HOME/lib, all bets are off. I also agree that we need to keep a close watch on our dependencies, and that some might warrant re-evaluation. I’d love to hear others’ opinions. - Taylor On Feb 6, 2014, at 9:28 PM, Adam Lewis m...@adamlewis.com wrote: My $0.02 on this subject: Without going down the path of class loader or OSGi mania and becoming a full container, I'd definitely be in favor of storm relocating its own dependencies. In this way edge cases around things like reflection can be handled once within storm rather than burdening every topology builder with those details. Part of the problem seems to be that storm makes extensive use (directly or transitively) of a lot of go-to utility libraries like guava, thrift, jodatime, json-simple, snakeyaml, commons-io, etc... I'm sure that leveraging these libraries allowed storm's development to proceed rapidly, but from a maturity perspective, it is problematic to impose these version choices on users. And while I might want Storm to, say, try to track the latest Guava version, that same policy could be very problematic for others. If storm can relocate even some of its own dependencies, I think that would be a great help to me at least. Longer term, I wonder how much of some of these libraries is really being used. For example, is clj-time (and by extension joda-time) really needed? Or just a small fraction of the functionality in that library? I can probably pitch in some of the effort required to do this, if this is the direction people want to go in. On Thu, Feb 6, 2014 at 8:44 PM, P. Taylor Goetz ptgo...@gmail.com wrote: I’m glad the shade plugin worked for you.
Updating dependencies can be tricky as it can easily introduce regressions. Ultimately we need to figure out the best solution to avoiding conflicts between user code (i.e. dependencies in topology jar files) and Storm’s libraries. The classloader approach has been attempted, but IMO Storm’s use of serialization complicates things significantly. Package relocation seems to be a relatively lightweight solution. If that’s a direction we pursue, then it introduces the question of whether Storm should relocate its dependencies, or if that should be left up to the user (topology developer). Elastic Search has gone down the path of relocating some of their dependencies [1] (not necessarily an endorsement, just an observation). I’ve CC’d dev@ since this is all related to the infamous issue #115, which is now STORM-129 [2]. - Taylor [1] https://github.com/elasticsearch/elasticsearch/blob/master/pom.xml#L474 [2] https://issues.apache.org/jira/browse/STORM-129 On Feb 6, 2014, at 7:25 PM, Vinay Pothnis vinay.poth...@gmail.com wrote: Thank you all for replies! The shader-plugin solution seems to work for us. I wonder if we can create a JIRA ticket for storm to upgrade the http-client library as part of their next release. -Vinay On Thu, Feb 6, 2014 at 2:38 PM, Michael Rose mich...@fullcontact.com wrote: We've done this with SLF4j and Guava as well without issues. Michael Rose (@Xorlev) Senior Platform Engineer, FullContact mich...@fullcontact.com On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene m...@evertrue.com wrote: We had this problem as well. We modified our chef cookbook to just replace the older version with the newer one and storm didn't complain or have any other issues as a result. On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz ptgo...@gmail.com wrote: Your best bet is probably to use the shade plugin to relocate the http-client package so it doesn’t conflict with the version storm uses. 
Storm does this with the libthrift dependency in storm-core: https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220 (You can ignore the clojure transformer in that config, unless you have non-AOT clojure code that uses the http-client library). More information on using the shade plugin to do package relocations can be found here: http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html - Taylor On Feb 4, 2014, at 4:27 PM, Vinay Pothnis vinay.poth...@gmail.com wrote: Hello, I am using storm version 0.9.0.1. My application depends on apache http-client version 4.3.2 - but storm depends on http-client version 4.1.1. What is the best way to override this dependency? Thanks Vinay
Re: http-client version conflict
Your best bet is probably to use the shade plugin to relocate the http-client package so it doesn’t conflict with the version storm uses. Storm does this with the libthrift dependency in storm-core: https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220 (You can ignore the clojure transformer in that config, unless you have non-AOT clojure code that uses the http-client library). More information on using the shade plugin to do package relocations can be found here: http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html - Taylor On Feb 4, 2014, at 4:27 PM, Vinay Pothnis vinay.poth...@gmail.com wrote: Hello, I am using storm version 0.9.0.1. My application depends on apache http-client version 4.3.2 - but storm depends on http-client version 4.1.1. What is the best way to override this dependency? Thanks Vinay
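[Editor's sketch] Following the relocation pattern Taylor points to, a maven-shade-plugin configuration for the topology's own pom could look like the fragment below. The shadedPattern prefix is a made-up example; pick one unique to your project:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Move our http-client 4.3.2 classes out of the way of Storm's 4.1.1 -->
            <pattern>org.apache.http</pattern>
            <shadedPattern>mytopology.shaded.org.apache.http</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After shading, the topology jar's own bytecode references the renamed package, so both http-client versions can coexist on the worker classpath.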
Re: Rebalance topology throws java.lang.StringIndexOutOfBoundsException: String index out of range: -1
Hi Florin, I opened an issue to track this: https://issues.apache.org/jira/browse/STORM-214 Regards, Taylor On Feb 5, 2014, at 4:30 AM, Spico Florin spicoflo...@gmail.com wrote: Hello! In the mentioned version for Windows, the rebalance command arguments should not be passed as indicated in https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 In order to get rid of the mentioned exception (java.lang.StringIndexOutOfBoundsException) you should use the command storm rebalance WordCount -e spout=3 However, trying to rebalance more components (either spouts or bolts) will rebalance only the latest component mentioned in the list. So, for example: storm rebalance WordCount -e spout=3 -e count=5 will apply the rebalance only to the count component, not to the spout. So, in my opinion, either the documentation should be updated or rebalance.clj should be changed in order to support rebalancing multiple components. I look forward to your opinions. Regards, Florin -- Forwarded message -- From: Spico Florin spicoflo...@gmail.com Date: Tue, Feb 4, 2014 at 11:10 AM Subject: Rebalance topology throws java.lang.StringIndexOutOfBoundsException: String index out of range: -1 To: user@storm.incubator.apache.org Hello! I'm using the Taylor Goetz's storm version pointed out by the article http://ptgoetz.github.io/blog/2013/12/18/running-apache-storm-on-windows/ and located at: https://github.com/ptgoetz/incubator-storm/tree/windows-test I have succeeded to install everything on my computer (running Windows 7, 64 bit). I have also run fine the indicated topology and my topology too.
But when I try to rebalance my topology by re-configuring the number of spouts or bolts with the command storm rebalance WordCount -e spout=3 I get the exception:

Exception in thread "main" java.lang.StringIndexOutOfBoundsException: String index out of range: -1
	at java.lang.String.substring(String.java:1911)
	at backtype.storm.command.rebalance$parse_executor.invoke(rebalance.clj:24)
	at clojure.tools.cli$apply_specs.invoke(cli.clj:80)
	at clojure.tools.cli$cli.doInvoke(cli.clj:130)
	at clojure.lang.RestFn.invoke(RestFn.java:460)
	at backtype.storm.command.rebalance$_main.doInvoke(rebalance.clj:31)
	at clojure.lang.RestFn.applyTo(RestFn.java:137)
	at backtype.storm.command.rebalance.main(Unknown Source)

If I change only the number of workers, it works without any exceptions. If any of you have tested this version, can you please help me get rid of it? I look forward to your answers. Thanks in advance. Regards, Florin
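The stack trace above is consistent with the parser taking `substring` of the result of `indexOf("=")` when the delimiter is missing from an executor argument (e.g. when Windows argument handling mangles `-e spout=3`). A minimal Java illustration of the failure mode and a defensive fix — this is a sketch, not Storm's actual rebalance.clj code:

```java
public class ParseExecutorDemo {
    /**
     * Parses a "component=parallelism" argument. Returns null when "=" is
     * missing; calling arg.substring(0, -1) in that case would throw
     * StringIndexOutOfBoundsException: String index out of range: -1.
     */
    public static String[] parseExecutor(String arg) {
        int i = arg.indexOf('=');
        if (i < 0) {
            return null; // delimiter missing: bail out instead of substring(0, -1)
        }
        return new String[] { arg.substring(0, i), arg.substring(i + 1) };
    }
}
```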
Re: Svend's blog - several questions
Hi Adrian, I’ll apologize up-front for not answering your questions now, but I’ll try to follow up later when I have a little more bandwidth. In the meantime, check out the documentation on the new Storm website: http://storm.incubator.apache.org, which includes the latest javadoc for the 0.9.x development line. Specifically, look for the Trident documentation, which should answer Q7/Q8. Again, I’ll try to address your other questions when I have more time, if someone else doesn’t get to them first. - Taylor On Feb 5, 2014, at 12:22 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I've read Svend's blog [http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/] multiple times and I have a few questions. Because we did a groupBy on one tuple field, each List here contains one single String: the correlationId. Note that the list we return must have exactly the same size as the list of keys, so that Storm knows what period corresponds to what key. So for any key that does not exist in the DB, we simply put a null in the resulting list. Q1: Do the DB keys come only from groupBy? Q2: Can you group by multiple keys, like .groupBy(name).groupBy(id)? Q3: When we add null we keep the size of the results list the same as the keys list, but I don't understand how we make sure that key(3) points to the correct result(3). After all, we're adding nulls at the end of the result list, not intermittently. I.e., if key(1) does not have an entry in the DB, and the key size is 5, we add null to the last position in results, not to results(1). This doesn't preserve consistency/order, so key(1) now gives result(1), which is not null as it should be. Is the code incorrect, or is the explanation on Svend's blog incorrect? Moving on: Once this is loaded, Storm will present the tuples having the same correlation ID one by one to our reducer, the PeriodBuilder. Q4: Does Trident/Storm call the reducer after calling multiGet and before calling multiPut?
Q5: What params (and their types) are passed to the reducer, and what should it emit so they can go into multiGet? Q6: The first time the program is run the database is empty and multiGet will return nothing. Does the reducer need to take care to insert for the first time, as opposed to updating a value? I do see that the reducer (TimelineUpdater) checks for nulls, and I'm guessing this is why. Q7: Can someone explain what these mean: .each (I've seen this used even consecutively: .each(..).each(..)), .newStream, .newValuesStream, .persistentAggregate? I am unable to find javadocs with documentation for the method signatures. These javadocs don't help much: http://nathanmarz.github.io/storm/doc/storm/trident/Stream.html Q8: Storm has ack/fail; does Trident handle that automatically? Q9: Has anyone tried Spark? http://spark.incubator.apache.org/streaming/ I'm wondering if anyone has tried it because I'm thinking of ditching Storm and moving to that. It seems much, much better documented. Lots of questions, I know. Thanks for reading! -Adrian
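On Q3, the contract Svend's post describes is positional: multiGet must return one value per key, in key order, so the null for a missing key sits at that key's index — not appended at the end. A self-contained Java sketch of that contract (a HashMap stands in for the real store; the method shapes mirror Trident's IBackingMap idea but this is illustrative, not Storm's code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AlignedBackingMap {
    private final Map<String, Long> store = new HashMap<>();

    /**
     * Returns exactly one value per key, in key order. A key missing from
     * the store yields null at the SAME index as that key, so keys.get(i)
     * always corresponds to the returned vals.get(i).
     */
    public List<Long> multiGet(List<String> keys) {
        List<Long> vals = new ArrayList<>(keys.size());
        for (String k : keys) {
            vals.add(store.get(k)); // null if absent, placed in key order
        }
        return vals;
    }

    /** Writes keys.get(i) with vals.get(i), pairwise. */
    public void multiPut(List<String> keys, List<Long> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}
```

With this alignment, the reducer can safely treat a null at position i as "no prior state for key i" (which is why TimelineUpdater checks for nulls on the first run, per Q6).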
Re: trident simplest aggregator into cassandra
Which project did the CassandraState implementation come from? On Feb 3, 2014, at 5:09 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi, I'm using Trident to perform some aggregations and store the results into Cassandra. I've looked at IBackingMap and specifically at some tutorials on the Trident site, and I've tried using a CassandraState which I found online in some repository. After creating what I thought were the column family and keys corresponding to the code, I still cannot figure out how to run the sample topology without crashing due to a Cassandra schema error (InvalidRequestException(why:Invalid cell for CQL3 table state. The CQL3 column component (over) does not correspond to a defined CQL3 column)). Here is the sample code I use:

val cassandraStateFactory: StateFactory = chat.CassandraState.transactional("10.10.6.80")
val spout = new FixedBatchSpout(new Fields("sentence"), 3,
  new Values("the cow jumped over the moon"),
  new Values("the man went to the store and bought some candy"),
  new Values("four score and seven years ago"),
  new Values("how many apples can you eat"))
spout.setCycle(true)
val wordCounts: TridentState = tridentBuilder.newStream("spout1", spout)
  .each(new Fields("sentence"), new Split(), new Fields("word"))
  .groupBy(new Fields("word"))
  .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))
  .parallelismHint(6)
val cluster = new LocalCluster()
val config = new Config()
config.setMaxSpoutPending(100)
config.setMaxSpoutPending(25)
cluster.submitTopology("test", config, tridentBuilder.build())

What is the schema needed to run this example (it also uses CassandraState)? thanks A
Re: trident simplest aggregator into cassandra
The storm-cassandra project (https://github.com/hmsonline/storm-cassandra) is actively maintained (DISCLAIMER: I’m the original author), so I would give that one a try. It’s also aligned with the latest version of Storm. Also, which version of Storm are you using? - Taylor On Feb 3, 2014, at 5:34 PM, Adrian Mocanu amoc...@verticalscope.com wrote: It’s from trident-cassandra: libraryDependencies += "trident-cassandra" % "trident-cassandra" % "0.0.1-wip2" but I modified the file and removed an IMetrics parameter (it seems I have two interfaces for it in my code, so at runtime it would think it was the other interface, which did not have that param, and crash). Anyhow, I saw that there is another version of CassandraState in the hmsonline project (libraryDependencies += "com.hmsonline" % "storm-cassandra" % "0.4.0-rc4") which is very different. I did not use that one; I thought it was the older version. Thanks A From: P. Taylor Goetz [mailto:ptgo...@gmail.com] Sent: February-03-14 5:21 PM To: user@storm.incubator.apache.org Subject: Re: trident simplest aggregator into cassandra Which project did the CassandraState implementation come from?
Re: quick question ab failed tuple on only 1 of the 2 bolts
Correct. The spout will replay the tuple, so it will go through the entire graph. - Taylor On Jan 23, 2014, at 10:46 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Any takers on this? I’m under the impression that Storm will resend to both bolts, not only to the one that sent the fail signal. Adrian From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: January-22-14 11:33 AM To: user@storm.incubator.apache.org Subject: quick question ab failed tuple on only 1 of the 2 bolts Hi, I have a quick question: if you send a tuple from a spout to two bolts, and one of the bolts fails the tuple while the other acks it, does the spout resend the tuple to both bolts or only to the one that failed it? Thanks -Adrian
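A fail anywhere in the tuple tree fails the whole tree: the spout's fail(msgId) callback fires, and a reliable spout typically re-emits the original tuple, which then flows through every downstream bolt again. A plain-Java sketch of the usual pending-map bookkeeping behind this (illustrative only, not Storm's internals):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class ReplaySpout {
    private final Map<Long, String> pending = new HashMap<>(); // msgId -> tuple
    private final Deque<String> replayQueue = new ArrayDeque<>();
    private long nextId = 0;

    /** Emit a tuple anchored to a message id; remember it until acked. */
    public long emit(String tuple) {
        long id = nextId++;
        pending.put(id, tuple);
        return id;
    }

    /** Called only when EVERY bolt in the tree acked: forget the tuple. */
    public void ack(long msgId) {
        pending.remove(msgId);
    }

    /**
     * Called when ANY bolt failed the tuple (even if others acked it):
     * re-queue the ORIGINAL tuple so it traverses the full graph again.
     */
    public void fail(long msgId) {
        String tuple = pending.get(msgId);
        if (tuple != null) {
            replayQueue.addLast(tuple);
        }
    }

    /** Next tuple to (re-)emit, or null if nothing is queued for replay. */
    public String nextReplay() {
        return replayQueue.pollFirst();
    }
}
```

The key point for the question above is that replay is all-or-nothing per tuple tree: there is no mechanism to resend only to the bolt that failed.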
What's new in Storm 0.9.0
http://storm-project.net/2013/12/08/storm090-released.html
Re: [storm-user] Re: [CORRECTION] [RELEASE] Storm 0.9.0
Guillaume, Thank you for calling this out. I inadvertently built the release against Java 7. Storm 0.9.0 should work with both Java 6 and Java 7. I will rectify this today by releasing what will likely be version 0.9.0.1. There will be no code changes; it will simply restore compatibility with Java 6. My sincere apologies for any inconvenience this may have caused. Fortunately, our move to Apache will help prevent these types of mistakes, thanks to its more rigorous release process. - Taylor On Dec 5, 2013, at 6:01 AM, Guillaume Perrot gper...@ubikod.com wrote: Yeah, but I was using Storm 0.8.2 before this announcement. My dependency on Java 6 is mainly because we use Debian Lenny images on AWS, and we haven't tested Java 7 on our existing code base yet (which uses Hadoop/HBase/Pig). We want to migrate to Java 7 anyway; Storm adoption will probably speed up the process. My point is that such a major change to the Java requirement should be clearly documented in the release notes. 2013/12/5 Andrew Perepelytsya aperepely...@hortonworks.com Guillaume, Storm 0.9.x builds moved to JDK 7 a while back; maybe it was mentioned in some previous release notes. Also, there was a pull request for JDK 6 compatibility; not sure if it's merged yet. Given that JDK 6 is EOL'ed, how strong is your dependency on Java 6? Andrew
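One way to verify which Java version a distribution was built against is to read the class-file major version from any .class entry in the jar (50 = Java 6, 51 = Java 7). A small, self-contained Java sketch of that check — pass it a stream over any .class file, e.g. one extracted from the storm jar:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassVersionCheck {
    /**
     * Reads the class-file major version from a .class byte stream.
     * Layout: magic 0xCAFEBABE, then minor version, then major version
     * (50 = Java 6, 51 = Java 7).
     */
    public static int majorVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        if (data.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        data.readUnsignedShort();        // minor version (skipped)
        return data.readUnsignedShort(); // major version
    }
}
```

A jar built against Java 7 without `-target 1.6` carries major version 51, which a Java 6 JVM rejects with UnsupportedClassVersionError — the symptom Guillaume hit.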
[CORRECTION] [RELEASE] Storm 0.9.0
CORRECTION: The HREF for the release tag link was wrong, and there was a paragraph erroneously describing this release as a release candidate. This is a FINAL release. Corrected announcement below: The Storm team is pleased to announce the release of Storm version 0.9.0. NOTE: This release is neither a top-level Apache release nor an Apache Incubator release. Storm will now migrate to the Apache release process; version 0.9.1 will be the first Apache Incubator release. Storm is a distributed, fault-tolerant, and high-performance realtime computation system that provides strong guarantees on the processing of data. Additional information is available at the following URLs: http://storm-project.net/ https://github.com/nathanmarz/storm Downloads of binary distributions are listed in the downloads section: http://storm-project.net/downloads.html JAR artifacts can be found at clojars.org: https://clojars.org/storm The tag for this release can be found on github: https://github.com/nathanmarz/storm/tree/0.9.0 Changes since 0.9.0-rc3: * Fixed a netty client issue where sleep times for reconnection could be negative (thanks brndnmtthws) * Fixed an issue that would cause storm-netty unit tests to fail - Taylor
Re: [storm-user] [RELEASE] Storm 0.9.0-rc3
Yep, you’re right. Should be corrected now. - Taylor On Nov 26, 2013, at 8:44 AM, James Xu xumingmi...@gmail.com wrote: Seems storm:storm:0.9.0-rc3 is missing: https://clojars.org/storm/versions/0.9.0-rc3 Here is storm:storm:0.9.0-rc2: https://clojars.org/storm/versions/0.9.0-rc2 On Nov 26, 2013, at 9:39 PM, P. Taylor Goetz ptgo...@gmail.com wrote: 0.9.0-rc3 artifacts are there: https://clojars.org/search?q=Storm - Taylor On Nov 26, 2013, at 4:54 AM, Nicolas iscern...@gmail.com wrote: Hi, It seems that it's not available on clojars (still showing 0.9.0-rc2 for me). Can you check please? Thanks :) Nicolas