Re: Storm performing very slow

2014-09-22 Thread Michael Rose
Storm is not your bottleneck. Check your Storm code to ensure that 1) you're
parallelizing your writes and 2) you're batching writes to your external
resources if possible. Some quick napkin math (2 million records in 5 hours
is about 111 records/s) shows you're only doing about 110 writes/s, which
seems awfully low.

Michael Rose (@Xorlev)
Senior Platform Engineer, FullContact

On Mon, Sep 22, 2014 at 8:05 PM, Kushan Maskey wrote:

 I am trying to load 20M records into a Cassandra database through
 Kafka and Storm. I am able to post all the data into Kafka in 5 minutes, but
 reading it from Storm and inserting it into Cassandra, Couch and Solr is
 very slow. It has been running for the past 5 hours and has loaded only 2
 million records so far.

 How do I make Storm perform faster? At this pace it will take a couple
 of days to load all the data.

 Kushan Maskey
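Michael's second suggestion, batching writes to the external stores, can be sketched in plain Java. This is a minimal sketch, not Storm or Cassandra client code: `RecordSink` and `BatchingWriter` are hypothetical names, the sink stands in for any client that accepts many records per round trip, and the batch size of 500 is an arbitrary assumption. The first suggestion, parallelizing, would come from the bolt's parallelism hint rather than from this class. With 1,250 records and a batch size of 500, the sink sees three calls instead of 1,250.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an external store client (Cassandra, Couch, Solr)
// that can accept many records in one round trip.
interface RecordSink {
    void writeBatch(List<String> records);
}

// Buffers records and hands them to the sink in batches instead of
// issuing one write per record.
class BatchingWriter {
    private final RecordSink sink;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    BatchingWriter(RecordSink sink, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.sink = sink;
        this.batchSize = batchSize;
    }

    // Called once per incoming record (in a bolt, from execute()).
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Writes whatever is buffered. A real bolt would also flush on a timer so a
    // partial batch is not held forever, and ack tuples only after the write succeeds.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.writeBatch(new ArrayList<>(buffer)); // hand over a copy; the buffer is reused
        buffer.clear();
    }
}

class BatchingWriterDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingWriter writer = new BatchingWriter(batch -> batchSizes.add(batch.size()), 500);
        for (int i = 0; i < 1_250; i++) {
            writer.add("record-" + i);
        }
        writer.flush(); // push the final partial batch
        System.out.println(batchSizes); // [500, 500, 250]
    }
}
```

Holding tuples until their batch is written keeps the spout's replay guarantees intact, at the cost of some added latency per record.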

Re: Question on failing ack

2014-08-25 Thread Michael Rose
Hi Kushan,

Depending on which Kafka spout you're using, it could be doing different
things when a tuple failed. However, if it's running reliably, the Cassandra
insertion failures would have forced a replay from the spout until they had

On Mon, Aug 25, 2014 at 4:42 PM, Kushan Maskey wrote:

 I have set up a topology to load a very large volume of data. Recently I
 loaded about 60K records and found that there are some failed acks on a
 few spouts but none on the bolts. Storm completed running and seems
 stable. Initially I started with a smaller amount of data, about 500
 records, successfully, and then increased it to 60K, where I saw the
 failed acks.

 1. Does that mean that the spout was not able to read some messages from
 Kafka? Since there are no failed acks on the bolts as per the UI, whatever
 messages were received have been successfully processed by the bolts.
 2. How do I interpret the numbers of failed acks, like acked: 315500
 and failed: 2980?
 Does this mean that 2980 records failed to be processed? If this is the
 case, how do I prevent this from happening, because I will be losing
 2980 records.
 3. I also see that a few of the records failed to be inserted into the
 Cassandra database. What is the best way to reprocess the data, as it is
 quite difficult to do it through the batch process that I am currently running?

 LMK, thanks.

 Kushan Maskey
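The replay behavior Michael describes can be modeled in plain Java. This is a simplified sketch, not the code of any particular Kafka spout: `ReplayingSource` and its methods are hypothetical names that mirror the `nextTuple()`, `ack(msgId)` and `fail(msgId)` callbacks a Storm spout receives. Every emitted message stays pending until it is acked, and a failed message goes back to the front of the queue to be emitted again. Under this model a nonzero "failed" count means retries rather than lost records, as long as the spout replays on fail; a real spout would usually also cap the number of retries.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a reliable spout: an emitted message stays pending
// until it is acked; a failed message goes back to the front of the queue
// and is emitted again (a replay).
class ReplayingSource {
    private final Deque<Long> toEmit = new ArrayDeque<>();
    private final Set<Long> pending = new HashSet<>();
    private long emittedCount = 0;

    void offer(long msgId) {
        toEmit.addLast(msgId);
    }

    // Plays the role of nextTuple(): returns the next message id, or null if none is ready.
    Long next() {
        Long id = toEmit.pollFirst();
        if (id != null) {
            pending.add(id);
            emittedCount++;
        }
        return id;
    }

    // Plays the role of ack(msgId): the message was fully processed, forget it.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Plays the role of fail(msgId): schedule the message for replay.
    void fail(long msgId) {
        if (pending.remove(msgId)) {
            toEmit.addFirst(msgId);
        }
    }

    int pendingCount() { return pending.size(); }
    long emittedCount() { return emittedCount; }
    boolean isDone() { return toEmit.isEmpty() && pending.isEmpty(); }
}

class ReplayDemo {
    public static void main(String[] args) {
        ReplayingSource source = new ReplayingSource();
        for (long i = 1; i <= 5; i++) {
            source.offer(i);
        }
        boolean failedOnce = false;
        Long id;
        while ((id = source.next()) != null) {
            // Pretend the Cassandra insert for message 3 fails the first time only.
            if (id == 3L && !failedOnce) {
                failedOnce = true;
                source.fail(id);
            } else {
                source.ack(id);
            }
        }
        System.out.println(source.emittedCount() + " emits, done=" + source.isDone()); // 6 emits, done=true
    }
}
```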

Re: Implementing a barrier mechanism in storm

2014-08-01 Thread Michael Rose
In the prepare method you receive a copy of the topology context which can
tell you all of the stream-components you're subscribed to.

You could make it a static field, or just use a fields grouping on messageId.
On Aug 1, 2014 12:26 AM, Spico Florin wrote:

   Thanks for your reply. I was thinking of using such a cache mechanism
 (the Guava cache as a static field in the final bolt). The question that I
 have now is: how do you keep track of the upstream bolts? Suppose that you
 have 3 bolts: are you then counting the number of bolts that have processed
 one message, in a Map<msgId, count_of_bolts>?
 Can you send the names of the bolts that have processed the message to
 the final bolt, and in the final bolt check whether that list matches the
 list of all processing bolts?

 What is the best approach here?

 Thanks.

 On Fri, Aug 1, 2014 at 7:51 AM, Michael Rose

 It's another case of a streaming join. I've done this before; there
 aren't too many gotchas, other than that you need a data structure which
 purges stale, unresolved joins older than the tuple timeout (I used a Guava
 cache for this).


 On Thu, Jul 31, 2014 at 10:44 PM, Varun Vijayaraghavan wrote:

 That's interesting. Note that I have not used such a pattern before -
 and have done something similar. I have not used trident - so this probably
 will not answer your last question completely :)

 If you set up the topology such that the links between bolts {1, 2, 3} and
 the final bolt are grouped by msgId (a fields grouping), you could keep the
 partially processed results in memory (or in persisted state somewhere) until
 you see the processed results from all the bolts.

 I would also expire msgIds which have not seen further results within a
 certain threshold time.

 What do you think?

 On Fri, Aug 1, 2014 at 12:35 AM, Spico Florin

   I have a case study where the same message (identified by an id) is
 spread over a couple of processing bolts, and a final bolt should act as a
 barrier. This final bolt should do its job on the message ID only when all
 the upstream bolts have finished processing the message.

 A sketch is below:

 Spout -(msgId1, payload)-> process bolt 1 (msgId1, payloadb1) -|
                         -> process bolt 2 (msgId1, payloadb2) -|-> final bolt (msgId1, finalp)
                         -> process bolt 3 (msgId1, payloadb3) -|

 So the final bolt should not start work on a message ID until the
 message has been processed by all 3 processing bolts.
   My questions are:
 1. Is this case viable for Storm?
 2. If the answer to the first question is yes, how can I achieve this?
 3. Is it possible to achieve this without using Trident?
 I look forward to your answers.

 - varun :)
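The streaming join Michael and Varun describe can be sketched as plain Java state for the final bolt. Assumptions: the edges from bolts 1-3 into the final bolt use a fields grouping on msgId, so every partial result for a given msgId reaches the same final-bolt task; the set of expected upstream component ids is known up front (in a real bolt it could be derived from the topology context in `prepare`); and a `HashMap` with timestamps stands in for the Guava cache Michael mentions. `JoinBarrier` and its methods are hypothetical names, and timestamps are passed in explicitly to keep the example deterministic (a bolt would pass `System.currentTimeMillis()`). As an instance field of one bolt task it needs no locking; shared as a static field across executors, it would need synchronization.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical state for the "final bolt" barrier: collects one partial result
// per upstream component for each msgId and releases the msgId once every
// expected component has reported. Joins that never complete are purged after
// timeoutMillis.
class JoinBarrier {
    private static final class Partial {
        final long firstSeenMillis;
        final Map<String, String> resultsBySource = new HashMap<>();

        Partial(long firstSeenMillis) {
            this.firstSeenMillis = firstSeenMillis;
        }
    }

    private final Set<String> expectedSources;
    private final long timeoutMillis;
    private final Map<String, Partial> partials = new HashMap<>();

    JoinBarrier(Set<String> expectedSources, long timeoutMillis) {
        this.expectedSources = new HashSet<>(expectedSources);
        this.timeoutMillis = timeoutMillis;
    }

    // Records one upstream result. Returns all results for msgId once the barrier
    // is satisfied (and forgets the msgId), otherwise null.
    Map<String, String> offer(String msgId, String sourceComponent, String result, long nowMillis) {
        Partial p = partials.computeIfAbsent(msgId, k -> new Partial(nowMillis));
        p.resultsBySource.put(sourceComponent, result);
        if (p.resultsBySource.keySet().containsAll(expectedSources)) {
            partials.remove(msgId);
            return p.resultsBySource;
        }
        return null;
    }

    // Drops joins older than the timeout; returns how many were dropped.
    int expire(long nowMillis) {
        int dropped = 0;
        Iterator<Partial> it = partials.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().firstSeenMillis > timeoutMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    int openJoins() {
        return partials.size();
    }
}

class JoinBarrierDemo {
    public static void main(String[] args) {
        Set<String> upstream = new HashSet<>(Arrays.asList("bolt1", "bolt2", "bolt3"));
        JoinBarrier barrier = new JoinBarrier(upstream, 30_000);
        barrier.offer("msgId1", "bolt1", "payloadb1", 0);
        barrier.offer("msgId1", "bolt2", "payloadb2", 5);
        Map<String, String> all = barrier.offer("msgId1", "bolt3", "payloadb3", 9);
        System.out.println(all != null ? "final bolt can run for msgId1" : "still waiting");
    }
}
```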

Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
Run your producer code in another thread to fill a LinkedBlockingQueue (LBQ),
and poll that from nextTuple() instead.

You should never be blocking yourself inside a spout.


On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel wrote:

  Hello again,

  Attached is a simplified reproduction (without the ShellSpout, but the
 concepts are the same).

  It seems that ack() and nextTuple() are always called on the same
 thread. That means that there is an inherent tradeoff.

 Either nextTuple sleeps only a few ms (and then the ShellSpout would serialize
 a lot of nextTuple messages)

 or nextTuple sleeps longer, but then the ack is delayed.

  Is there a way around this limitation?

 *From:* Itai Frenkel
 *Sent:* Thursday, July 17, 2014 9:42 PM
 *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)


  I have noticed that an ack takes 5 seconds to pass from the bolt to the
 spout (see debug log below). It is a simple topology with 1 spout, 1 bolt
 and 1 acker all running on the same worker. The spout and the bolt are
 ShellSpout and ShellBolt respectively.

  It looks like the message is delayed in the LMAX Disruptor queue.
  How can I reduce this delay to ~1ms?


 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
 2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063
 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
 2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
 2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
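Michael's suggestion (a producer thread filling a LinkedBlockingQueue that nextTuple() polls) can be sketched in plain Java. This is a minimal sketch under stated assumptions: `PrefetchingSource` and the `fetchBatch` supplier are hypothetical names standing in for the spout's IO client, and the capacity of 100 with batches of 10 mirrors the SQS setup Michael describes in his follow-up message. The design point is that `put()` blocks only the IO thread when the queue is full, while `poll()` returns null immediately when it is empty, so the spout's executor thread stays free to process acks. In a real spout the helper would be created in `open()` and closed in `close()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical spout helper: a background IO thread fills a bounded
// LinkedBlockingQueue, and nextTuple() only ever does a non-blocking poll().
class PrefetchingSource implements AutoCloseable {
    private final BlockingQueue<String> queue;
    private final Thread fetcher;

    // fetchBatch stands in for a slow client call (e.g. an SQS receive) that
    // returns up to N messages; it is assumed to wait while nothing is available.
    PrefetchingSource(int capacity, Supplier<List<String>> fetchBatch) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.fetcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    for (String msg : fetchBatch.get()) {
                        // put() blocks while the queue is full, so no new fetch
                        // happens until nextTuple() has drained some room.
                        queue.put(msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() was called
            }
        }, "prefetch-io");
        this.fetcher.setDaemon(true);
        this.fetcher.start();
    }

    // What nextTuple() would call: returns a message or null and never blocks,
    // so acks and fails handled on the same executor thread are not delayed.
    String pollForNextTuple() {
        return queue.poll();
    }

    @Override
    public void close() {
        fetcher.interrupt();
    }
}

class PrefetchDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Supplier<List<String>> fakeFetch = () -> {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                batch.add("msg-" + counter.getAndIncrement());
            }
            return batch;
        };
        try (PrefetchingSource source = new PrefetchingSource(100, fakeFetch)) {
            int emitted = 0;
            while (emitted < 25) {
                String msg = source.pollForNextTuple();
                if (msg == null) {
                    Thread.sleep(1); // nothing ready: back off briefly
                    continue;
                }
                emitted++; // a real spout would emit the tuple here
            }
            System.out.println("emitted " + emitted);
        }
    }
}
```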

Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose

Let's say we have it set at a limit of 100 items. The LBQ currently has 97
items in it. The SQS client runs again, pulls 10 messages, and successfully
inserts 3. The other 7 are blocked waiting for the queue to clear out. No
new HTTP requests are made to SQS while this LBQ is full (essentially this
is just called in a loop). nextTuple() eventually comes around and takes an
item, leaving 6 blocked for insertion, and so on until all the blocked
messages are inserted into the LBQ. At this point, we call out to the SQS
client again to fetch another 10 (and hopefully the LBQ has not completely
drained by the time the SQS client returns another 1-10 messages).

This model isn't picky about which SQS client is grabbing messages. SQS
doesn't guarantee order (or single delivery) anyway. In one of our
topologies, we have 8 spout instances consuming the same SQS queue for
throughput purposes. Maybe I've misunderstood your question though. We
normally don't have multiple topologies consuming the same queue, but
depending on the data there's no reason we couldn't.

Also, in this model we don't use a blocking poll method: if the LBQ is
empty, we skip emission and let Storm handle backoff if it wants to (see

2) By backoff I mean that if your spout hasn't emitted in a while, Storm is
going to slow down on calling nextTuple() so as not to busy-wait your entire
CPU. If you're at the maxSpoutPending limit, nextTuple is also not called
until an ack or fail has been received (which is one of the reasons it's so
important to not block in the spout as much as possible).

Storm will use the
when you don't emit. By default, it waits only 1ms before calling again
(not exponential, just fixed), which is enough to prevent 10-100k/s polling
behavior but not significantly increase latency -- if you're ever not
emitting, you can probably afford to sleep for 1ms. We actually have it set
to 10ms, given that 99.9% of the time we'll have a message ready for

An alternative to this is the
if you do see an impact on your throughput--but I've never needed this.
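The backoff Michael describes is a fixed sleep, not an exponential one. A simplified model of that executor loop, under these assumptions: `FixedSleepSpoutLoop` is a hypothetical class, not Storm's executor code; the `BooleanSupplier` stands in for `nextTuple()` and reports whether a tuple was emitted; and acks are delivered by calling `onAckOrFail()`. In Storm itself, to the best of my knowledge, this is the sleep spout wait strategy, whose sleep time is set with `topology.sleep.spout.wait.strategy.time.ms` (default 1 ms), the value Michael's team raised to 10 ms.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of a spout executor loop with a fixed-sleep wait strategy:
// when nextTuple() emits nothing, or the maxSpoutPending limit is reached, the
// loop sleeps for a fixed number of milliseconds. The wait never grows.
class FixedSleepSpoutLoop {
    private final long sleepMillis;
    private final int maxSpoutPending;
    private int pending = 0;
    private long idleSleeps = 0;

    FixedSleepSpoutLoop(long sleepMillis, int maxSpoutPending) {
        this.sleepMillis = sleepMillis;
        this.maxSpoutPending = maxSpoutPending;
    }

    // nextTuple reports whether it emitted a tuple.
    void runIteration(BooleanSupplier nextTuple) {
        boolean emitted = false;
        if (pending < maxSpoutPending) { // at the limit, nextTuple is not called at all
            emitted = nextTuple.getAsBoolean();
            if (emitted) {
                pending++;
            }
        }
        if (!emitted) {
            idleSleeps++;
            try {
                Thread.sleep(sleepMillis); // fixed wait, not exponential
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Called when an ack or fail arrives for an emitted tuple.
    void onAckOrFail() {
        if (pending > 0) {
            pending--;
        }
    }

    int pending() { return pending; }
    long idleSleeps() { return idleSleeps; }
}

class FixedSleepDemo {
    public static void main(String[] args) {
        FixedSleepSpoutLoop loop = new FixedSleepSpoutLoop(10, 2);
        for (int i = 0; i < 3; i++) {
            loop.runIteration(() -> true); // a source that always has data
        }
        // The third call hit maxSpoutPending = 2, so it slept instead of calling nextTuple.
        System.out.println("pending=" + loop.pending() + ", idleSleeps=" + loop.idleSleeps()); // pending=2, idleSleeps=1
        loop.onAckOrFail();
        loop.runIteration(() -> true);
        System.out.println("pending=" + loop.pending() + ", idleSleeps=" + loop.idleSleeps()); // pending=2, idleSleeps=1
    }
}
```

Because the sleep is fixed, a message that arrives while the spout is idle waits at most one sleep interval before nextTuple() is called again, which is the latency bound Itai asks about.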


On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel wrote:

  Thanks for the thorough and quick answer.

  Two follow up questions. The context is performing low latency stream
 manipulation (hopefully process a message the moment it arrives, maybe a
 few milliseconds later).

  1. What happens if the LBQ contains 10 items while the Storm topology
 does not call nextTuple because of backoffs? Wouldn't it be better for
 another Amazon SQS client to handle these items? Or are you assuming a
 single Storm topology is the sole handler of these items?

  2. If by backoff you mean the Storm topology cannot handle any more
 messages, or maxSpoutPending is reached, then ignore this question. If by
 backoff you mean exponential backoff, then I am worried about a message
 arriving in the queue and nextTuple not being called for a long time (more
 than a few milliseconds).


 *From:* Michael Rose
 *Sent:* Friday, July 18, 2014 11:27 PM

 *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)

  I have no experience with multilang spouts; however, my impression from
 the docs is that you should be handling your own multiplexing if you're
 writing a ShellSpout. Otherwise, if you block for 5 seconds emitting a
 tuple, you cannot process an ack until that's done. I'd experiment with
 that: if you change the sleep.spout.wait time to 500ms and you don't
 block in your spout (returning sync instead), it should back off just as
 a normal spout does (see,
 sync is a no-op).

 The post you linked to was mine, and for a long time that was true
 (especially in 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
 automatic backoffs when no tuples are emitted. The only time I've
 intentionally blocked in a spout after 0.8.0 is to control throughput (e.g.
 only allow 10/s during development). I've never built a multilang spout

  Spouts, like bolts, run in a single-threaded context so blocking at all
 prevents acks/fails/emits from being done until the thread is unblocked.
 That is why it's best to have another thread dealing with IO and
 asynchronously feeding a concurrent data structure the spout can utilize.
 For example, in our internal Amazon SQS client our IO thread continuously
 fetches up to 10 messages per get and shoves them into a
 LinkedBlockingQueue (until full

Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
Worth clarifying for anyone else in this thread that an LBQ separating
production from consumption is not a default thing in Storm; it's something
we cooked up to prefetch elements from slow/batching resources.


On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel wrote:

  Got it! Thanks.
 *From:* Michael Rose
 *Sent:* Saturday, July 19, 2014 12:10 AM

 *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)



Re: Distribute Spout output among all bolts

2014-07-16 Thread Michael Rose
Maybe we can help with your topology design if you let us know what you're
doing that requires you to shuffle half of the whole stream output to each
of the two different types of bolts.

If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two
different types) as above, there's no point in doing this. Setting the
parallelism will make sure that data is partitioned across machines (by
default, setting parallelism sets tasks = executors = parallelism).

Unfortunately, I don't know of any way to do this other than shuffling the
output to a new bolt, e.g. a bolt b0 acting as a 'RouterBolt', then having b0
round-robin the received tuples between two streams, then having b1 and b2
shuffle over those streams instead.


On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor

 Hi Tomas,

  As I said in my previous mail, the grouping is for a bolt *task*, not for
 the actual number of spawned bolts; for example, let's say you have two
 bolts that have a parallelism hint of 2 and these two bolts are wired to
 the same spout. If you set up the bolts as such:

 tb.setBolt(b1, new ExampleBolt(), 2 /* p-hint */).shuffleGrouping(spout1);
 tb.setBolt(b2, new ExampleBolt(), 2 /* p-hint */).shuffleGrouping(spout1);

 Then each of the tasks will receive half of the spout tuples, but each
 actual spawned bolt will receive all of the tuples emitted from the spout.
 This is more evident if you set up a counter in the bolt counting how many
 tuples it has received and test this with no parallelism hint, as such:

 tb.setBolt(b1, new ExampleBolt()).shuffleGrouping(spout1);
 tb.setBolt(b2, new ExampleBolt()).shuffleGrouping(spout1);

 Now you will see that both bolts will receive all tuples emitted by

 Hope this helps.


 On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna


 When you connect your bolt to your spout, you specify the grouping. If you
 use shuffle grouping, then any free bolt gets the tuple; in my experience,
 even in lightly loaded topologies the distribution amongst bolts is pretty
 even. If you use all grouping, then all bolts receive a copy of the tuple.


 On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor


  I am trying to distribute the spout output to its subscribed bolts
 evenly; let's say that I have a spout that emits tuples and three bolts
 that are subscribed to it. I want each of the three bolts to receive one
 third of the output (or emit a tuple to each one of these bolts in turn).
 Unfortunately, as far as I understand, all bolts will receive all of the
 emitted tuples of that particular spout regardless of the grouping defined
 (as grouping, from my understanding, is for bolt *tasks*, not actual bolts).

  I've searched a bit and I can't seem to find a way to accomplish
 that... Is there a way to do that, or am I searching in vain?


 Tomas Mazukna
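Michael's RouterBolt workaround boils down to round-robin stream selection. A minimal sketch in plain Java, assuming three downstream bolts and hypothetical stream ids `toB1` to `toB3`; `RoundRobinRouter` is not a Storm class. In a real topology the router bolt would declare those streams in `declareOutputFields`, emit each tuple on the stream returned by `route()`, and each downstream bolt would subscribe only to its own stream. Each router task keeps its own counter, so the rotation is per task, which still yields an even split overall.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the 'RouterBolt' (b0) idea: each incoming tuple is sent
// on exactly one of several named output streams, in strict rotation.
class RoundRobinRouter {
    private final List<String> streams;
    private int next = 0;

    RoundRobinRouter(List<String> streams) {
        if (streams.isEmpty()) {
            throw new IllegalArgumentException("need at least one output stream");
        }
        this.streams = new ArrayList<>(streams);
    }

    // Returns the stream id the current tuple should be emitted on.
    String route() {
        String stream = streams.get(next);
        next = (next + 1) % streams.size();
        return stream;
    }
}

class RouterDemo {
    public static void main(String[] args) {
        RoundRobinRouter router = new RoundRobinRouter(Arrays.asList("toB1", "toB2", "toB3"));
        Map<String, Integer> perStream = new TreeMap<>();
        for (int i = 0; i < 9; i++) {
            perStream.merge(router.route(), 1, Integer::sum); // count what each bolt would get
        }
        System.out.println(perStream); // {toB1=3, toB2=3, toB3=3}
    }
}
```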

Re: Distribute Spout output among all bolts

2014-07-16 Thread Michael Rose
It doesn't say so, but if you have 4 workers, the 4 executors will be
shared evenly over the 4 workers. Likewise, 16 will partition 4 each. The
only case where a worker will not get a specific executor is when there are
fewer executors than workers (e.g. with 8 workers and 4 executors, 4 of the
workers will receive an executor but the others will not).

It sounds like for your case, shuffle+parallelism is more than sufficient.


On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor

 Hey Stephen, Michael,

  Yeah, I feared as much, as searching the docs and API did not surface any
 reliable and elegant way of doing that short of a RouterBolt. If
 setting the parallelism of a component is enough for load balancing the
 processes across different machines that are part of the Storm cluster, then
 this would suffice in my use case. Although here
 the documentation says executors are threads, and it does not explicitly say
 anywhere that threads are spawned across different nodes of the cluster...
 I want to avoid the possibility of these threads only spawning locally and
 not in a distributed fashion among the cluster nodes..
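Michael's counts can be reproduced with a simple round-robin model. This is an illustration under stated assumptions, not Storm's scheduler code: it assumes a component's executors are dealt out one at a time across worker slots, and that the workers themselves sit on different supervisor machines. `ExecutorSpread` is a hypothetical name.

```java
import java.util.Arrays;

// Simplified model of spreading a component's executors evenly over workers
// by dealing them out round-robin.
class ExecutorSpread {
    // Returns how many executors each worker receives.
    static int[] executorsPerWorker(int executors, int workers) {
        if (workers <= 0 || executors < 0) {
            throw new IllegalArgumentException("need workers > 0 and executors >= 0");
        }
        int[] counts = new int[workers];
        for (int e = 0; e < executors; e++) {
            counts[e % workers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(executorsPerWorker(4, 4)));  // [1, 1, 1, 1]
        System.out.println(Arrays.toString(executorsPerWorker(16, 4))); // [4, 4, 4, 4]
        System.out.println(Arrays.toString(executorsPerWorker(4, 8)));  // [1, 1, 1, 1, 0, 0, 0, 0]
    }
}
```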



Re: Does Storm support JDK7 ??

2014-07-14 Thread Michael Rose
You need only run the existing releases on JDK 7 or 8.
On Jul 14, 2014 7:15 AM, Haralds Ulmanis wrote:

 Actually, I've now customized Storm a bit and recompiled it, as I needed
 some changes in it.
 But initially I just downloaded and ran it.

 On 14 July 2014 14:02, Adrianos Dadis wrote:

 Hi Haralds,

 Have you built it with JDK8 and run it with JDK8, or did you just download
 Storm (which is built with JDK6) and run it with JDK8?

 On Mon, Jul 14, 2014 at 3:24 PM, Haralds Ulmanis

 I don't know about JDK7; I'm running on JDK8 and it seems fine.

 On 14 July 2014 13:11, Veder Lope wrote:

 Storm is a great project! We are trying to build a project where
 Storm holds a crucial role in our architecture.

 As I see in pom.xml (in maven-compiler-plugin), source and target are
 set to Java 1.6.
 1) Is Storm compatible with JDK7?
 2) I know we can download Storm (built for JDK6) and run it using JDK7,
 but there are a few incompatibilities* between JDK7 and JDK6. Will these
 incompatibilities affect Storm or not?
 3) Do you plan to move to JDK7?
 4) What is the restriction that holds us back at JDK6? (We are now
 stuck with JDK6 for compile and runtime because of Storm.)
 5) Can we just build Storm with JDK7 (alter both source and target in
 pom.xml) and then use JDK7 for runtime or not? Have you seen any errors
 down this road?

 *incompatibilities: Check this:

 Adrianos Dadis.

Re: Choosing where your tasks run in Storm

2014-07-04 Thread Michael Rose
You can make it happen with a custom scheduler, see this article (sorry for
mangling, getting this link through SpamAssassin on the group was a

http xumingming dot sinaapp dotcom
slash 885/twitter-storm-how-to-develop-a-pluggable-scheduler/

But it's nothing I've seriously attempted before; the existing schedulers
are written in Clojure. It's not impossible to do for sure, but like Andrew
said, it might well just be easier to have separate clusters that share ZK
clusters.


On Fri, Jul 4, 2014 at 10:28 AM, Andrew Montalenti

 I don't think this is possible right now, though I have thought about the
 same thing before. It *might* be true that Storm's support for YARN could
 eventually lead to this kind of thing, but I don't know much about it. For
 now, you're best off having separate Storm clusters for different classes
 of machines. You could consider putting Kafka queues between them to ensure
 cross-topology message reliability guarantees (e.g. have your I/O-bound
 topology read from Kafka and write to Kafka, and have your CPU-bound
 topology read from the Kafka topic produced by the first topology).

 Andrew Montalenti
 Co-Founder & CTO

 On Fri, Jul 4, 2014 at 7:59 AM, jeff saremi

 I'm wondering if this concept applies to Storm and if there's a way to do

 I'd like to limit the machines that certain spouts or bolts run on. There
 are many reasons for this. But for one let's assume that I have a bolt
 that is just a proxy for some legacy service. I want to monitor that
 service by way of the bolt and use it in my topology.
 Another way of looking at it is that I want to have a topology that spans
 different classes of machines.
 Let's say I have 3 classes of machines: small, medium, and large. Some
 topologies are limited to only one class of machines however some other
 topologies need to span two or more classes of machines.
 How can I do this in storm?
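The placement rule Jeff describes (components pinned to a class of machine) is the core logic a custom scheduler, as Michael suggests, would have to apply. A minimal sketch in plain Java, assuming each supervisor carries a machine-class tag and each component names the class it must run on; `ClassAwarePlacement` and all host and component names are hypothetical. A real scheduler would implement Storm's `IScheduler` interface and would need some way to learn each supervisor's class; both are outside this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical placement logic: each supervisor has a machine-class tag
// ("small", "medium", "large"), each component names the class it must run on,
// and its executors are spread round-robin over the matching hosts only.
class ClassAwarePlacement {
    static Map<String, List<String>> place(Map<String, String> supervisorClass,
                                           Map<String, String> componentClass,
                                           Map<String, Integer> componentExecutors) {
        Map<String, List<String>> placement = new LinkedHashMap<>();
        for (Map.Entry<String, String> c : componentClass.entrySet()) {
            List<String> eligible = new ArrayList<>();
            for (Map.Entry<String, String> s : supervisorClass.entrySet()) {
                if (s.getValue().equals(c.getValue())) {
                    eligible.add(s.getKey());
                }
            }
            if (eligible.isEmpty()) {
                throw new IllegalStateException("no " + c.getValue() + " machine for " + c.getKey());
            }
            int n = componentExecutors.getOrDefault(c.getKey(), 1);
            List<String> hosts = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                hosts.add(eligible.get(i % eligible.size())); // one host per executor
            }
            placement.put(c.getKey(), hosts);
        }
        return placement;
    }
}

class PlacementDemo {
    public static void main(String[] args) {
        Map<String, String> supervisors = new LinkedHashMap<>();
        supervisors.put("host-s1", "small");
        supervisors.put("host-l1", "large");
        supervisors.put("host-l2", "large");
        Map<String, String> components = new LinkedHashMap<>();
        components.put("legacy-proxy-bolt", "small");
        components.put("cpu-heavy-bolt", "large");
        Map<String, Integer> executors = new HashMap<>();
        executors.put("legacy-proxy-bolt", 1);
        executors.put("cpu-heavy-bolt", 4);
        System.out.println(ClassAwarePlacement.place(supervisors, components, executors));
        // {legacy-proxy-bolt=[host-s1], cpu-heavy-bolt=[host-l1, host-l2, host-l1, host-l2]}
    }
}
```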

Re: Storm trident, multiple workers nothing happens

2014-06-18 Thread Michael Rose
In a single worker, you don't incur serialization or network overhead.


On Tue, Jun 17, 2014 at 11:09 PM, Romain Leroux

 Still, Netty performance was clearly better for me (when using only 1
 worker, since multiple workers didn't work with Netty)

 2014-06-18 1:33 GMT+09:00 Danijel Schiavuzzi

 I had a similar problem: with Netty, my Trident transactional topology was
 getting stuck after several occurrences of the Kafka spout restarting due to
 Kafka SocketTimeouts (I can reproduce this bug by blocking access to Kafka
 from the Supervisor machines with iptables; only a few tries are needed to
 reproduce it). I reverted to ZMQ and now it works flawlessly.

 I'll prepare a reproducible test case and file a JIRA bug report ASAP.
  On Jun 17, 2014 3:51 PM, Romain Leroux wrote:

 As I read in different topics, here also simply switching back to ZeroMQ
 solved the issue ...

 2014-06-13 21:58 GMT+09:00 Romain Leroux

 After tuning a Trident topology (kafka-storm-cassandra) to run on 1
 worker (so on 1 server), it works really well.

 I tried to deploy it using 2 workers on 1 server or 2 workers on 2 servers.
 The result is the same: nothing happens, no tuples are emitted and there
 are no messages in the logs.

 A quick profiling showed me that :

 77% of CPU time is main-SendThread(a.zookeeper.hostname:2181)

 The rest mainly comes from 2 New I/O threads

 Therefore I am wondering if the problem can come from one of the
 followings :

 - The ZooKeeper cluster version is 3.4.6, which is different from the 3.3.x
 used by Storm 0.9.1-incubating?
 But that is strange, because there are absolutely no problems when using
 the same settings with only 1 worker.

 - The communication layer is Netty, which may not be working well with my
 hardware? (Is this possible?)
 In the case of 1 worker only, Netty seems not to be too involved (no
 inter-worker communication).
 Maybe switch to ZeroMQ?

 Has someone faced similar issue ? Any pointer ? Or anything in
 particular to monitor / profile ?

RE: Extracting Performance Metrics

2014-06-16 Thread Michael Rose
What kind of issues does Metrics have that leads you to recommend
On Jun 16, 2014 6:57 PM, Dan wrote:

 Be careful when using Coda Hale's Metrics package when measuring latency.
 Consider using Gil Tene's
 High Dynamic Range Histogram instead:


 Date: Mon, 16 Jun 2014 18:20:11 -0400
 Subject: Re: Extracting Performance Metrics

 Also, I came across this presentation by Visible Measures which actually
 walks through a lot of great options covering most of what you want to know

 One other thing to be aware of is that in Storm 0.9.2 (forthcoming
 release), there is a new REST API used by the Storm UI for gathering some
 of these metrics:

 On Mon, Jun 16, 2014 at 6:13 PM, Andrew Montalenti

 I haven't used it yet, but a lot of people get pointed to metrics_storm:

 With this blog post that discusses it:

 Michael Noll also has a nice blog post about streaming Storm 0.9 metrics
 to Graphite:

 Currently, when we use Storm, we do a lot of custom metrics in Graphite
 using statsd, as described in this post (not about Storm, but about

 On Mon, Jun 16, 2014 at 4:37 PM, Anis Nasir wrote:

 Dear all,

 I am running a cluster with 1 Kafka + 1 Nimbus + 10 supervisor + 1
 ZooKeeper nodes. I am executing multiple topologies on the cluster and I
 want to extract the metrics listed below. Can someone recommend tools
 that I can use to extract this information?

 Per Topology
  - Throughput
  - Latency

 Per Spout or Bolt
  - Throughput
  - Latency
  - Execution Time
  - Queuing Time
  - Number of Messages Processed
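For the per-spout/bolt latency and throughput numbers Anis lists, a minimal hand-rolled sketch. Assumptions: `LatencyRecorder` is a hypothetical class, not Coda Hale's Metrics or HdrHistogram; latencies are measured by the caller (for example, from entering `execute()` to acking the tuple); and every sample is kept so percentiles are exact, which trades memory for accuracy. HdrHistogram, which Dan recommends, instead records values into fixed-precision buckets with bounded memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical per-component recorder: keeps every latency sample so the
// percentiles are exact (no sampling), and derives throughput from count / window.
class LatencyRecorder {
    private final List<Long> latenciesMicros = new ArrayList<>();

    void record(long latencyMicros) {
        latenciesMicros.add(latencyMicros);
    }

    int count() {
        return latenciesMicros.size();
    }

    // Messages per second over a measurement window of windowMillis.
    double throughputPerSecond(long windowMillis) {
        return windowMillis <= 0 ? 0.0 : count() * 1000.0 / windowMillis;
    }

    // Nearest-rank percentile for p in (0, 100].
    long percentile(double p) {
        if (latenciesMicros.isEmpty()) {
            throw new IllegalStateException("no samples recorded");
        }
        List<Long> sorted = new ArrayList<>(latenciesMicros);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.size() / 100.0);
        return sorted.get(Math.max(rank, 1) - 1);
    }
}

class LatencyDemo {
    public static void main(String[] args) {
        LatencyRecorder boltLatency = new LatencyRecorder();
        for (long micros = 1; micros <= 100; micros++) {
            boltLatency.record(micros); // e.g. time from execute() entry to ack
        }
        System.out.println("p50=" + boltLatency.percentile(50)
                + "us p99=" + boltLatency.percentile(99)
                + "us throughput=" + boltLatency.throughputPerSecond(2_000) + "/s");
        // p50=50us p99=99us throughput=50.0/s
    }
}
```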


Re: Non-DRPC topologies and number of assigned workers

2014-06-11 Thread Michael Rose
A topology can run in as many workers as you assign at launch time, DRPC or not.


On Wed, Jun 11, 2014 at 1:08 PM, Nima Movafaghrad wrote:

 Hi Community,

 I'm wondering whether a non-DRPC topology can run in multiple workers, or
 whether it will just run within one?



Re: resource aware task scheduling in Storm cluster

2014-06-10 Thread Michael Rose
Out of curiosity, what kind of changes have you been making?


On Tue, Jun 10, 2014 at 9:36 PM, Jason Jackson wrote:

 Hi Alex,

 We're using a slightly modified version of
 storm-mesos in production. Currently there's no one supporting
 storm-mesos, nor have we been open sourcing our changes in this project at
 this point in time.

 // Jason

 On Mon, Mar 3, 2014 at 3:00 PM, Alexander S. Klimov wrote:

  Hi guys,

 I found this solution for resource aware task scheduling in Storm:

 What is the status of this project? Is this still supported? Would it be
 recommended to be used in production?


Re: Order of Bolt definition, catching that subscribes from non-existent component [ ...]

2014-06-06 Thread Michael Rose
You can have a loop on a different stream. It's not always the best thing
to do (deadlock possibilities from buffers) but we have a production
topology that has that kind of pattern. In our case, one bolt acts as a
coordinator for recursive search.


On Fri, Jun 6, 2014 at 2:28 PM, Abhishek Bhattacharjee wrote:

 I am sorry for the late reply.
 Yes, you can't have a loop. You can have a chain, though (one which doesn't
 close upon itself!).

 Thanks :-)

 On Wed, May 7, 2014 at 12:50 PM, shahab wrote:

 Thanks Abhishek. But this also implies that we can not have a loop ( of
 message processing stages) using Storm, right?


 On Mon, May 5, 2014 at 9:45 PM, Abhishek Bhattacharjee wrote:

 I don't think what you are trying to do is achievable. Data in Storm
 always moves forward, so you can't give it back to a bolt from which it
 originated. That is, a bolt can only subscribe to bolts which were created
 before its creation. So, I think you can create another instance of the A
 bolt, say D, and then assign the output of C to D.

 On Mon, May 5, 2014 at 8:11 PM, shahab wrote:


 I am trying to define a topology as following:
 S : is a spout
 A,B,C : are bolts
 --> : means emitting message

 S --> A
 A --> B
 B --> C
 C --> A

 I am declaring the spouts and bolts in the above order in my Java code:
 first S, then A, then B, and finally C.

 I am using  globalGrouping(BoltName, StreamID) for collecting
 messages to be collected by each bolt,

 The problem is that I receive an error, while defining bolt A saying
 that subscribes from non-existent component [C] .

 I guess the error is happening because component C is not defined yet!
 but what could be the solution to this?


 *Abhishek Bhattacharjee*
 *Pune Institute of Computer Technology*

 *Abhishek Bhattacharjee*
 *Pune Institute of Computer Technology*

Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
You don't have to include a specific bolt for init code. It's not difficult
to push your init code into a separate class and call it from your bolts,
lock on that class, run init, and then allow other instances to skip over it.

Without changing bolt/spout code, I've taken to including a task hook for
init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
extendible and can be included pretty easily too:


Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford wrote:

 Yes, if I used prepare or open on spouts or bolts it would work, but
 unfortunately it would be a bit brittle. I'd have to include a spout or
 bolt just for initializing my invariant code. I'd rather do that when the
 topology is activated on the worker, so this seems like a good use of an
 activated() method on the StormTopology class (where activated()
 would be called after the StormTopology is deserialized by the worker node

 But, if there is no such method, I will make do with what is there.

 thanks for your response.


 On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant

 The bolt base classes have a prepare method:

 and the spout base classes have a similar activate method:

 Is that sufficient for your needs or were you thinking of something


 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
  I would like to set up some state that spouts and bolts share, and I'd like
  to prepare this state when the StormTopology gets 'activated' on a worker.
  It would be great if the StormTopology had something like a prepare or open
  method to indicate when it is starting. I looked but I could find no such
  API. Maybe I should submit an enhancement request?
  Thanks in advance for your responses,
  - Chris
  [If anyone is curious, the shared state is for all my application code to
  check or not check invariants. The invariant checking takes additional time,
  so we don't want to do it in production, but during testing/development it
  helps catch bugs.]
  Chris Bedford
  Founder  Lead Lackey
  Build Lackey Labs:
  Go Grails!:


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
No, it will be called per bolt instance. That's why init code needs to be
guarded behind a double-check lock to guarantee it only executes once per


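private static volatile boolean initialized = false;

// hypothetical entry point, called from each bolt's prepare()
public static void init() {
    if (!initialized) {
        synchronized (MyInitCode.class) {
            if (!initialized) {
                // do stuff
                initialized = true;
            }
        }
    }
}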

Until there's a set of lifecycle hooks, that's about as good as I've cared
to make it.
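One self-contained way to check that a double-checked lock runs the init body only once, even when several bolt instances in the same worker JVM call it at the same moment, is to race a handful of threads into it. This is a plain-Java sketch with no Storm dependency; `init()`, `simulate()`, and the `runs` counter are hypothetical names used only for the demonstration, and the threads merely stand in for bolt executors calling init from `prepare()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Shared, once-per-JVM initialization guarded by a double-checked lock.
class MyInitCode {
    private static volatile boolean initialized = false;

    // Demonstration only: counts how many times the init body actually ran.
    static final AtomicInteger runs = new AtomicInteger();

    // Hypothetical entry point that each bolt would call from prepare().
    static void init() {
        if (!initialized) {                       // fast path: no locking once initialized
            synchronized (MyInitCode.class) {
                if (!initialized) {               // re-check while holding the lock
                    runs.incrementAndGet();       // stand-in for "do stuff" (load properties, etc.)
                    initialized = true;           // volatile write publishes the initialized state
                }
            }
        }
    }

    // Simulates several bolt instances in one worker calling init() concurrently.
    static int simulate(int boltInstances) {
        ExecutorService pool = Executors.newFixedThreadPool(boltInstances);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < boltInstances; i++) {
            pool.submit(() -> {
                start.await();                    // line all threads up before racing into init()
                init();
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(8)); // prints 1
    }
}
```

Because `initialized` is `static`, the guarantee is per class loader (in practice, per worker JVM), not per topology across the cluster: each worker process runs the init body once.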

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, Jun 2, 2014 at 8:27 PM, Chris Bedford wrote:

 This looks promising. thanks.

 hope you don't mind one more question  --  if i create my own
 implementation of ITaskHook and add it do the config as you illustrated in
 prev. msg..will the prepare() method of my implementation be called
 exactly once shortly after StormTopology is deserialized by the worker
 node process ?

  - chris


Re: Which daemon tool for Storm nodes?

2014-05-21 Thread Michael Rose
We use upstart. Supervisord would also work. Just anything to keep an eye
on it and restart it if it dies (a very rare occurrence).

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Wed, May 21, 2014 at 7:09 PM, Connie Yang wrote:


 Is there a recommended supervisor or daemon tool for Storm nodes?  I'm
 using Supervisor for Storm and Zookeeper.

 I heard some concern in using Supervisor, but I haven't researched into
 the matter.

 Any comments on this?


Re: Storm with video/audio streams

2014-05-19 Thread Michael Rose
No reason why you couldn't do it, but as far as I know it hasn't been done
before. You can send any kind of serializable data into a topology. You'd
probably need to emit frames from the spout.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, May 19, 2014 at 6:13 AM, Suparno Datta suparno.da...@gmail.com wrote:


 I just started playing around with Storm a few days back. I worked
 with some basic examples using the Twitter streaming API. I was wondering
 if Storm can also be used with a live video/audio streaming API (the YouTube
 streaming API, for example). I wanted to develop an app which does something
 like detecting and tracking faces from a live video stream using Storm, but
 I am not sure if this is even feasible. Most of the examples I see deal
 with text streams. Has anyone here already tried or thought of something
 like this?


 Suparno Datta

Re: How resilient is Storm w/o supervision

2014-05-02 Thread Michael Rose
In practice, we rarely have issues with our Storm processes. We have
Nimbus/supervisors that never restart for months. But when they do have
issues, it's very nice to have them under supervision. Sometimes it's not
even things under their control (e.g. OOM during high memory usage

For development, there shouldn't be any issue forgoing supervision.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Fri, May 2, 2014 at 12:41 PM, P. Taylor Goetz wrote:

 I don’t think you need root to run supervisord:

 If you’re just testing something out, and don’t mind your cluster going
 down, then running without supervision is okay. But I would NEVER suggest
 someone run Storm’s daemons without supervision in a production environment.

 - Taylor

 On May 2, 2014, at 2:29 PM, Albert Chu wrote:

  I'm attempting to run Storm on a platform that I don't have root on.  So
  I won't be able to run it under Redhat's supervisord that's already
  How resilient are the Storm daemons by themselves?  Are they reasonably
  resilient or are they programmed to not handle even relatively simple
  I should probably say, this probably wouldn't be run in a production
  environment.  Just trying to understand if the documentation writers are
  saying, you should really do this for production or it won't work if
  you don't do this.
  Albert Chu
  Computer Scientist
  High Performance Systems Division
  Lawrence Livermore National Laboratory

Re: Machine specs

2014-04-30 Thread Michael Rose
In AWS, we're fans of c1.xlarges, m3.xlarges, and c3.2xlarges, but have
seen Storm successfully run on cheaper hardware.

Our Nimbus server is usually bored on an m1.large.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Wed, Apr 30, 2014 at 9:48 PM, Cody A. Ray wrote:

 We use m1.larges in EC2 for
 both nimbus and supervisor machines (though the m1 family have been
 deprecated in favor of m3). Our use case is to do some pre-aggregation
 before persisting the data in a store. (The main bottleneck in this setup
 is the downstream datastore, but memory is the primary constraint on the
 worker machines due to the in-memory cache which wraps the trident state.)

 For what it's worth, Infochimps
  or m3.xlarge machines.

 Using the Amazon cloud machines as a reference, we like to use either the
 c1.xlarge machines (7GB ram, 8 cores, $424/month, giving the highest
 CPU-performance-per-dollar) or the m3.xlarge machines (15 GB ram, 4 cores,
 $365/month, the best balance of CPU-per-dollar and RAM-per-dollar). You
 shouldn’t use fewer than four worker machines in production, so if your
 needs are modest feel free to downsize the hardware accordingly.
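The per-dollar claims in that quote can be checked with a few lines of arithmetic. This is a back-of-the-envelope sketch that uses only the 2014 figures quoted above (cores, RAM, monthly price); `InstanceValue` and its methods are hypothetical names, and current AWS pricing will differ.

```java
import java.util.Locale;

// Back-of-the-envelope value comparison using the 2014 figures quoted above.
// Prices and specs are historical; plug in current numbers before relying on this.
class InstanceValue {
    final String name;
    final int cores;
    final double ramGb;
    final double dollarsPerMonth;

    InstanceValue(String name, int cores, double ramGb, double dollarsPerMonth) {
        this.name = name;
        this.cores = cores;
        this.ramGb = ramGb;
        this.dollarsPerMonth = dollarsPerMonth;
    }

    // Cores obtained for every $100 spent per month.
    double coresPer100Dollars() {
        return cores / dollarsPerMonth * 100.0;
    }

    // GB of RAM obtained for every $100 spent per month.
    double ramGbPer100Dollars() {
        return ramGb / dollarsPerMonth * 100.0;
    }

    public static void main(String[] args) {
        InstanceValue c1 = new InstanceValue("c1.xlarge", 8, 7, 424);
        InstanceValue m3 = new InstanceValue("m3.xlarge", 4, 15, 365);
        for (InstanceValue v : new InstanceValue[] {c1, m3}) {
            System.out.printf(Locale.ROOT, "%s: %.2f cores, %.2f GB RAM per $100/month%n",
                    v.name, v.coresPer100Dollars(), v.ramGbPer100Dollars());
        }
    }
}
```

With these inputs c1.xlarge works out to roughly 1.89 cores and 1.65 GB per $100/month, and m3.xlarge to roughly 1.10 cores and 4.11 GB, which matches the quoted characterisation: c1.xlarge wins on CPU per dollar, while m3.xlarge gives far more memory per dollar.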

 Not sure what others would recommend.


 On Wed, Apr 30, 2014 at 5:57 PM, Software Dev 

 What kind of specs are we looking at for

 1) Nimbus
 2) Workers

 Any recommendations?

 Cody A. Ray, LEED AP

Re: Is it a must to have /etc/hosts mapping or a DNS in a multinode setup?

2014-04-29 Thread Michael Rose
We don't use /etc/hosts mapping, we only use hostnames / ips.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Tue, Apr 29, 2014 at 8:29 AM, Derek Dagit wrote:

 I have not tried it, but there is a config for this purpose:

 On 4/29/14, 0:41, Sajith wrote:

 Hi all,

 Is it a must to have a /etc/hosts mapping or a DNS in a multinode storm
 cluster? Can't supervisors talk to each other through ZooKeeper or nimbus
 using IP addresses directly ?

Re: PDF processing use case in storm!!

2014-04-28 Thread Michael Rose
I'd be inclined to say that while you can make it work, your unit of work
(a whole PDF) makes it unsuitable for Storm. In general, the more you can
break down each operation the better Storm will work for you. You're likely
to get worse latency out of Storm due to the essentially random delegation
of tuples.

You could really abuse Storm if you wanted and use it as a distributed
application container with threadpools, I've done it. But you're really
going to see a better experience out of a webservice if it's live-mode

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, Apr 28, 2014 at 8:23 AM, Deepak Sharma deepakmc...@gmail.com wrote:

 We need parallelism here.
 A lot of users may be using the service at the same time. It may be for
 different files or the same file.


 On Mon, Apr 28, 2014 at 7:41 PM, Marc Vaillant vaill...@animetrics.com wrote:

 I think it's important to know whether or not some form of parallelism
 (other than throughput) is required, otherwise a standard webservice
 seems sufficient for this use case.

 On Mon, Apr 28, 2014 at 07:46:35AM -0400, Andrew Perepelytsya wrote:
  You can build request response type topologies via DRPC. However,
 unless we're
  talking about processing numerous pdfs at once - bad fit, IMO.
  If there is parallelism required you might be better off with a custom
 yarn app
  - looks like YAYA makes it tolerable to write.
  On Apr 28, 2014 2:41 AM, Deepak Sharma wrote:
  Hi All,
  Just wanted to check if this can be valid storm use case.
  I want to write one simple Storm topology which can read a PDF file,
  it, make some changes like converting it to DOC, and save the new file.
  I know this can be easily done in batch mode using Hadoop, but we want to
  do it in real time, i.e. when the user demands it.
  We already do it using some Java API but it takes a lot of time in all
  Can this be achieved in Storm? If yes, is there any pointer to any
  similar to this use case?


Re: Passing command line arguments to storm

2014-04-17 Thread Michael Rose
Yes. Ultimately, that runs the main method of MyTopology, so just like any
other main method you get String[] args.
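Since `storm jar` hands the trailing arguments to `main`, the flags from the question can be parsed like in any other Java program. Below is a minimal hand-rolled sketch; the `Options` holder and `parse()` are hypothetical helpers (not Storm API), the `com.test` package is omitted for brevity, and a library such as Apache Commons CLI could be used instead because it also works on the plain `String[]`.

```java
// Sketch of MyTopology's main() reading the flags from the question.
// Options and parse() are hypothetical helpers, not part of Storm.
class MyTopology {

    static final class Options {
        String configPath;   // value of --config, or null if absent
        boolean remote;      // true if --remote was given
    }

    static Options parse(String[] args) {
        Options options = new Options();
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "--config":
                    if (i + 1 >= args.length) {
                        throw new IllegalArgumentException("--config requires a value");
                    }
                    options.configPath = args[++i];
                    break;
                case "--remote":
                    options.remote = true;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown argument: " + args[i]);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        Options options = parse(args);
        // Build the topology here, then submit it to the cluster if options.remote
        // is set, or run it locally otherwise.
        System.out.println("config=" + options.configPath + ", remote=" + options.remote);
    }
}
```

With the command from the question, `args` is `{"--config", "foo", "--remote"}`, so this prints `config=foo, remote=true`.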

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Thu, Apr 17, 2014 at 5:36 PM, Software Dev static.void@gmail.com wrote:

 Is it possible to pass arguments that adhere to the CommandLine interface?

 storm jar topology.jar com.test.MyTopology --config foo --remote

Re: Server load - Topology optimization

2014-03-19 Thread Michael Rose
messageId is any unique identifier of the message, such that when ack is
called on your spout you're returned the identifier to then mark the work
as complete in the source in the case it supports replay.
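The message-id contract can be modelled without Storm: whatever id you pass when emitting is exactly what comes back in `ack`/`fail`, so it only needs to be unique and meaningful to your source. The sketch below is a plain-Java model, not Storm code; `ReliableSourceSketch`, the in-memory deque standing in for a replayable source, and the choice of `UUID` ids are all assumptions. In a real spout the corresponding hooks are `nextTuple()`, `ack(Object msgId)`, and `fail(Object msgId)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Plain-Java model (no Storm classes) of how a spout uses message ids.
class ReliableSourceSketch {
    private final Deque<String> source = new ArrayDeque<>();      // hypothetical replayable source
    private final Map<UUID, String> pending = new HashMap<>();    // emitted, not yet acked/failed
    final List<String> completed = new ArrayList<>();             // work marked complete in the source

    ReliableSourceSketch(List<String> messages) {
        source.addAll(messages);
    }

    // Like nextTuple(): take a message, give it a unique id, remember it, "emit" it.
    // In Storm the emit would be collector.emit(new Values(msg), id).
    UUID nextTuple() {
        String msg = source.poll();
        if (msg == null) {
            return null; // nothing to emit right now
        }
        UUID id = UUID.randomUUID();
        pending.put(id, msg);
        return id;
    }

    // Like ack(Object msgId): the id comes back, so the message can be marked complete.
    void ack(UUID id) {
        String msg = pending.remove(id);
        if (msg != null) {
            completed.add(msg);
        }
    }

    // Like fail(Object msgId): put the message back at the front so it is replayed.
    void fail(UUID id) {
        String msg = pending.remove(id);
        if (msg != null) {
            source.addFirst(msg);
        }
    }

    int inFlight() {
        return pending.size();
    }
}
```

For a queue such as the service bus in this thread, a natural id would be whatever handle the queue needs to delete or release the message, rather than a random UUID.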

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Wed, Mar 19, 2014 at 10:18 AM, David Crossland da...@elastacloud.com wrote:

  Where is this messageId derived from?

  I note there is a tuple.getMessageId() but there does not appear to be
 an overload to emit that accepts this?

  There is an overload

 emit(java.util.List<java.lang.Object> tuple)

  I don't think this is what you are referring to however.

  My bolt execute method looks like this;  (sorry if I'm being obtuse,
 this is still very new.. )

 public void execute(Tuple tuple) {
     String msg = (String) tuple.getValue(0);
     if (msg == null) {
         logger.log(Level.ERROR, "Message is null");
         // acknowledge the tuple has failed and return
         _collector.fail(tuple);
         return;
     }

     MessageProcessor processor = new MessageProcessor();
     EventMessage message = processor.processMessage(msg);

     if (message == null) {
         logger.log(Level.DEBUG, "Message did not conform to a known ...");
         logger.log(Level.DEBUG, msg);
         // acknowledge the tuple, but don't do anything with it; we don't have to emit
         _collector.ack(tuple);
         return;
     }

     if (message instanceof MonetiseEvent) {
         logger.log(Level.DEBUG, "received monetise message");
         _collector.emit(new Values(message));
     }
     // every tuple is acked, whether or not anything was emitted
     _collector.ack(tuple);
 }


   *From:* Sean Zhong
 *Sent:* Wednesday, 19 March 2014 15:39

  Hi David,

  As I said, to get the ack count for the spout, you need to add a message id
 when emitting, like this:
 collector.emit(new Values(msg), messageId)
   ^| here

  If the message id is null, then no ack message will be sent to the spout.


 On Wed, Mar 19, 2014 at 4:49 PM, David Crossland da...@elastacloud.com wrote:

  I had noticed the ack count never increased for the spout.

  The spout is a BaseRichSpout. I presumed the underlying code would
 handle the ack and display appropriate metrics, but you can see from the
 first bolt that all messages have been acked. I don't know if this is an

  I've been doing a bit of reading around the service bus. If I understand
 correctly it can push 20 messages per second per connection (I need to
 confirm this with a colleague who has the experience with this..) If that's
 the case then it tallies with the throughput I've been seeing (approx…).  I
 suspect the problem isn't so much the topology, but the service bus

  I'll come back if this doesn't pan out.


   *From:* Sean Zhong
 *Sent:* Wednesday, 19 March 2014 00:45

  The spout is suspicious.

  From the screenshots, you are using a non-acked spout; there may also be a
 GC issue there.
 Here is the suggestion:

  Check whether you are making a connection in the context of nextTuple. If
 that is true, it means a large latency. You can check the spout latency by
 enabling an acked spout (when emitting, add a messageId, e.g. emit(new
 Values(msg), messageId)) and checking the latency.
 If this is the case, you need to create a separate thread for the data
 connection in the spout, and use a queue to buffer messages; then nextTuple
 just polls the queue.


Re: Server load - Topology optimization

2014-03-18 Thread Michael Rose
Well, I see you have 30 spout instances and 3 bolt instances. Doesn't seem
like it's a huge bottleneck, but it is having to send over the wire much of
the time. Something to keep in mind for the future.

I'd be most suspicious of your spouts. All spouts on a single worker are
run single-threaded (in an event loop calling nextTuple()), so if you have
ANY blocking work that'll kill throughput. If AzureServiceBus is anything
like SQS, response times are not instant. We tend to have background
threads feeding a queue in our spouts for that reason, as we use SQS for
many of our topologies. Given 12 workers, you'd have ~3 spouts per machine
running single-threaded.

With spouts, you should attempt to maintain as little blocking work as
possible...if you're using a queue, you should be using Queue#poll() to
either return a value OR null, and only emit a tuple if an event is
available (and meets your criteria). Storm will handle rate limiting the
spouts with sleeps.
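Here is a minimal sketch of the background-thread pattern: a fetcher thread does the blocking receive and feeds a bounded queue, and `nextTuple()` only calls `poll()`, which returns a message or `null` immediately. It is plain Java with no Storm dependency; `BufferedSpoutSketch`, `BlockingSource`, and the 10,000-element buffer size are assumptions for illustration, and the methods only mirror the names of the spout lifecycle (`open`, `nextTuple`, `close`).

```java
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java sketch (no Storm classes) of a spout that keeps blocking I/O off the
// nextTuple() path: a background thread fills a bounded buffer, nextTuple() only polls.
class BufferedSpoutSketch {

    // Hypothetical blocking receive call (stand-in for a service bus / SQS client).
    interface BlockingSource {
        String receive() throws InterruptedException;
    }

    private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>(10_000);
    private final Thread fetcher;
    private volatile boolean running = true;

    BufferedSpoutSketch(BlockingSource source) {
        fetcher = new Thread(() -> {
            try {
                while (running) {
                    String msg = source.receive(); // slow call happens here, never in nextTuple()
                    if (msg != null) {
                        buffer.put(msg);            // blocks when the buffer is full (backpressure)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() interrupts us; just exit
            }
        }, "spout-fetcher");
        fetcher.setDaemon(true);
    }

    // Like open(): start the background fetcher once.
    void open() {
        fetcher.start();
    }

    // Like nextTuple(): never blocks. A null result means "emit nothing this time";
    // in Storm you would simply return and let Storm sleep the spout.
    String nextTuple() {
        return buffer.poll();
    }

    // Like close(): stop the fetcher thread.
    void close() {
        running = false;
        fetcher.interrupt();
    }
}
```

The buffer is bounded so that a slow topology pushes back on the fetcher instead of growing memory without limit. Because a spout's `nextTuple()`, `ack()`, and `fail()` are called from a single thread, keeping the network call on a separate thread leaves that thread free to emit and to process acks while a receive is in flight.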

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Tue, Mar 18, 2014 at 5:14 PM, David Crossland da...@elastacloud.com wrote:

  Perhaps these screenshots might shed some light? I don't think there is
 much of a latency issue.  I'm really starting to suspect there is some
 consumption rate issue from the topic.

  I set the spout to a high parallelism value as it did seem to improve

  But if there is anything you can spot that would be grand


   *From:* Nathan Leung
 *Sent:* Tuesday, 18 March 2014 21:14

  It could be bolt 3.  What is the latency like between your worker and
 your redis server?  Increasing the number of threads for bolt 3 will likely
 increase your throughput.  Bolt 1 and 2 are probably CPU bound, but bolt 3
 is probably restricted by your network access.  Also I've found that
 localOrShuffleGrouping can improve performance due to reduced network

 On Tue, Mar 18, 2014 at 3:55 PM, David Crossland da...@elastacloud.com wrote:

  A bit more information then

  There are 4 components

  Spout - This is reading from an azure service bus topic/subscription.
 A connection is created in the open() method of the spout, nextTuple does a
 peek on the message, and invokes the following code;

 StringWriter writer = new StringWriter();
 IOUtils.copy(message.getBody(), writer);
 String messageBody = writer.toString();

  It then deletes the message from the queue.

  Overall nothing all that exciting..

  Bolt 1 - Filtering

  Parses the message body (json string) and converts it to an object
 representation.  Filters out anything that isn't a monetise message.  It
 then emits the monetise message object to the next bolt.  Monetise messages
 account for ~ 0.03% of the total message volume.

  Bolt 2 - transformation

  Basically extracts from the monetise object the values that are
 interesting and constructs a string, which it emits.

  Bolt 3 - Storage

  Stores the transformed string in Redis using the current date/time as


  Shuffle grouping is used with the topology

  I ack every tuple irrespective of whether I emit the tuple or not.  It
 should not be attempting to replay tuple.


  I don't think Bolt 2/3 are the cause of the bottleneck.  They don't
 have to process much data at all tbh.

  I can accept that perhaps there is something inefficient with the
 spout, perhaps it just can't read from the service bus quickly enough. I
 will do some more research on this and have a chat with the colleague who
 wrote this component.

  I suppose I'm just trying to identify if I've configured something
 incorrectly with respect to storm, whether I'm correct to relate the total
 number of executors and tasks to the total number of cores I have
 available.  I find it strange that I get a better throughput when I choose
 an arbitrary large number for the parallelism hint than if I constrain
 myself to a maximum that equates to the number of cores.


   *From:* Nathan Leung
 *Sent:* Tuesday, 18 March 2014 18:38

  In my experience storm is able to make good use of CPU resources, if
 the application is written appropriately.  You shouldn't require too much
 executor parallelism if your application is CPU intensive.  If your bolts
 are doing things like remote DB/NoSQL accesses, then that changes things
 and parallelizing bolts will give you more throughput.  Not knowing your
 application, the best way to pin down the problem is to simplify your
 topology.  Cut out everything except for the Spout.  How is your filtering
 done?  if you return without emitting, the latest versions of storm will
 sleep before trying again.  It may be worthwhile to loop in the spout until
 you receive a valid message, or the bus

Re: Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.

2014-03-17 Thread Michael Rose
Depending on what you need, you could use the Thrift interface to Nimbus to
grab the statistics. The UI project does do some calculations on the raw
metrics, so it's not quite the same. The algorithms it uses aren't too
difficult to replicate.

We ended up building something very similar to what Ooyala did, customized
for our specific needs, it's an excellent pattern.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, Mar 17, 2014 at 6:21 PM, Chris Bedford ch...@buildlackey.com wrote:

 Hi, I'm interested in getting metrics via JMX on not only container
 level factors (such as # of garbage collections, heap usage, etc.), but
 also metrics that describe how spouts and bolts are performing (e.g., # of
 tuples emitted, # transferred -- the same kind of stuff that the Storm UI

 I ran across this project:

 I was just wondering if this is the best path to pursue. Does anyone have
 any concrete experience with this that they can share?


Re: Storm stream grouping examples

2014-03-05 Thread Michael Rose
What kind of comparisons are you looking for? How they functionally work?

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Wed, Mar 5, 2014 at 9:52 AM, Roberto Coluccio

 Hello folks,

 I was unable to find any complete example (or, better, related work in the
 scientific literature) in which (almost) all the *stream grouping
 policies* have been used and compared. Do you have any reference you
 could please share with me?

 Thank you and best regards,

 Roberto Coluccio

Re: Storm stream grouping examples

2014-03-05 Thread Michael Rose
+1, localOrShuffle will be a winner, as long as it's evenly distributing
work. If 1 tuple could say produce a variable 1-100 resultant tuples (and
these results were expensive enough to process, e.g. IO), it might well be
worth shuffling vs. localShuffling.

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Wed, Mar 5, 2014 at 11:19 AM, Nathan Leung wrote:

 In my experience on a 1 Gb network localOrShuffleGrouping was a clear
 winner in terms of performance.  But I haven't tested with 10 Gb, and if
 you have substantial business logic then that becomes a bigger factor than
 serializing/transferring data on the network.  I think the performance of
 any given grouping is too dependent on your business logic; it will be
 difficult to quantify how well it performs in a canned benchmark.  And
 sometimes your business logic will define a grouping for you (e.g. fields
 grouping) whether it's the best performer or not.

 On Wed, Mar 5, 2014 at 1:05 PM, Roberto Coluccio wrote:

 Hello Michael, thanks for your feedback.

 I'm looking for a performance comparison. I know that not all the
 policies are really comparable, but even obvious comparisons all listed
 together could be a useful reference.



Re: Tuning and nimbus at 99%

2014-03-03 Thread Michael Rose

I'm a fan of SPM for Storm, but there's other debugging that needs to be
done here if the process quits constantly.


Since you're using storm-deploy, I assume the processes are running under
supervisor. It might be worth killing the supervisor by hand, then running
it yourself (ssh as storm, cd storm/daemon, supervise .) and seeing what
kind of errors you see.

Are your disks perhaps filled?

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Mon, Mar 3, 2014 at 6:49 PM, Otis Gospodnetic

 Hi Sean,

 I don't think you can see the metrics you need to see with AWS CloudWatch.
  Have a look at SPM for Storm.  You can share graphs from SPM directly if
 you want, so you don't have to grab and attach screenshots manually. See:

 My bet is that you'll see GC metrics spikes

 Performance Monitoring * Log Analytics * Search Analytics
 Solr  Elasticsearch Support *

 On Mon, Mar 3, 2014 at 8:21 PM, Sean Solbak wrote:

 I just created a brand new cluster with storm-deploy command.

 lein deploy-storm --start --name storm-dev --commit

 I had a meeting, did nothing to the box, no topologies were run.  I came
 back 2 hours later and nimbus was at 100% cpu.

 I'm running on an m1.small on the following AMI - ami-58a3cf68. I'm
 unable to get a thread dump as the process is getting killed and restarted
 too fast. I did attach a 3-hour snapshot of the EC2 monitors. Any
 guidance would be much appreciated.


 On Sun, Mar 2, 2014 at 9:11 PM, Sean Solbak wrote:

 The only error in the logs, which happened over 10 days ago, was:

 2014-02-22 01:41:27 b.s.d.nimbus [ERROR] Error when processing event Unable to delete directory
 at backtype.storm.util$rmr.invoke(util.clj:442)
 at backtype.storm.timer$mk_timer$fn__3002.invoke(timer.clj:26)
 at ~[clojure-1.4.0.jar:na]
 at ~[na:1.6.0_27]

 Its fine.  I can rebuild a new cluster.  Storm deploy makes it pretty

 Thanks for your help on this!

 As for my other question.

 If my Trident batch interval is 500ms and I keep the spout pending and
 batch size small enough, will I be able to get real-time results (i.e. under
 2 seconds)? I've played with the various metrics (I literally have a
 spreadsheet of parameters to results) and haven't been able to get it. Am
 I just doing it wrong? What would the key parameters be? The complete
 latency is 500 ms but Trident seems to be way behind despite none of my
 bolts having a capacity > 0.6. This may have to do with nimbus being
 throttled, so I will report back. But if there are people out there who
 have done this kind of thing, I'd like to know if I'm missing an obvious
 parameter or something.


Re: Zookeepr on different ports

2014-03-02 Thread Michael Rose
I'd recommend just using one Zookeeper instance if they're on the same
physical host. There's no reason why a development ZK ensemble needs 3

Michael Rose (@Xorlev
Senior Platform Engineer, FullContact

On Sun, Mar 2, 2014 at 10:15 AM, Arun Sethia wrote:


 We have set up three Zookeeper instances on one virtual machine; they
 are running on different ports (2181, 2182, 2183).

 Eventually, in production, we will have each instance on a separate
 virtual machine, and we can use the same port (2181).

 We have seen that we can configure multiple Zookeeper instances (a cluster)
 using storm.zookeeper.servers, and we can use storm.zookeeper.port to
 define a port.

 Since we have Zookeeper on one machine on different ports
 (2181, 2182, 2183), we are not able to configure different ports using

 Any help would be great.
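Here is a minimal sketch of why "one host, three ports" can't be expressed. It assumes, based on my understanding of Storm 0.9's Zookeeper client setup, that the single storm.zookeeper.port value is appended to every host in storm.zookeeper.servers. This is plain Java, not Storm's code. The helper `connectString` and the hosts `zk1`, `zk2`, `zk3` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ZkConnectString {
    /**
     * Hypothetical helper: pairs every server with the one shared port, the way a
     * single port setting applied to a host list behaves. Nothing lets a host carry
     * its own port, so "localhost on 2181, 2182 and 2183" has no representation.
     */
    public static String connectString(List<String> servers, int port) {
        return servers.stream()
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Development: a single Zookeeper instance on the same host.
        System.out.println(connectString(Arrays.asList("localhost"), 2181)); // localhost:2181
        // Production: one instance per machine, all on the same port.
        System.out.println(connectString(Arrays.asList("zk1", "zk2", "zk3"), 2181));
    }
}
```

With one port shared by all servers, the development setup collapses naturally to a single instance, and production spreads the same port across machines.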


Re: Tuning and nimbus at 99%

2014-03-02 Thread Michael Rose
The fact that the process is being killed constantly is a red flag. Also,
why are you running it as a client VM?

Check your nimbus.log to see why it's restarting.


On Sun, Mar 2, 2014 at 7:50 PM, Sean Solbak wrote:

 uintx ErgoHeapSizeLimit          = 0
 uintx InitialHeapSize           := 27080896
 uintx LargePageHeapSizeThreshold = 134217728
 uintx MaxHeapSize               := 698351616

 So the initial heap size is ~25 MB and the max is ~666 MB.

 It's a client process (not server, i.e. the command is java -client
 -Dstorm.options...).  The process gets killed and restarted continuously
 with a new PID (which makes it tough to get stats on the PID).  I don't
 have VisualVM, but if I run

 jstat -gc PID, I get

 S0C    S1C    S0U    S1U    EC      EU      OC       OU      PC       PU       YGC  YGCT   FGC  FGCT   GCT
 832.0  832.0  0.0    352.9  7168.0  1115.9  17664.0  1796.0  21248.0  16029.6  5    0.268  0    0.000  0.268

 At this point I'll likely just rebuild the cluster.  It's not in prod yet
 as I still need to tune it.  I should have written 2 separate emails :)


 On Sun, Mar 2, 2014 at 7:10 PM, Michael Rose mich...@fullcontact.com wrote:

 I'm not seeing too much to substantiate that. What size heap are you
 running, and is it nearly full? Perhaps attach VisualVM and check for GC
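The heap-size and GC question can also be answered from inside the JVM. Below is a minimal sketch that uses the standard `java.lang.management` beans, roughly an in-process counterpart to `jstat -gc <pid>`. The class name `HeapReport` is hypothetical, and collector names vary with the JVM and GC settings.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapReport {
    private static final long MIB = 1024L * 1024L;

    /** Formats a byte count as MiB; the JVM reports -1 when a value is undefined. */
    private static String mib(long bytes) {
        return bytes < 0 ? "undefined" : (bytes / MIB) + "MiB";
    }

    /** One line for the heap, then one line per garbage collector. */
    public static String report() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        StringBuilder sb = new StringBuilder();
        sb.append("heap init=").append(mib(heap.getInit()))
          .append(" used=").append(mib(heap.getUsed()))
          .append(" committed=").append(mib(heap.getCommitted()))
          .append(" max=").append(mib(heap.getMax()))
          .append('\n');
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Collection count and accumulated time, comparable to jstat's YGC/YGCT and FGC/FGCT.
            sb.append("gc ").append(gc.getName())
              .append(" collections=").append(gc.getCollectionCount())
              .append(" timeMs=").append(gc.getCollectionTime())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

If "used" sits near "max" and the collection counts climb quickly, the process is likely GC-bound.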


 On Sun, Mar 2, 2014 at 6:54 PM, Sean Solbak wrote:

 Here it is.  It appears to be some kind of race condition.

 On Sun, Mar 2, 2014 at 6:42 PM, Michael Rose mich...@fullcontact.com wrote:

 Can you do a thread dump and pastebin it? It's a nice first step to
 figure this out.

 I just checked on our Nimbus and while it's on a larger machine, it's
 using 1% CPU. Also look in your logs for any clues.
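From outside the process, the usual way to get a thread dump is the JDK's `jstack <pid>`, which is awkward here because the PID keeps changing. As an alternative, here is a minimal sketch that builds a similar dump from inside a JVM with `java.lang.management`. The class name `ThreadDump` is hypothetical, and the output only approximates jstack's format.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDump {
    /** Builds a jstack-like dump of every live thread in this JVM. */
    public static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // true, true: also report locked monitors and ownable synchronizers,
        // which is what you want when hunting for races or deadlocks.
        for (ThreadInfo info : threads.dumpAllThreads(true, true)) {
            if (info == null) {
                continue;
            }
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```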


 On Sun, Mar 2, 2014 at 6:31 PM, Sean Solbak wrote:

 No, they are on separate machines.  It's a 4-machine cluster: 2
 workers, 1 Nimbus and 1 Zookeeper.

 I suppose I could just create a new cluster, but I'd like to know why
 this is occurring to avoid future production outages.


 On Sun, Mar 2, 2014 at 6:19 PM, Michael Rose wrote:

 Are you running Zookeeper on the same machine as the Nimbus box?


 On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak wrote:

 This is the first step of 4. When I save to the DB I'm actually saving
 to a queue (just using a DB for now).  In the 2nd step we index the data, and
 in the 3rd we do aggregation/counts for reporting.  The last is a search that we're
 planning on using DRPC for.  Within step 2 we pipe certain datasets in real
 time to the clients they apply to.  I'd like this and the DRPC to be
 sub-2s, which should be reasonable.

 You're right that I could speed up step 1 by not using Trident, but our
 requirements seem like a good use case for the other 3 steps.  With many
 results per second, batching should affect performance a ton if the batch
 size is small enough.

 What would cause nimbus to be at 100% CPU with the topologies


 On Mar 2, 2014, at 5:46 PM, Sean Allen wrote:

 Is there a reason you are using Trident?

 If you don't need to handle the events as a batch, you are probably
 going to get better performance without it.

 On Sun, Mar 2, 2014 at 2:23 PM, Sean Solbak wrote:

 I'm writing a fairly basic Trident topology as follows:

 - 4 spouts of events
 - merges into one stream
 - serializes the object as an event in a string
 - saves to db

 I split the serialization task away from the spout to speed it up, as it
 was CPU-intensive.

 The problem I have is that after 10 minutes there are over 910k
 tuples emitted/transferred, but only 193k records are saved.

 The overall load of the topology seems fine.

 - 536.404 ms complete latency at the topology level
 - The highest capacity of any bolt is 0.3, which is the
 serialization one.
 - Each bolt task has sub-20 ms execute latency and sub-40 ms
 process latency.

 So it seems Trident has all the records internally, but I need
 these events as close to real time as possible.

 Does anyone have any guidance on how to increase the throughput?
 Is it simply a matter of tweaking max spout pending and the batch size?

 I'm running it on 2 m1
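Here is a napkin-math sketch of the throughput ceiling. It assumes that in Trident, max spout pending bounds the number of batches in flight. By Little's law, throughput is then at most (batches in flight × tuples per batch) / batch latency. The 193k-records-in-10-minutes figure comes from the message above. The pending, batch-size, and latency values in `main` are hypothetical.

```java
public class TridentNapkinMath {
    /**
     * Upper bound on tuples/s when at most maxSpoutPending batches are in flight,
     * each holding batchSize tuples and taking batchLatencySecs to complete.
     */
    public static double maxTuplesPerSec(int maxSpoutPending, int batchSize, double batchLatencySecs) {
        return maxSpoutPending * (double) batchSize / batchLatencySecs;
    }

    public static void main(String[] args) {
        // Observed in the thread: 193k records saved in 10 minutes.
        System.out.printf("observed: %.1f records/s%n", 193_000 / 600.0);      // observed: 321.7 records/s
        // Hypothetical settings: 10 batches in flight, 500 tuples each, 0.5 s per batch.
        System.out.printf("ceiling: %.1f tuples/s%n", maxTuplesPerSec(10, 500, 0.5)); // ceiling: 10000.0 tuples/s
    }
}
```

If the observed rate sits near the ceiling your settings imply, raising max spout pending or the batch size is the lever. If it sits well below, the bottleneck is elsewhere, for example in the DB writes.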

Re: JDBC Connections

2014-02-20 Thread Michael Rose
We generally instantiate a pool per JVM, where the maxActive is (# of bolts
using JDBC / num workers) + 1 (e.g. 64 bolts, 4 workers, 17 conns/JVM).
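The sizing rule above, as a small sketch. Rounding up when bolts don't divide evenly across workers is an assumption, since the example in the message divides evenly. The class and method names are hypothetical.

```java
public class PoolSizing {
    /**
     * Connections per worker JVM: JDBC-using bolt instances spread over the workers
     * (rounded up), plus one spare for validation queries and reconnects.
     */
    public static int maxActivePerJvm(int jdbcBoltInstances, int workers) {
        int perWorker = (jdbcBoltInstances + workers - 1) / workers; // ceiling division
        return perWorker + 1;
    }

    public static void main(String[] args) {
        System.out.println(maxActivePerJvm(64, 4)); // 17
    }
}
```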

Pooling is our preferred strategy as the pool will shrink (and thus use
less memory on the corresponding SQL server) if it's not being utilized. In
either case, it's pushing the management, verification, and reestablishment
of broken connections into the pool (which is also why we have 1 extra conn:
for when a conn is tied up running a validation query or is being
reestablished).

On Thu, Feb 20, 2014 at 8:28 AM, Richards Peter hbkricha...@gmail.com wrote:

 Hi Pablo,

 Your approach is fine. The alternative approach proposed by Brian can be used
 if you can group the relevant bolt instances that communicate with
 the database into the appropriate worker processes. However, with the
 alternative approach you shouldn't end up creating separate pools in all
 the workers used by the topology.

 Hope this helps.

 Richards Peter.

 On Thu, Feb 20, 2014 at 8:33 PM, Pablo Acuña wrote:

 I keep one connection per bolt and for now it works just fine with many
 bolts. I would also be interested in hearing from someone else and share

 For now, I open the connection in the prepare method (and close it in
 cleanup), but to be completely honest, I'm not 100% sure this is the
 best approach.


 On Wed, Feb 19, 2014 at 4:41 PM, Brian O'Neill b...@alumni.brown.edu wrote:

 Yes.  We use JDBI to access MySQL.

 We've had success with a shared connection pool (one per JVM).
 The bolts in the JVM share the pool.

 But either approach should work (pool per bolt, or connection per bolt).
 It just depends at what level you want to manage your connections.

 We do it at the worker level (n connections per worker).



 Brian O'Neill
 Chief Technology Officer
 Health Market Science
 The Science of Better Results
 2700 Horizon Drive • King of Prussia, PA • 19406
 M: 215.588.6024 • @boneill42


 From: Klausen Schaefersinho
 Date: Wednesday, February 19, 2014 at 5:58 AM
 Subject: JDBC Connections


 One of my bolts will need to write to a MySQL database. Does anybody have
 some experience with this? What are the best practices? Use a connection
 pool? Or keep one connection open per bolt?



Re: Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans

2014-02-18 Thread Michael Rose
You may need to configure your cluster to give it more time to start up.
Additionally, knowing how long it can take to load the Stanford NLP models,
make sure you're only doing it in a single bolt instance (e.g. a static
initializer or double-checked locking) and sharing it between all your bolt
instances.

supervisor.worker.start.timeout.secs 120
supervisor.worker.timeout.secs 60

I'd try tuning your worker start timeout here. Try raising it to 300s
and (again) ensuring your prepare method only initializes expensive
resources once, then shares them between instances in the JVM.
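Here is a minimal sketch of the "initialize once per JVM, share between instances" advice using double-checked locking. However many bolt instances run in a worker, the expensive load runs once. `SharedModel` and `load()` are hypothetical stand-ins for the Stanford NLP models. In a real bolt, `get()` would be called from `prepare()`. A static initializer or holder class is the simpler alternative when loading can't fail.

```java
import java.util.concurrent.atomic.AtomicInteger;

public final class SharedModel {
    // Counts how often the expensive load really ran (demonstration only).
    public static final AtomicInteger LOADS = new AtomicInteger();

    // volatile is required for double-checked locking to be safe under the Java memory model.
    private static volatile SharedModel instance;

    private final String name;

    private SharedModel(String name) {
        this.name = name;
    }

    /** Call from each bolt's prepare(): only the first caller pays the load cost. */
    public static SharedModel get() {
        SharedModel local = instance;
        if (local == null) {                    // first check, without the lock
            synchronized (SharedModel.class) {
                local = instance;
                if (local == null) {            // second check, under the lock
                    local = load();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Hypothetical stand-in for loading large models from disk.
    private static SharedModel load() {
        LOADS.incrementAndGet();
        return new SharedModel("nlp-models");
    }

    public String name() {
        return name;
    }
}
```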


On Tue, Feb 18, 2014 at 1:45 PM, Eddie Santos wrote:

 Hi all,

 How do you get bolts that take a ludicrously long time to load (we're
 talking minutes here) to cooperate with Zookeeper?

 I may not be understanding my problem properly, but on my test cluster
 (**not** in local mode!) my bolt keeps getting restarted in the middle of
 its prepare() method -- which may take up to two minutes to return.

 The problem seems to be the "Client session timed out" message, but I'm not
 knowledgeable enough about Zookeeper to really know how to fix this.

 Here's a portion of the logs from the affected supervisor. The STDIO messages
 come from a poorly coded third-party library that I have to use.

 2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out,
 have not heard from server in 2747ms for sessionid 0x143a22eb4060078,
 closing socket connection and attempting reconnect
 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State
 change: SUSPENDED
 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are
 no ConnectionStateListeners registered.
 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
 :disconnected::none: with disconnected Zookeeper.
 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
 :disconnected::none: with disconnected Zookeeper.
 2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
 2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from
 2014-01-17 23:19:28 STDIO [ERROR] ...
 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection
 to server zookeeper/

   ^-- This is where the bolt gets restarted in its initialization.


Re: http-client version conflict

2014-02-06 Thread Michael Rose
We've done this with SLF4J and Guava as well without issues.


On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene wrote:

 We had this problem as well. We modified our Chef cookbook to just replace
 the older version with the newer one, and Storm didn't complain or have any
 other issues as a result.

 On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz ptgo...@gmail.com wrote:

 Your best bet is probably to use the shade plugin to relocate the
 http-client package so it doesn't conflict with the version Storm uses.

 Storm does this with the libthrift dependency in storm-core:

 (You can ignore the Clojure transformer in that config, unless you have
 non-AOT Clojure code that uses the http-client library.)

 More information on using the shade plugin to do package relocations can
 be found here:

 - Taylor

 On Feb 4, 2014, at 4:27 PM, Vinay Pothnis wrote:

  I am using Storm version
  My application depends on Apache http-client version 4.3.2, but Storm
 depends on http-client version 4.1.1.
  What is the best way to override this dependency?
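Whether you shade or swap jars, it helps to confirm which jar a class is actually loaded from at runtime. Here is a minimal diagnostic sketch in plain Java. The name `ClassOrigin` is hypothetical. With http-client on the classpath, you could pass `Class.forName("org.apache.http.client.HttpClient")` to see whether the 4.1.1 or the 4.3.2 jar won.

```java
import java.net.URL;

public class ClassOrigin {
    /** Returns the URL the given class's bytecode was loaded from (jar, directory or JDK image). */
    public static URL classOrigin(Class<?> type) {
        String name = type.getName();
        // Class.getResource resolves a bare file name relative to the class's own package.
        String classFile = name.substring(name.lastIndexOf('.') + 1) + ".class";
        return type.getResource(classFile);
    }

    public static void main(String[] args) {
        // For a JDK class this prints a jrt:/ or rt.jar URL; for a library class, its jar.
        System.out.println(classOrigin(String.class));
    }
}
```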

Re: ZeroMQ Exception causes Topology to get killed

2014-01-10 Thread Michael Rose
What version of ZeroMQ are you running?

You should be running 2.1.7 with Nathan's provided fork of JZMQ.


On Fri, Jan 10, 2014 at 9:09 PM, Gaurav Sehgal wrote:

  I am getting the following exception in the cluster. These exceptions
 happen in the worker log, but they eventually cause the topology to die. Can
 anyone please share some input?

 java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.destroy()V
 at org.zeromq.ZMQ$Socket.destroy(Native Method)
 at org.zeromq.ZMQ$Socket.close(
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at java.lang.reflect.Method.invoke(
 at clojure.lang.Reflector.invokeMatchingMethod(
 at clojure.lang.Reflector.invokeNoArgInstanceMember(
 at backtype.storm.messaging.zmq.ZMQConnection.close(zmq.clj:45)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at java.lang.reflect.Method.invoke(
 at clojure.lang.Reflector.invokeMatchingMethod(
 at clojure.lang.Reflector.invokeNoArgInstanceMember(
 at backtype.storm.timer$mk_timer$fn__1759$fn__1760.invoke(timer.clj:33)
 at backtype.storm.timer$mk_timer$fn__1759.invoke(timer.clj:26)


Re: Workers elasticity

2014-01-06 Thread Michael Rose
Each machine can support a configurable number of workers. If a machine
goes away, it'll attempt to reassign the orphaned workers to other machines
using the fair scheduler.

1) Should you not have enough slots (worker slots), your topology will be in a
'broken' state. However, if you allow 4 workers per machine and run your
topology with 8 workers, you can run a minimum of 2 machines and still support
the topology.
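The slot arithmetic in the first answer, as a small sketch. A topology needs one free slot per worker, so the minimum number of machines is workers ÷ slots per machine, rounded up. The class and method names are hypothetical. The 4-slots-per-machine figure is the one used in the reply.

```java
public class SlotMath {
    /** Minimum supervisor machines needed for `workers` workers at `slotsPerMachine` slots each. */
    public static int minMachines(int workers, int slotsPerMachine) {
        return (workers + slotsPerMachine - 1) / slotsPerMachine; // ceiling division
    }

    /** True when the cluster has at least one free slot per requested worker. */
    public static boolean fits(int workers, int machines, int slotsPerMachine) {
        return workers <= machines * slotsPerMachine;
    }

    public static void main(String[] args) {
        System.out.println(minMachines(8, 4)); // 2
        System.out.println(fits(5, 3, 4));     // true: 5 workers, 3 machines x 4 slots
        System.out.println(fits(5, 1, 4));     // false: only 4 slots available
    }
}
```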

If you are to auto-scale your workers, you'll need a long cooldown time
between changes. A rebalance isn't instant and must allow the topology to
drain before reshuffling workers. A rebalance will wait
{TUPLE_TIMEOUT_TIME} before rebalancing. Additionally, when adding workers
you'll need to trigger a rebalance.

2) If workers > slots, the topology will attempt to function but ultimately
freeze as the send buffers to that worker fill. I'm not sure what you mean by
'round-robin fashion to distribute load'; the ShuffleGrouping will
partition work across tasks on an even basis.

3) Yes, in storm.yaml, supervisor.slots.ports. By default it'll run with 4
slots per machine. See


On Mon, Jan 6, 2014 at 11:08 AM, Spico Florin wrote:

  I'm a newbie to Storm and also to Amazon Cloud. I have the following
 scenario:

   1. I have a topology that runs on 3 workers on EC2.
   2. Due to the increasing load, EC2 instantiates 2 new instances and I
 have to rebalance to 5 workers.
   3. After the resource demand drops, EC2 releases 2 instances and I
 forget to decrease the number of workers to 3.

 1. So, in this case, what is the behavior of the application? Will it signal an
 error that there are more workers allocated than existing machines? Or will it
 continue to run as if nothing has happened?

 2. More generally, what is the behavior of an application that declares
 more workers than the number of instances? Is load distributed among the
 workers in a round-robin fashion?

 3. Can I declare more workers on the same machine? If yes, how?

 I look forward to your answers.


Re: storm over WAN and NAT

2013-12-29 Thread Michael Rose
Generally speaking, I don't know of many services that work exceedingly
well over a WAN.

Can you not do processing at each location and forward it on with a queue
that isn't averse to WAN links?
On Dec 29, 2013 10:03 AM, Derrick Karimi wrote:


 I have a requirement for real-time data processing where data sources are
 spread over a wide geographic area on multiple networks.  Are there any
 known previous deployments, or any foreseen issues with deploying a Storm
 cluster where machines are behind firewalls/using NAT, and spread out such
 that latency issues and network reliability issues will be noticeable?


Re: Guaranteeing message processing on storm fails

2013-12-29 Thread Michael Rose
What spout are you using? Guarantees must be enforced by the spout. For
instance, the RabbitMQ spout doesn't ack an AMQP message until ack() is
called for that message tag (or rejects it if fail() is called).

The spout must pass along a unique message identifier to enable this.

If a tuple goes missing somewhere along the line, fail() will be called
after a timeout. If you kill the acker that tuple was tracked with, it's
then up to the message queue or other implementation to be able to replay
that message.
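Here is a Storm-free sketch of the contract described above, assuming a spout backed by some queue. Each message is handed out with a unique id and stays pending until `ack(id)`. On `fail(id)` it goes back on the queue, which is what makes processing at-least-once. `ReplayingSource` and its methods are hypothetical. In a real spout, the pending state has to live in the broker (e.g. unacked AMQP messages), because an in-memory map like this one disappears with the worker.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class ReplayingSource {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    /** Enqueue a message arriving from the upstream system. */
    public void offer(String message) {
        ready.addLast(message);
    }

    /** Analogue of nextTuple(): hand out the next message with a unique id, or null when idle. */
    public Map.Entry<Long, String> next() {
        String message = ready.pollFirst();
        if (message == null) {
            return null;
        }
        long id = nextId++;
        pending.put(id, message); // tracked until acked or failed
        return new AbstractMap.SimpleImmutableEntry<>(id, message);
    }

    /** Fully processed downstream: forget the message. */
    public void ack(long id) {
        pending.remove(id);
    }

    /** Failed or timed out: put the message back at the front so it is replayed next. */
    public void fail(long id) {
        String message = pending.remove(id);
        if (message != null) {
            ready.addFirst(message);
        }
    }

    public int pendingCount() {
        return pending.size();
    }
}
```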


On Sun, Dec 29, 2013 at 11:39 PM, Michal Singer mic...@leadspace.com wrote:

 But what I see is that some of the tuples are not replayed. I print out
 all the tuples and some don’t arrive when I kill a worker process.

 Thanks, Michal

 From: Michael Rose
 Sent: Sunday, December 29, 2013 7:26 PM
 Subject: Re: Guaranteeing message processing on storm fails

 You are not guaranteed that tuples make it, only that if they go missing
 or processing fails, they will be replayed from the spout at least once.

 On Dec 29, 2013 4:47 AM, Michal Singer wrote:

 Hi, I am trying to test Storm's message processing guarantees:

 I have two nodes:

 1. Node-A contains: supervisor, UI, Zookeeper, Nimbus

 2. Node-B contains: supervisor

 I have 1 spout and 2 bolts (I am actually running the word counter test)

 I send about 17 messages.

 The first bolt is on Node-B

 The second bolt is split into 4 executors, which are divided between the
 two nodes.

 I kill the worker on Node-B, and I expect the worker on the other node to
 get the data that was supposed to be sent to Node-B.

 But the worker is restarted on Node-B and the data is sent there, except for
 one tuple, which is missing.

 1. According to the UI, it is very difficult to see what is going
 on, because I guess some messages are resent and it is hard to know exactly
 what happened.

 2. According to my output, one message is missing; it was
 supposed to go to Node-B, whose worker I killed.

 I defined anchors and I sent acks.

 So what am I missing? Why is there data loss?

 Thanks, Michal

Re: Spring bolts

2013-12-25 Thread Michael Rose
Make a base Spring bolt; in your prepare method, inject the members. That's
the best I've come up with, as prepare happens server side, whereas topology
config and static initializers happen at deploy time client side.
On Dec 25, 2013 7:51 AM, Michal Singer wrote:

 Hi, I am trying to understand how to use Spring beans as bolts/spouts.

 Suppose I have the definition in Spring, which is initialized once the bolt or
 spout is initialized.

 But when creating a topology I need to do: new Bolt()….

 And I cannot get it from Spring.

 So what is the right way to do this?

 Thanks, Michal

RE: Spring bolts

2013-12-25 Thread Michael Rose
Yes, you'll need a Spring context in prepare. Given you have multiple bolts
per JVM, it's worth ensuring only one creates it in prepare and then shares that
context.

We do this with Guice injectors and double-checked locking.

Each bolt uses the singleton injector to inject its members. I imagine
Spring has a similar concept once you have a context.

The life cycle of bolts is quite strange in Storm, given they're constructed before
deployment and serialized. There are quite a few gotchas. Bolt constructors
can't be trusted, thus prepare.
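Here is a plain-Java sketch of why constructors can't be trusted. A bolt is built on the client, serialized, and deserialized on the worker. Anything held in a `transient` field (a Spring context, a Guice injector, a DB connection) is therefore gone by the time the bolt runs. `MyBolt`, its `prepare()`, and the JDBC URL are hypothetical, and a `StringBuilder` stands in for the real resource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    /** Hypothetical bolt-like class: serializable config plus a non-serializable resource. */
    static class MyBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        final String jdbcUrl;               // plain config: survives serialization
        transient StringBuilder connection; // stand-in for a connection or context: does not

        MyBolt(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
            this.connection = new StringBuilder("opened in constructor"); // lost after the round trip
        }

        /** Analogue of prepare(): runs after deserialization, so resources are created here. */
        void prepare() {
            connection = new StringBuilder("opened in prepare for " + jdbcUrl);
        }
    }

    /** Serializes and deserializes an object, much as shipping a topology to the cluster does. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MyBolt onWorker = roundTrip(new MyBolt("jdbc:mysql://db.example/app"));
        System.out.println(onWorker.connection); // null: the constructor's work did not survive
        onWorker.prepare();
        System.out.println(onWorker.connection); // opened in prepare for jdbc:mysql://db.example/app
    }
}
```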

There may be a spring storm example out there somewhere.

Merry Christmas!
On Dec 25, 2013 8:17 AM, Michal Singer wrote:

 I am not sure I understand.

 Spring beans are defined in the Spring configuration files. How can I
 inject them into the members?

 What I thought to do is that the bolts will not be Spring beans, and in the
 prepare method I will initialize the Spring context.

 This way, the bolts will call other Spring beans which are not bolts and are
 initialized in Spring. But of course this is a very limited solution.


Re: Emitted ≠ Acked + Failed

2013-12-23 Thread Michael Rose
The statistics are sampled, but in general they should be within +/- 20 tuples
of their true values.
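Here is a sketch of how sampled counters produce that error band. It assumes the UI's counters work the way I understand Storm 0.9's do: with `topology.stats.sample.rate` at what I believe is its default of 0.05, one randomly chosen event in every window of 20 is counted, and it counts as 20. Everything in the class is hypothetical. The point is that the estimate is always a multiple of 20 and stays within 20 of the true count, so two counters that should match can disagree slightly.

```java
import java.util.Random;

public class EvenSampledCounter {
    private final int window;          // 1 / sample rate, e.g. 20 for a 0.05 rate
    private final Random random;
    private int positionInWindow = -1; // index of the current event inside its window
    private int target;                // which event in the current window gets counted
    private long estimate;             // what a UI built on this counter would display

    public EvenSampledCounter(int window, long seed) {
        this.window = window;
        this.random = new Random(seed);
        this.target = random.nextInt(window);
    }

    public void increment() {
        positionInWindow++;
        if (positionInWindow >= window) { // start a new window with a fresh random target
            positionInWindow = 0;
            target = random.nextInt(window);
        }
        if (positionInWindow == target) {
            estimate += window;           // one sampled event stands in for the whole window
        }
    }

    public long estimate() {
        return estimate;
    }

    public static void main(String[] args) {
        EvenSampledCounter emitted = new EvenSampledCounter(20, 7L);
        for (int i = 0; i < 12_345; i++) {
            emitted.increment();
        }
        System.out.println("exact=12345 estimate=" + emitted.estimate());
    }
}
```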


On Mon, Dec 23, 2013 at 9:53 PM, churly lin wrote:

 Is it possible that this is because the statistics the Storm UI provides are not very accurate?

 2013/12/23 churly lin

 Hi all:
 I have a question about the running topology's statistics.
 I found that, for the spout, the number of tuples emitted is not equal to the
 number acked plus the number failed. Actually, the emitted count is
 lower than acked plus failed.
 Is this normal? If so, why are there some emitted tuples that are neither
 acked nor failed?

Re: Example of bolt emitting more than one stream

2013-12-20 Thread Michael Rose

This is the...gist...of it. :) Hope this helps!


On Fri, Dec 20, 2013 at 10:36 AM, Pete Carlson p...@tetraconcepts.com wrote:

 I read on the Storm concepts wiki that a bolt can emit more than one stream.

 Can anyone point me to an example that declares multiple streams using the
 declareStream method of OutputFieldsDeclarer, and specifies which stream to
 emit with the emit method on OutputCollector?
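The linked gist didn't survive the archive. In Storm itself, a bolt declares each stream with `declarer.declareStream(streamId, fields)` in `declareOutputFields` and emits with `collector.emit(streamId, values)`. Consumers subscribe to a specific stream, e.g. `.shuffleGrouping(componentId, streamId)`. Below is a Storm-free model of that routing, so it runs on its own. `TwoStreamRouter` and its methods are hypothetical and only mirror the shape of those calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoStreamRouter {
    private final Map<String, List<List<Object>>> streams = new LinkedHashMap<>();

    /** Mirrors declareStream: a stream must be declared before anything is emitted to it. */
    public void declareStream(String streamId) {
        streams.put(streamId, new ArrayList<>());
    }

    /** Mirrors emit(streamId, values): append the tuple to the named stream. */
    public void emit(String streamId, List<Object> values) {
        List<List<Object>> target = streams.get(streamId);
        if (target == null) {
            throw new IllegalArgumentException("undeclared stream: " + streamId);
        }
        target.add(values);
    }

    /** The "bolt" logic: even numbers go to one stream, odd numbers to the other. */
    public void execute(int n) {
        emit(n % 2 == 0 ? "even" : "odd", Collections.<Object>singletonList(n));
    }

    /** What a consumer subscribed to `streamId` would receive. */
    public List<List<Object>> received(String streamId) {
        return streams.getOrDefault(streamId, Collections.emptyList());
    }

    public static void main(String[] args) {
        TwoStreamRouter router = new TwoStreamRouter();
        router.declareStream("even");
        router.declareStream("odd");
        for (int n = 1; n <= 5; n++) {
            router.execute(n);
        }
        System.out.println("even=" + router.received("even")); // even=[[2], [4]]
        System.out.println("odd=" + router.received("odd"));   // odd=[[1], [3], [5]]
    }
}
```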

