Re: Storm.yaml
Finally it works and the supervisor is shown in the Storm UI:
- I modified the /etc/hosts file on each server
- and I stopped iptables

2014-04-21 14:33 GMT+00:00 Bilal Al Fartakh alfartaj.bi...@gmail.com:
The logs dir (storm) is empty on both machines. Yes, on my nimbus server I have these:

    14814 CassandraDaemon
    19669 Jps
    29789 core
    17659 nimbus
    21083 QuorumPeerMain

The supervisor stops automatically on its machine. What do you suggest I do now? (Thank you so much @Abhishek and @Prasanna for taking the pain of reading my message.)

2014-04-18 18:42 GMT+00:00 Abhishek Bhattacharjee abhishek.bhattacharje...@gmail.com:
You can check the logs inside the logs dir, i.e. the directory you are running Storm from has a logs dir. Are you sure your ZooKeeper is running? Run jps in the shell and see whether QuorumPeerMain shows up.

On Apr 18, 2014 11:31 PM, Bilal Al Fartakh alfartaj.bi...@gmail.com wrote:
Sorry for my ignorance, but where can I see these logs? I also suspect that the supervisor can't communicate with ZooKeeper. What do you suggest I do or check?

2014-04-18 17:50 GMT+00:00 Prasanna Padmanabhan prass...@gmail.com:
What do you see in the supervisor logs? I would think your supervisor is not able to talk to ZooKeeper for registration.

On Fri, Apr 18, 2014 at 10:19 AM, Bilal Al Fartakh alfartaj.bi...@gmail.com wrote:
I have this in my storm.yaml on the supervisor:

    storm.zookeeper.servers:
      - 88.198.22.226
    #  - server2
    nimbus.host: 88.198.22.226

and the same thing on my nimbus:

    storm.zookeeper.servers:
      - localhost
    #  - server2
    nimbus.host: localhost

When I run the supervisor with nohup ~/src/storm-0.8.1/bin/storm supervisor, the number of supervisors remains 0 (in the UI) and after a while I get this message:

    [1]+ Exit 13 nohup ~/src/storm-0.8.1/bin/storm supervisor

I'm lost, guys. What should I do to maintain the connection between my two servers?

--
Al Fartakh Bilal
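For reference, a common way to wire up a two-machine cluster like this is to give both nodes the same ZooKeeper and nimbus addresses in storm.yaml. This is a sketch only, assuming 88.198.22.226 is the machine running nimbus and ZooKeeper (the address is the one quoted in the thread):

    # storm.yaml on both the nimbus and the supervisor machine
    storm.zookeeper.servers:
      - "88.198.22.226"
    nimbus.host: "88.198.22.226"

Even if "localhost" on the nimbus box and the public IP on the supervisor end up resolving to the same ZooKeeper, the supervisor still needs working name resolution and open ports to reach it, which is consistent with the /etc/hosts edit and the iptables change being what finally made the supervisor register.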
Where to put the topology and where to run it
I have two machines, nimbus and supervisor. Where do I have to put my project (topology) and how do I run it? I'm sure most users here know the answer. Any helpful comment is appreciated :)

--
Al Fartakh Bilal
Re: Where to put the topology and where to run it
You have to use Maven to create the jar of your topology. Then you can submit the topology using:

    bin/storm jar path-to-jar namespace.namespace.your_main_file

On Apr 22, 2014 8:43 PM, Bilal Al Fartakh alfartaj.bi...@gmail.com wrote:
Thanks Andrew. So for a test, can I use my supervisor machine as a storm client? And for deploying, do I have to use Maven?

2014-04-22 13:45 GMT+00:00 Andrew Perepelytsya aperepely...@hortonworks.com:
Typically you package your topology and dependencies into a jar, then 'deploy' it with the storm client (any machine that has the storm client deps and a config pointing to a nimbus server). Nimbus takes care of physically rolling out the topology after that.

Andrew

On Tue, Apr 22, 2014 at 9:40 AM, Bilal Al Fartakh alfartaj.bi...@gmail.com wrote:
I have two machines, nimbus and supervisor. Where do I have to put my project (topology) and how do I run it? I'm sure most users here know the answer. Any helpful comment is appreciated :)

--
Al Fartakh Bilal
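For completeness, the main class referenced by that storm jar command is typically a small launcher that builds the topology and submits it to nimbus. A rough sketch, assuming the Storm 0.8/0.9 backtype.storm package names; WordSpout and CountBolt are hypothetical placeholders for the poster's own components:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class MyTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // WordSpout and CountBolt are hypothetical stand-ins for your own spout/bolt classes
            builder.setSpout("words", new WordSpout(), 1);
            builder.setBolt("count", new CountBolt(), 2)
                   .fieldsGrouping("words", new Fields("word"));

            Config conf = new Config();
            conf.setNumWorkers(2);

            // Submits to the nimbus host named in the storm client's storm.yaml.
            // Run as: bin/storm jar mytopology.jar MyTopology my-topology-name
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }

Run from any machine that has the Storm client installed and a storm.yaml whose nimbus.host points at the nimbus server, which is why, per Andrew's description, the supervisor machine can also serve as the client for a test.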
Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
I have seen this as well and thought I was going nuts. In my testing I could reliably reproduce it in local mode against 0.9.1-incubating. What I noticed in my testing:
- It only occurred when a custom logback.xml was on the class path.
- More specifically, it only happened when there was an "appender-ref" child element in the "root". Commenting out the "appender-ref" element made it go away.
- It only happened with Trident topologies. It did not happen with regular Storm topologies.

It is clearly a classloader issue, but I have no idea what the root cause is. My theory is that it is related to the fact that in 0.9.1 we had both the Clojure source code and the AOT-compiled Clojure classes in the storm-core jar. In 0.9.2-SNAPSHOT (latest master branch), we no longer package the Clojure source code alongside the AOT-compiled classes. In 0.9.2, with the patch for STORM-290 applied, this problem goes away. Not absolutely sure why, but I'm glad it does. ;)

- Taylor

On Apr 21, 2014, at 9:09 PM, Adam Lewis m...@adamlewis.com wrote:
The tests are in the test tree (which I don't think Eclipse cares about, but they should get treated properly on the command line). The scope wasn't provided, it was defaulted to runtime, and I've now corrected that and it doesn't help. Strangely, if I explicitly exclude logback-classic within the storm dependency, it does work as long as I also add a dependency on slf4j-simple, so I am still seeing weird logger-induced classpath issues. This is what is working for me:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>logback-classic</artifactId>
          <groupId>ch.qos.logback</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </dependency>

WEIRD!

On Mon, Apr 21, 2014 at 8:54 PM, Jon Logan jmlo...@buffalo.edu wrote:
Are your Maven scopes right? The scope of the Storm dependency should be provided, not runtime. Also be sure that your main method / unit test is under your test/ classpath, not your main/ classpath.

On Mon, Apr 21, 2014 at 8:49 PM, Adam Lewis m...@adamlewis.com wrote:
Are there other things that could cause this error? Since upgrading to 0.9.1-incubating, I've hit it twice. The first time I resolved it (in one project) by fixing an issue where two slf4j bindings were on the classpath together (strange, but it worked). Now I'm hitting the problem again in a different project and can't figure out what is causing it.
This is for a test which is submitting a topology to a LocalCluster; the full trace follows (it happens launching JUnit from Eclipse and from the Maven command line):

    java.lang.RuntimeException: java.lang.ClassNotFoundException: backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
        at backtype.storm.utils.Utils.deserialize(Utils.java:88)
        at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89)
        at backtype.storm.daemon.nimbus$start_storm.invoke(nimbus.clj:724)
        at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopologyWithOpts(nimbus.clj:962)
        at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopology(nimbus.clj:971)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
        at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
        at backtype.storm.testing$submit_local_topology.invoke(testing.clj:253)
        at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:34)
        at backtype.storm.LocalCluster.submitTopology(Unknown Source)
        at com.acuitysds.trident.TestTimeseriesAssembly.testBasicTopology(TestTimeseriesAssembly.java:108)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
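As a reference point for the scope discussion, Jon's suggestion would correspond to something like the following in the pom. This is a sketch only; the version shown is the 0.9.1-incubating release mentioned in the thread, and whether to keep Adam's logback-classic exclusion depends on the logging setup:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.1-incubating</version>
      <!-- provided: left out of the packaged topology jar, but still on the
           compile and test classpaths, so LocalCluster-based JUnit tests can run -->
      <scope>provided</scope>
    </dependency>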
Re: PersistentAggregate across batches
Yes, partially. The part I was missing was getting old values and feeding them through the aggregator again, which still doesn't quite make sense to me. I am using an external datastore, so I am not able to use the vanilla MemcachedState, hence why I am implementing my own version of IBackingMap.

So let me try to explain what I am understanding. When I do something like

    stream
        .groupBy(new Fields("a"))
        .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
                             new MyAggregator(), new Fields("resultMap"))

what happens (as described here: https://github.com/nathanmarz/storm/wiki/Trident-API-Overview) is that the stream is split into different groups based on field "a" [image: grouping diagram]. Then partitionPersist will run a multiGet on the fields (a, b, c, d), since that is what we are using as our keys. So in each of the groups described above, we would have not only the raw tuples resulting from the grouping, but also a single tuple with the result of the previous aggregation. These would all be run through the aggregator, which should be able to handle aggregating with this semi-complete aggregation (the reduce function in a ReducerAggregator, or the combine function in a CombinerAggregator).

How does it know not to treat the previous aggregation as a single new tuple (and hence not run the init function on it)? For example, if I was aggregating a count, having that previous value (say 60) as a single extra tuple would only increment the count by 1, instead of 60. Would I then just need to implement my own init function so that it checks the tuple value, i.e. whether it is a raw new tuple vs. a previous tuple aggregation?

On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray cody.a@gmail.com wrote:
My understanding is that the process is:
1. multiGet from the IBackingMap is called and returns a value for each key (or null if not present)
2. For each key, the old value from the get and the new values in the batch are fed through the aggregator to produce one value per key
3. This value is then stored back into the state through the multiPut in the IBackingMap.

If you just want to use nathanmarz's trident-memcached integration, you don't have to write an IBackingMap yourself. The MemcachedState itself implements IBackingMap to do the get and put. To use it, just decide what you want to groupBy (these become your keys) and how you want it aggregated (this is the reducer/combiner implementation). You don't have to write the memcache connection logic or the aggregation logic yourself unless you want to change how it's aggregated or stored. I've not used the trident-memcached state in particular, but in general this would look something like:

    topology.newStream("spout1", spout1)
        .groupBy(new Fields("mykeyfield"))
        .persistentAggregate(MemcachedState.opaque(servers),
                             new Fields("myvaluefield"), new Sum(), new Fields("sum"))

(Sorry for any code errors; writing on my phone.)

Does that answer your question?

-Cody

On Apr 22, 2014 10:32 AM, Raphael Hsieh raffihs...@gmail.com wrote:
The reducer/combiner aggregators hold the logic to aggregate across an entire batch, but they do not have the logic to aggregate between batches. In order for that to happen, the state must read the previous transaction id and value from the datastore, determine whether the incoming data is in the right sequence, and then increment the value within the datastore. I am asking about this second part: where the logic goes that reads previous data from the datastore and adds it to the new incoming aggregate data.
On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray cody.a@gmail.com wrote:
It's the ReducerAggregator/CombinerAggregator's job to implement this logic. Look at Count and Sum, which are built into Trident. You can also implement your own aggregator.

-Cody

On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh raffihs...@gmail.com wrote:
If I am using an opaque spout and doing a persistentAggregate to a MemcachedState, how is it aggregating/incrementing the values across all batches? I want to implement an IBackingMap so that I can use an external datastore. However, I'm unsure where the logic goes that reads the previous data and aggregates it with the new data. From what I've been told, I need to implement the IBackingMap and its multiPut/multiGet functions. So logically, I think it makes sense that I would put this update logic in the multiPut function. However, the OpaqueMap class already has multiGet logic in order to check the txid of the batch. Instead of using an OpaqueMap class, should I just make my own implementation?

Thanks
--
Raphael Hsieh

--
Cody A. Ray, LEED AP
cody.a@gmail.com
215.501.7891
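As a concrete illustration of the distinction Raphael is asking about, the built-in Trident Count aggregator that Cody mentions looks roughly like the sketch below (package names as in Storm 0.9.x Trident; reconstructed from memory rather than copied from the source tree). Per-tuple contributions go through init(), while, as in Cody's step 2, the value returned by the state's multiGet is merged with the batch result, so a previously stored count of 60 contributes 60 rather than 1:

    import storm.trident.operation.CombinerAggregator;
    import storm.trident.tuple.TridentTuple;

    // Roughly what Trident's built-in Count looks like.
    public class Count implements CombinerAggregator<Long> {
        @Override
        public Long init(TridentTuple tuple) {
            return 1L;              // one per raw tuple in the batch
        }

        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;     // merges partial counts, including the previously stored value
        }

        @Override
        public Long zero() {
            return 0L;              // identity value used when there is nothing to combine
        }
    }

My understanding is that for a CombinerAggregator the state machinery only ever hands the previously stored value to combine(), never to init(), so the aggregator itself does not need to distinguish old aggregates from new tuples.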
Re: PersistentAggregate across batches
The previous link didn't work; here it is again:
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams
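Since both threads revolve around implementing a custom IBackingMap against an external datastore, here is a rough sketch of the interface involved. ExternalStoreClient is a hypothetical stand-in for whatever client library is used, and the values are plain (non-transactional) Longs; a real implementation would normally be wrapped by one of Trident's OpaqueMap/TransactionalMap/NonTransactionalMap helpers so the txid handling stays out of multiGet/multiPut:

    import java.util.ArrayList;
    import java.util.List;

    import storm.trident.state.map.IBackingMap;

    // Minimal sketch of a custom IBackingMap backed by an external datastore.
    public class MyBackingMap implements IBackingMap<Long> {

        private final ExternalStoreClient client;   // hypothetical datastore client

        public MyBackingMap(ExternalStoreClient client) {
            this.client = client;
        }

        @Override
        public List<Long> multiGet(List<List<Object>> keys) {
            // Return one value per key, in the same order; null for keys with no previous value.
            List<Long> values = new ArrayList<Long>(keys.size());
            for (List<Object> key : keys) {
                values.add(client.get(key));        // hypothetical lookup
            }
            return values;
        }

        @Override
        public void multiPut(List<List<Object>> keys, List<Long> vals) {
            // By the time multiPut is called, Trident has already merged the old value
            // with the batch's aggregate; this method only writes the result back.
            for (int i = 0; i < keys.size(); i++) {
                client.put(keys.get(i), vals.get(i));   // hypothetical write
            }
        }
    }

In other words, per Cody's multiGet → aggregate → multiPut description, the cross-batch update logic lives in the aggregator and the Map wrapper rather than inside multiPut.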
Understanding metrics.log
I added metrics to my Storm implementation by implementing IMetric. It is working and I can see the metrics log populated with all the stats. I have 3 nodes (3 workers) and 1 nimbus/zookeeper in development. In WORKER1's metrics log I can see some metrics that reference WORKER2 and WORKER3. For example:

    2014-04-22 16:42:14,829 23002 1398184934 worker3:9953 17:normalizeBolt null {bolt.timeconsumed={8, 1657, 5135, 41}}
    2014-04-22 16:42:14,830 23003 1398184934 worker3:9953 38:normalizeBolt null {db.searchip={3, 77, 221, 7}, fetch-doc={3, 32, 101, 8}}
    2014-04-22 16:42:14,833 23006 1398184934 worker3:9953 31:normalizeBolt null {bolt.timeconsumed={7, 1638, 5036, 36}}
    2014-04-22 16:42:14,833 23006 1398184934 worker3:9953 10:normalizeBolt null {db.searchip={3, 86, 165, 6}, fetch-doc={3, 17, 53, 7}}
    2014-04-22 16:42:14,865 23038 1398184934 worker1:9953 44:normalizeSpout null {spout.timeconsumed={656, 8454, 56490, 10}}

My question: why am I seeing the stats of WORKER3 in WORKER1's log file? My expectation was that each worker would log only its own metrics and it's up to the log collector (like Splunk or Logstash) to merge and interpret them.

Thanks,
Prasun
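For context, registering a metric like the ones above usually means implementing IMetric (or using a built-in such as CountMetric) and registering it from prepare(). A minimal sketch, assuming Storm 0.9.x package names, an illustrative 60-second reporting bucket, and a hypothetical NormalizeBolt; none of this is taken from Prasun's actual code:

    import java.util.Map;

    import backtype.storm.metric.api.CountMetric;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class NormalizeBolt extends BaseRichBolt {
        private transient CountMetric processedCount;   // CountMetric implements IMetric
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // Storm calls getValueAndReset() on the metric every 60 seconds (the bucket size)
            // and ships the value to whatever metrics consumers are configured.
            processedCount = new CountMetric();
            context.registerMetric("processed.count", processedCount, 60);
        }

        @Override
        public void execute(Tuple tuple) {
            processedCount.incr();
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields in this sketch
        }
    }

Assuming the file in question is written by a metrics consumer (for example the built-in LoggingMetricsConsumer), metric values from every task are routed to the worker hosting that consumer's task, so one worker's metrics.log can legitimately contain entries reported by the other workers.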