Re: Storm with ActiveMQ
Hi Peter, Thank you for helping me out! I went through the provided link and it has helped me a lot. Now I understand that there is intra-worker messaging, and a separate messaging system for pulling data into Storm. Can you also tell me whether I need specific knowledge about the external queuing system (RabbitMQ, ActiveMQ, Kafka), or is it just a connector I can use to ingest data? I want to know whether I need to develop specific skills for queuing systems like RabbitMQ. Thanks, Siddharth Ubale On Wed, Jul 2, 2014 at 6:33 PM, Richards Peter hbkricha...@gmail.com wrote: Hi, I think you are slightly confused. Let me try to explain it in a simple manner. Storm has a transport layer for communication between spouts and bolts. At present (as of Storm 0.9) there are two transport layer implementations provided by Storm: one based on zeromq and one based on Netty. You can select either of them. Installing zeromq needs some special steps, which are already documented in the Storm project. I haven't tried the Netty-based implementation, but it is supposed to give you better performance: http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty Now let me answer your question about ActiveMQ. For most real-time use cases, people like to process messages from a queue. In such use cases the messages are read from these queues by the spout. You can use any queuing system (Kafka/RabbitMQ/ActiveMQ/...) for this. All you need is a consumer in your spout to read the messages. Regards, Richards Peter.
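Richards' last point — that the spout only needs a consumer for whichever queue you choose — can be sketched with an in-memory queue standing in for the external broker. The class and method names below are illustrative only, not Storm's actual spout API:

```python
import queue

# Stand-in for an external broker (RabbitMQ/ActiveMQ/Kafka). In a real
# topology you would use the broker's client library or a ready-made
# connector (such as the storm-kafka spout) instead.
broker = queue.Queue()

class QueueSpout:
    """Hypothetical spout: all it needs is a consumer for the queue."""
    def __init__(self, source):
        self.source = source

    def next_tuple(self):
        # A spout's nextTuple should not block, so poll non-blockingly.
        try:
            return self.source.get_nowait()
        except queue.Empty:
            return None  # nothing to emit right now

broker.put("event-1")
spout = QueueSpout(broker)
print(spout.next_tuple())  # prints event-1
print(spout.next_tuple())  # prints None (queue drained)
```

The broker-specific knowledge you need is limited to this consumer side: how to connect, subscribe, and acknowledge messages in the chosen system.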
Re: Reduce number of imports
Once per supervisor / worker, I should say. Best regards, Joel 2014-07-01 14:38 GMT+02:00 Joel Samuelsson samuelsson.j...@gmail.com: We are using Storm bolts written in Python. In the bolt we have a dependency that takes a long time to import. Is there a way to perform the import only once per instance of the bolt? Best regards, Joel
Re: Storm with ActiveMQ
Hi, Well, that depends on how you pull data into your Storm cluster. If you are pulling it from a queue, you will need to know how the queuing system allows you to retrieve the data. You may have standards such as JMS, or specific implementations as in Kafka. Some people even pull data from file systems and databases. Please decide based on the type of data source, the acceptable latency in fetching data, and the cost of having such a system. Regards, Richards Peter.
Re: Storm with ActiveMQ
We use ActiveMQ with Storm in production, with a patched version of the JMS Trident spout. On Wed, Jul 2, 2014 at 11:27 AM, Richards Peter hbkricha...@gmail.com wrote: Hi, Well, that depends on how you pull data into your Storm cluster. If you are pulling it from a queue, you will need to know how the queuing system allows you to retrieve the data. You may have standards such as JMS, or specific implementations as in Kafka. Some people even pull data from file systems and databases. Please decide based on the type of data source, the acceptable latency in fetching data, and the cost of having such a system. Regards, Richards Peter.
Re: key values in PersistentAggregate
Actually, I think this is a non-issue; given the field already exists in the stream, I should be able to access it, right? On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh raffihs...@gmail.com wrote: From my understanding, if I implement my own state factory to use in persistentAggregate, the grouping fields will be the key values in the state. However, if I want access to other fields in the aggregation, how might I get those? From my understanding, doing a groupBy() will create a new GroupedStream which will only have the fields specified in the groupBy(). Essentially what I want to do is: stream.groupBy(new Fields("a")).persistentAggregate(new Factory(), new Fields("b"), ...) How would I do this? -- Raphael Hsieh -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: key values in PersistentAggregate
Yes. groupBy simply lets you select among the fields you have already created within your TridentTopology. You still have access to all the fields created within the stream that is being grouped. On Wed, Jul 2, 2014 at 1:44 PM, Raphael Hsieh raffihs...@gmail.com wrote: Actually, I think this is a non-issue; given the field already exists in the stream, I should be able to access it, right? On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh raffihs...@gmail.com wrote: From my understanding, if I implement my own state factory to use in persistentAggregate, the grouping fields will be the key values in the state. However, if I want access to other fields in the aggregation, how might I get those? From my understanding, doing a groupBy() will create a new GroupedStream which will only have the fields specified in the groupBy(). Essentially what I want to do is: stream.groupBy(new Fields("a")).persistentAggregate(new Factory(), new Fields("b"), ...) How would I do this? -- Raphael Hsieh -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
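The behaviour being confirmed here can be sketched in plain Python (hypothetical data, not the actual Trident API): grouping on field "a" only partitions the tuples; each tuple still carries its other fields, which the aggregation then folds into per-key state.

```python
from collections import defaultdict

# Each tuple in the stream carries both fields "a" and "b";
# grouping on "a" does not strip "b" from the tuple.
stream = [{"a": "x", "b": 1}, {"a": "y", "b": 2}, {"a": "x", "b": 3}]

grouped = defaultdict(list)
for t in stream:
    grouped[t["a"]].append(t["b"])  # "b" is still accessible after grouping

# A persistentAggregate would then fold each group's "b" values into state,
# keyed by the grouping field "a".
aggregated = {k: sum(vs) for k, vs in grouped.items()}
print(aggregated)  # prints {'x': 4, 'y': 2}
```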
Re: [Issue] OutOfMemoryError when disabling ackers
My hunch would be that you're anchoring your tuples, and without acker tasks those tuple trees are never destroyed. So all the tuples from each batch are kept in memory until you run out of memory, i.e., a very, very fast memory leak. If you disable ackers, you should probably not anchor your tuples either. -Cody On Mon, Jun 30, 2014 at 9:56 PM, jamesw...@yahoo.com.tw jamesw...@yahoo.com.tw wrote: Hi all, I use Storm 0.9.2 now and do real-time processing with basic Storm. It works normally in the reliable mode, but the speed is extremely slow, about 6000 tuples/s. So I tried to disable the acker feature with conf.setNumAckers(0). However, this leads to the following error: java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Method.copy(Method.java:151) at java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136) at sun.reflect.ReflectionFactory.copyMethod(ReflectionFactory.java:300) at java.lang.Class.copyMethods(Class.java:2891) at java.lang.Class.getMethods(Class.java:1467) at clojure.lang.Reflector.getMethods(Reflector.java:357) at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:27) at backtype.storm.daemon.worker$mk_transfer_fn$fn__5748.invoke(worker.clj:128) at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5483.invoke(executor.clj:256) at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Can anyone tell me why this happens? Thank you very much. Best regards, James Fu -- Cody A. Ray, LEED AP cody.a@gmail.com 215.501.7891
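The hunch above can be illustrated with a deliberately simplified model (this is not Storm's actual internal bookkeeping): anchoring registers each emitted tuple in a pending table that only an ack clears, so with zero ackers the table grows without bound.

```python
# Simplified model of anchored-tuple tracking (illustrative names only).
pending = {}  # tuple id -> payload; normally cleared when an acker acks

def emit_anchored(tuple_id, value):
    pending[tuple_id] = value  # anchored: tracked until acked

def on_ack(tuple_id):
    pending.pop(tuple_id, None)  # with setNumAckers(0), this never runs

for i in range(1000):
    emit_anchored(i, "payload-%d" % i)

# No ack ever arrives, so every emitted tuple stays in memory:
# the fast "memory leak" described above.
print(len(pending))  # prints 1000
```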
Clojure 1.6?
I couldn't find a reference to this, except for one old ticket. I was trying to use Clojure 1.6 with my local Storm setup (and the great Marceline lib, https://github.com/yieldbot/marceline), and kept getting this error: WARNING: some? already refers to: #'clojure.core/some? in namespace: backtype.storm.util, being replaced by: #'backtype.storm.util/some? which also seemed to break deploying local topologies. Rolling back to Clojure 1.5.1 (which is what is in the pom) solved the issue. Just wondering: can someone confirm whether this is a known issue and Clojure 1.6 is simply not supported at this stage? (I'm going to assume it isn't.) If not, I may spend some time digging and submit a pull request that works around the issue. Thanks! Mark -- E: mark.man...@gmail.com T: http://www.twitter.com/neurotic W: www.compoundtheory.com 2 Devs from Down Under Podcast http://www.2ddu.com/
Re: Clojure 1.6?
Whoops, forgot the ticket I was referring to: https://issues.apache.org/jira/browse/STORM-265 On Thu, Jul 3, 2014 at 9:32 AM, Mark Mandel mark.man...@gmail.com wrote: I couldn't find a reference to it, except for this old ticket: But I was trying to use Clojure 1.6 with my local storm setup (and the great Marceline https://github.com/yieldbot/marceline lib), and kept getting the error of: WARNING: some? already refers to: #'clojure.core/some? in namespace: backtype.storm.util, being replaced by: #'backtype.storm.util/some? Which seemed to also break me being able to deploy local topologies as well. Rolling back to Clojure 1.5.1 (which is what is in the pom) solved the issue. Just wondering - can someone confirm if this is a known issue, and Clojure 1.6 is just not supported at this stage? (I'm going to assume it isn't). If not, I may spend some time digging, and submit a pull request that works around the issue. Thanks! Mark -- E: mark.man...@gmail.com T: http://www.twitter.com/neurotic W: www.compoundtheory.com 2 Devs from Down Under Podcast http://www.2ddu.com/ -- E: mark.man...@gmail.com T: http://www.twitter.com/neurotic W: www.compoundtheory.com 2 Devs from Down Under Podcast http://www.2ddu.com/
Re: [Issue] OutOfMemoryError when disabling ackers
Hi, I actually did not remove the anchoring when I ran Storm in the unreliable way by setting setNumAckers(0). I thought switching between reliable and unreliable Storm would be as simple as changing the config. I'll try it later. Thank you - Reply message - From: Cody A. Ray cody.a@gmail.com To: user@storm.incubator.apache.org Subject: [Issue] OutOfMemoryError when disabling ackers Date: Thu, Jul 3, 2014, 6:49 AM My hunch would be that you're anchoring your tuples and without acker tasks these tuple trees are never destroyed. So all the tuples from each batch are kept in memory until you run out of memory, i.e., a very, very fast memory leak. If you disable ackers, you should probably not anchor your tuples either. -Cody On Mon, Jun 30, 2014 at 9:56 PM, jamesw...@yahoo.com.tw jamesw...@yahoo.com.tw wrote: Hi all, I use storm 0.9.2 now, and do real-time processing by basic storm. It's normal to use in reliable way. But the speed is extremely slow about 6000 tuples/s. So I try to disable acker feature by conf.setNumAckers(0). However, this leads to following error: java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Method.copy(Method.java:151) at java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136) at sun.reflect.ReflectionFactory.copyMethod(ReflectionFactory.java:300) at java.lang.Class.copyMethods(Class.java:2891) at java.lang.Class.getMethods(Class.java:1467) at clojure.lang.Reflector.getMethods(Reflector.java:357) at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:27) at backtype.storm.daemon.worker$mk_transfer_fn$fn__5748.invoke(worker.clj:128) at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5483.invoke(executor.clj:256) at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Can anyone tell me why this happens? Thank you very much. Best regards, James Fu -- Cody A. Ray, LEED AP cody.a@gmail.com 215.501.7891
Re: Reduce number of imports
I believe post_initialize(self) is a stub you can flesh out for one-time setup. Please note that I can't recommend Storm with Python; the language layers go Java -> JRuby -> Python, and the text pipe between JRuby and Python makes errors really hard to recover from. I recommend Java, or at worst JRuby, for any production work you're thinking about. On Wed, Jul 2, 2014 at 8:41 AM, Joel Samuelsson samuelsson.j...@gmail.com wrote: Once per supervisor / worker, I should say. Best regards, Joel 2014-07-01 14:38 GMT+02:00 Joel Samuelsson samuelsson.j...@gmail.com: We are using Storm bolts written in Python. In the bolt we have a dependency that takes a long time to import. Is there a way to perform the import only once per instance of the bolt? Best regards, Joel
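For the original question, two observations may help (a sketch, assuming the usual one-Python-process-per-bolt-instance multilang setup): a module-level import runs once when the process starts, and Python caches modules in sys.modules, so even an import inside a method pays the cost only on first execution. Below, json stands in for the slow dependency, and SlowImportBolt is a made-up name, not a Storm class:

```python
# Option 1: a module-level import runs once per Python process,
# i.e. once per bolt instance in a one-process-per-bolt setup.
import json  # stand-in for the slow dependency

class SlowImportBolt:
    """Hypothetical bolt: defers the slow import and caches it."""
    def __init__(self):
        self._dep = None

    def _dependency(self):
        # Option 2: lazy import, executed at most once per instance.
        # Python also caches the module in sys.modules, so repeated
        # import statements after the first are cheap.
        if self._dep is None:
            import json as dep
            self._dep = dep
        return self._dep

    def process(self, payload):
        return self._dependency().loads(payload)

bolt = SlowImportBolt()
print(bolt.process('{"n": 1}'))  # prints {'n': 1}
```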