Re: Storm with ActiveMQ

2014-07-02 Thread siddharth ubale
Hi Peter,

Thank you for helping me out!
I went through the provided link and it has helped me a lot. I now understand
that there is intra-worker messaging and a separate messaging system for
pulling data into Storm.
Can you also tell me whether I need specific knowledge of the external
queuing system (RabbitMQ, ActiveMQ, Kafka), or whether it is just a connector
I can use to ingest data? In other words, do I need to develop specific
skills for queuing systems like RabbitMQ?

Thanks,
Siddharth ubale


On Wed, Jul 2, 2014 at 6:33 PM, Richards Peter hbkricha...@gmail.com
wrote:

 Hi,

 I think you are slightly confused. Let me try to explain it in a simple
 manner. Storm has a transport layer for communication between spouts and
 bolts. At present (Storm 0.9 and later) there are two transport layer
 implementations provided by Storm: a ZeroMQ-based one and a Netty-based one.
 You can select either of them. Installing ZeroMQ needs some special steps,
 which are documented in the Storm project. I haven't tried the Netty-based
 implementation, but it is supposed to give you better performance:
 http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty

 Now let me try to answer your question related to ActiveMQ. In most
 real-time use cases people want to process messages from a queue. In such
 cases the messages are read from the queue by the spout. You can use any
 queuing system (Kafka/RabbitMQ/ActiveMQ/...) for this. All you need is a
 consumer in your spout to read the messages.
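
 As a rough sketch of that pattern (not tied to any particular queue client;
 the buffer, field name and consumer hookup below are placeholders you would
 replace with your queue client's API), the spout can drain an in-memory
 buffer that the queue consumer fills:

 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;

 public class QueueSpout extends BaseRichSpout {
     private SpoutOutputCollector collector;
     private LinkedBlockingQueue<String> buffer;

     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
         this.buffer = new LinkedBlockingQueue<String>();
         // attach your queue consumer here and have its callback call buffer.offer(msg)
     }

     @Override
     public void nextTuple() {
         String msg = buffer.poll(); // non-blocking: nextTuple() must not block
         if (msg != null) {
             collector.emit(new Values(msg)); // also pass a message id if you need reliability
         }
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("message"));
     }
 }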

 Regards,
 Richards Peter.



Re: Reduce number of imports

2014-07-02 Thread Joel Samuelsson
Once per supervisor / worker, I should say.

Best regards,
Joel


2014-07-01 14:38 GMT+02:00 Joel Samuelsson samuelsson.j...@gmail.com:

 We are using Storm bolts written in Python. In the bolt we have a
 dependency that takes a long time to import. Is there a way to do the
 import only once per instance of the bolt?

 Best regards,
 Joel



Re: Storm with ActiveMQ

2014-07-02 Thread Richards Peter
Hi,

Well, that depends on how you pull data into your Storm cluster. If you are
pulling it from a queue, you will need to know how the queuing system lets
you retrieve the data. You may use a standard such as JMS, or a specific
implementation as in Kafka. Some people even pull data from file systems and
databases. Make the call based on the type of data source, the acceptable
latency in fetching data, and the cost of running such a system.
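
For example, with JMS and ActiveMQ the retrieval side might look roughly like
the sketch below (the broker URL, queue name and hand-off buffer are
placeholders; your spout would drain the buffer in nextTuple()):

import java.util.concurrent.BlockingQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsFeeder {
    // hand-off buffer between the JMS listener thread and the spout
    private final BlockingQueue<String> buffer;

    public JmsFeeder(BlockingQueue<String> buffer) {
        this.buffer = buffer;
    }

    public void start(String brokerUrl, String queueName) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        buffer.offer(((TextMessage) message).getText());
                    }
                } catch (Exception e) {
                    // decide whether to log, drop or retry the message
                }
            }
        });
        connection.start();
    }
}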

Regards,
Richards Peter.


Re: Storm with ActiveMQ

2014-07-02 Thread Michael Ritsema
We use ActiveMQ with Storm in production, with a patched version of the JMS
Trident Spout.


On Wed, Jul 2, 2014 at 11:27 AM, Richards Peter hbkricha...@gmail.com
wrote:

 Hi,

 Well, that depends on how you pull data into your Storm cluster. If you are
 pulling it from a queue, you will need to know how the queuing system lets
 you retrieve the data. You may use a standard such as JMS, or a specific
 implementation as in Kafka. Some people even pull data from file systems
 and databases. Make the call based on the type of data source, the
 acceptable latency in fetching data, and the cost of running such a system.

 Regards,
 Richards Peter.



Re: key values in PersistentAggregate

2014-07-02 Thread Raphael Hsieh
Actually, I think this is a non-issue. Given the field already exists in the
stream, I should be able to access it, right?


On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh raffihs...@gmail.com wrote:

 From my understanding, if I implement my own state factory to use in
 PersistentAggregate, the grouping fields will be the key values in the
 state. However, if I want access to other fields in the aggregation, how
 might I get those? From my understanding, doing a groupBy() creates a new
 GroupedStream which only has the fields specified in the groupBy().

 Essentially what I want to do is:
 stream
 .groupBy(new Fields("a"))
 .persistentAggregate(
 new Factory(),
 new Fields("b"),
 ...
 )

 How would I do this?

 --
 Raphael Hsieh







-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: key values in PersistentAggregate

2014-07-02 Thread Robert Lee
Yes. groupBy() simply selects its grouping keys from the fields you have
already created within your TridentTopology. You still have access to all the
fields of the stream that is being grouped.
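
For instance, a sketch along these lines (I'm using the built-in
MemoryMapState and Sum just for illustration; your own Factory() and
aggregator would take their place):

import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;

public class GroupedAggregateExample {
    public static TridentTopology build(IRichSpout spout) {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", spout)
            .groupBy(new Fields("a"))            // "a" becomes the key in the state
            .persistentAggregate(
                new MemoryMapState.Factory(),    // stand-in for your custom Factory()
                new Fields("b"),                 // non-grouping field fed to the aggregator
                new Sum(),
                new Fields("sum_b"));            // output field of the aggregation
        return topology;
    }
}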


On Wed, Jul 2, 2014 at 1:44 PM, Raphael Hsieh raffihs...@gmail.com wrote:

 Actually, I think this is a non-issue. Given the field already exists in
 the stream, I should be able to access it, right?


 On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh raffihs...@gmail.com
 wrote:

 From my understanding, if I implement my own state factory to use in
 PersistentAggregate, the grouping fields will be the key values in the
 state. However, if I want access to other fields in the aggregation, how
 might I get those? From my understanding, doing a groupBy() creates a new
 GroupedStream which only has the fields specified in the groupBy().

 Essentially what I want to do is:
 stream
 .groupBy(new Fields("a"))
 .persistentAggregate(
 new Factory(),
 new Fields("b"),
 ...
 )

 How would I do this?

 --
 Raphael Hsieh







 --
 Raphael Hsieh
 Amazon.com
 Software Development Engineer I
 (978) 764-9014






Re: [Issue] OutOfMemoryError when disable ackers

2014-07-02 Thread Cody A. Ray
My hunch would be that you're anchoring your tuples and without acker tasks
these tuple trees are never destroyed. So all the tuples from each batch
are kept in memory until you run out of memory, i.e., a very, very fast
memory leak.

If you disable ackers, you should probably not anchor your tuples either.
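
To illustrate the difference, a minimal sketch (the field access and
uppercasing are placeholders):

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ExampleBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Anchored emit: the new tuple joins input's tuple tree and is
        // tracked by the acker tasks until the whole tree is acked.
        collector.emit(input, new Values(input.getString(0).toUpperCase()));

        // Unanchored emit: no tuple-tree bookkeeping; use this form when
        // ackers are disabled with conf.setNumAckers(0).
        // collector.emit(new Values(input.getString(0).toUpperCase()));

        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}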

-Cody


On Mon, Jun 30, 2014 at 9:56 PM, jamesw...@yahoo.com.tw 
jamesw...@yahoo.com.tw wrote:

  Hi all,

 I am using Storm 0.9.2 and doing real-time processing with basic Storm. It
 works fine in the reliable mode, but the speed is extremely slow, about 6000
 tuples/s. So I tried to disable the acker feature with conf.setNumAckers(0).
 However, this leads to the following error:

 java.lang.OutOfMemoryError: GC overhead limit exceeded at
 java.lang.reflect.Method.copy(Method.java:151) at
 java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136) at
 sun.reflect.ReflectionFactory.copyMethod(ReflectionFactory.java:300) at
 java.lang.Class.copyMethods(Class.java:2891) at
 java.lang.Class.getMethods(Class.java:1467) at
 clojure.lang.Reflector.getMethods(Reflector.java:357) at
 clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:27) at
 backtype.storm.daemon.worker$mk_transfer_fn$fn__5748.invoke(worker.clj:128)
 at
 backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5483.invoke(executor.clj:256)
 at
 backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
 at
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
 at
 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
 at
 backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
 at
 backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94)
 at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at
 clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)


 Can anyone tell me why this happens?
 Thank you very much.

 Best regards,
 James Fu




-- 
Cody A. Ray, LEED AP
cody.a@gmail.com
215.501.7891


Clojure 1.6?

2014-07-02 Thread Mark Mandel
I couldn't find a reference to it, except for this old ticket:

But I was trying to use Clojure 1.6 with my local storm setup (and the
great Marceline https://github.com/yieldbot/marceline lib), and kept
getting the error of:
WARNING: some? already refers to: #'clojure.core/some? in namespace:
backtype.storm.util, being replaced by: #'backtype.storm.util/some?

This also seemed to break my ability to deploy local topologies.

Rolling back to Clojure 1.5.1 (which is what is in the pom) solved the
issue.

Just wondering - can someone confirm if this is a known issue, and Clojure
1.6 is just not supported at this stage? (I'm going to assume it isn't).

If not, I may spend some time digging, and submit a pull request that works
around the issue.

Thanks!

Mark



-- 
E: mark.man...@gmail.com
T: http://www.twitter.com/neurotic
W: www.compoundtheory.com

2 Devs from Down Under Podcast
http://www.2ddu.com/


Re: Clojure 1.6?

2014-07-02 Thread Mark Mandel
Whoops, forgot the ticket I was referring to:
https://issues.apache.org/jira/browse/STORM-265


On Thu, Jul 3, 2014 at 9:32 AM, Mark Mandel mark.man...@gmail.com wrote:

 I couldn't find a reference to it, except for this old ticket:

 But I was trying to use Clojure 1.6 with my local storm setup (and the
 great Marceline https://github.com/yieldbot/marceline lib), and kept
 getting the error of:
 WARNING: some? already refers to: #'clojure.core/some? in namespace:
 backtype.storm.util, being replaced by: #'backtype.storm.util/some?

 This also seemed to break my ability to deploy local topologies.

 Rolling back to Clojure 1.5.1 (which is what is in the pom) solved the
 issue.

 Just wondering - can someone confirm if this is a known issue, and Clojure
 1.6 is just not supported at this stage? (I'm going to assume it isn't).

 If not, I may spend some time digging, and submit a pull request that
 works around the issue.

 Thanks!

 Mark



 --
 E: mark.man...@gmail.com
 T: http://www.twitter.com/neurotic
 W: www.compoundtheory.com

 2 Devs from Down Under Podcast
 http://www.2ddu.com/




-- 
E: mark.man...@gmail.com
T: http://www.twitter.com/neurotic
W: www.compoundtheory.com

2 Devs from Down Under Podcast
http://www.2ddu.com/


Re: [Issue] OutOfMemoryError when disable ackers

2014-07-02 Thread jamesw...@yahoo.com.tw
Hi,
I did not actually remove the anchoring when I ran Storm in the unreliable
way by setting setNumAckers to 0.

I had thought switching between reliable and unreliable Storm was simply a
matter of changing the config. I'll try that later. Thank you.

- Reply message -
From: Cody A. Ray cody.a@gmail.com
To: user@storm.incubator.apache.org
Subject: [Issue] OutOfMemoryError when disable ackers
Date: Thu, Jul 3, 2014, 6:49 AM

My hunch would be that you're anchoring your tuples and without acker tasks 
these tuple trees are never destroyed. So all the tuples from each batch are 
kept in memory until you run out of memory, i.e., a very, very fast memory leak.


If you disable ackers, you should probably not anchor your tuples either.

-Cody



On Mon, Jun 30, 2014 at 9:56 PM, jamesw...@yahoo.com.tw 
jamesw...@yahoo.com.tw wrote:



Hi all,
I am using Storm 0.9.2 and doing real-time processing with basic Storm. It works 
fine in the reliable mode, but the speed is extremely slow, about 6000 tuples/s. So 
I tried to disable the acker feature with conf.setNumAckers(0). However, this leads 
to the following error:



java.lang.OutOfMemoryError: GC overhead limit exceeded at 
java.lang.reflect.Method.copy(Method.java:151) at 
java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136) at 
sun.reflect.ReflectionFactory.copyMethod(ReflectionFactory.java:300) at 
java.lang.Class.copyMethods(Class.java:2891) at 
java.lang.Class.getMethods(Class.java:1467) at 
clojure.lang.Reflector.getMethods(Reflector.java:357) at 
clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:27) at 
backtype.storm.daemon.worker$mk_transfer_fn$fn__5748.invoke(worker.clj:128) at 
backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5483.invoke(executor.clj:256)
 at 
backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) 
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
 at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
 at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) 
at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) 
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at 
clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)




Can anyone tell me why this happens?
Thank you very much.

Best regards,
James Fu







-- 
Cody A. Ray, LEED AP
cody.a@gmail.com
215.501.7891

Re: Reduce number of imports

2014-07-02 Thread Mark Hu
I believe the below is a stub you can flesh out:

def post_initialize(self):
    # flesh out: do the expensive import here, once per bolt process
    pass

Please note that I can't recommend Storm with Python; the language layers go
Java - JRuby - Python, and the text pipe between JRuby and Python makes error
recovery really hard. I recommend Java, or worst case JRuby, for any
production work you're thinking about.


On Wed, Jul 2, 2014 at 8:41 AM, Joel Samuelsson samuelsson.j...@gmail.com
wrote:

 Once per supervisor / worker, I should say.

 Best regards,
 Joel


 2014-07-01 14:38 GMT+02:00 Joel Samuelsson samuelsson.j...@gmail.com:

 We are using Storm bolts written in Python. In the bolt we have a
 dependency that takes a long time to import. Is there a way to do the
 import only once per instance of the bolt?

 Best regards,
 Joel