Re: Storm performing very slow
Storm is not your bottleneck. Check your Storm code to 1) ensure you're parallelizing your writes and 2) batch writes to your external resources if possible. Some quick napkin math shows you're only doing ~110 writes/s, which seems awfully low.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Sep 22, 2014 at 8:05 PM, Kushan Maskey kushan.mas...@mmillerassociates.com wrote:
I am trying to load 20M records into a Cassandra database through Kafka-Storm. I am able to post all the data into Kafka in 5 mins, but reading it from Storm and inserting into Cassandra, Couch and Solr is very slow. It has been running for the past 5 hours and has loaded only 2 million records so far. How do I make the Storm topology perform faster? Because at this pace it will take a couple of days to load all the data. -- Kushan Maskey
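A minimal sketch of the batching idea above, assuming the DataStax Java driver; the table, field names, and contact point are illustrative, not from the original poster's code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    public class BatchingCassandraBolt extends BaseRichBolt {
        private static final int BATCH_SIZE = 100;
        private OutputCollector collector;
        private transient Session session;
        private transient PreparedStatement insert;
        private transient BatchStatement batch;
        private transient List<Tuple> pending;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            session = Cluster.builder().addContactPoint("cassandra-host").build().connect("ks");
            insert = session.prepare("INSERT INTO records (id, body) VALUES (?, ?)");
            batch = new BatchStatement();
            pending = new ArrayList<Tuple>();
        }

        @Override
        public void execute(Tuple tuple) {
            batch.add(insert.bind(tuple.getStringByField("id"), tuple.getStringByField("body")));
            pending.add(tuple);
            if (pending.size() >= BATCH_SIZE) {
                session.execute(batch);                       // one round trip for the whole batch
                for (Tuple t : pending) collector.ack(t);     // ack only after the write succeeds
                batch.clear();
                pending.clear();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    }

A production version would also flush partial batches on a timer (e.g. tick tuples) and fail the pending tuples if the batch write throws, so a reliable spout replays them; the tuple timeout has to stay comfortably above the flush interval.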
Re: Question on failing ack
Hi Kushan, Depending on the Kafka spout you're using, it could do different things when a tuple fails. However, if it's running reliably, the Cassandra insertion failures would have forced a replay from the spout until they had completed.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Aug 25, 2014 at 4:42 PM, Kushan Maskey kushan.mas...@mmillerassociates.com wrote:
I have set up a topology to load a very large volume of data. Recently I loaded about 60K records and found out that there are some failed acks on a few spouts but none on the bolts. Storm completed running and seems stable. Initially I started with a smaller amount of data, about 500 records, successfully, and then increased up to 60K, where I saw the failed acks. Questions: 1. Does that mean that the spout was not able to read some messages from Kafka? Since there are no failed acks on the bolts as per the UI, whatever messages were received have been successfully processed by the bolts. 2. How do I interpret numbers of failed acks like acked: 315500 and failed: 2980? Does this mean that 2980 records failed to be processed? If this is the case, how do I avoid it from happening, because I will be losing 2980 records. 3. I also see that a few of the records failed to be inserted into the Cassandra database. What is the best way to reprocess the data again, as it is quite difficult to do it through the batch process that I am currently running. LMK, thanks. -- Kushan Maskey 817.403.7500
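A sketch of the reliability wiring the reply refers to: anchoring emits and failing tuples on insert errors is what drives replay from a reliable spout. The insertIntoCassandra() helper is a hypothetical stand-in:

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class CassandraWriterBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                insertIntoCassandra(input.getString(0));                // hypothetical write
                collector.emit(input, new Values(input.getString(0)));  // anchored emit
                collector.ack(input);
            } catch (Exception e) {
                collector.fail(input);  // failing the tuple makes a reliable spout replay it
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("record"));
        }

        private void insertIntoCassandra(String record) { /* hypothetical write */ }
    }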
Re: Implementing a barrier mechanism in storm
In the prepare method you receive a copy of the topology context, which can tell you all of the stream-components you're subscribed to. You could make it a static field, or just fields-group on messageId.

On Aug 1, 2014 12:26 AM, Spico Florin spicoflo...@gmail.com wrote:
Hello! Thanks for your reply. I was thinking to use such a cache mechanism (the Guava cache as a static field in the final bolt). The question that I have now is how do you keep track of the bolts upfront? Suppose that you have 3 bolts; are you then counting the number of bolts that have processed one message in a Map<msgId, count_of_bolts>? Can you send the names of the bolts that have processed the message to the final bolt, and in the final bolt check whether that list matches the full set of processing bolts? What is the best approach here? Thanks. Regards, Florin

On Fri, Aug 1, 2014 at 7:51 AM, Michael Rose mich...@fullcontact.com wrote:
It's another case of a streaming join. I've done this before; there aren't too many gotchas, other than that you need a data structure which purges stale unresolved joins beyond the tuple timeout time (I used a Guava cache for this).

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Thu, Jul 31, 2014 at 10:44 PM, Varun Vijayaraghavan varun@gmail.com wrote:
That's interesting. Note that I have not used such a pattern before - but have done something similar. I have not used Trident - so this probably will not answer your last question completely :) If you set up the topology such that the links between bolts {1, 2, 3} and the final bolt are stream-grouped by msgId - you could keep the partially processed results in memory (or in a persisted state somewhere) - till you see the processed result from all the bolts. I would also expire msgIds which have not seen further results beyond a certain threshold time. What do you think?

On Fri, Aug 1, 2014 at 12:35 AM, Spico Florin spicoflo...@gmail.com wrote:
Hello! I have a case study where the same message (identified by an id) is spread over a couple of processing bolts, and a final bolt should act as a barrier. This final bolt should do its job on the message ID only when all the upfront bolts have finished their processing of the message. A sketch is below:

Spout -(msgId1, payload)-> process bolt 1 -(msgId1, payloadb1)-> |
                           process bolt 2 -(msgId1, payloadb2)-> | -> final bolt (msgId1, finalp)
                           process bolt 3 -(msgId1, payloadb3)-> |

So the final bolt should not start work on a message id till the message has been processed by all 3 processing bolts. My questions are: 1. Is this case viable for Storm? 2. If the answer to the first question is yes, how can I achieve this? 3. Is it possible to achieve this without using Trident? I look forward to your answers. Thanks, Florin -- - varun :)
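A minimal sketch of the barrier/streaming-join bolt described above, using a Guava cache and the topology context. The field names msgId/payload are assumptions (each processing bolt is presumed to emit that pair), and the cache expiry must stay below the tuple timeout:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class BarrierBolt extends BaseRichBolt {
        private OutputCollector collector;
        private int upstreamCount;  // how many upstream component-streams must report in
        private transient Cache<Object, Map<String, Object>> partials;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // The topology context knows every component-stream we subscribe to,
            // so we don't have to hard-code the number of upstream bolts.
            upstreamCount = context.getThisSources().size();
            // Purge stale unresolved joins before the tuple timeout (default 30s).
            partials = CacheBuilder.newBuilder()
                    .expireAfterWrite(25, TimeUnit.SECONDS)
                    .build();
        }

        @Override
        public void execute(Tuple tuple) {
            Object msgId = tuple.getValueByField("msgId");
            Map<String, Object> seen = partials.getIfPresent(msgId);
            if (seen == null) {
                seen = new HashMap<String, Object>();
                partials.put(msgId, seen);
            }
            seen.put(tuple.getSourceComponent(), tuple.getValueByField("payload"));
            if (seen.size() == upstreamCount) {  // all upstream bolts have arrived
                collector.emit(tuple, new Values(msgId, seen));
                partials.invalidate(msgId);
            }
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msgId", "joined"));
        }
    }

Wire each processing bolt to this one with fieldsGrouping on "msgId" so all pieces of a message land on the same BarrierBolt task.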
Re: Acking is delayed by 5 seconds (in disruptor queue ?)
Run your producer code in another thread to fill a LinkedBlockingQueue, and poll that from nextTuple instead. You should never be blocking yourself inside a spout.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel i...@forter.com wrote:
Hello again, Attached is a simplified reproduction (without the ShellSpout, but the concepts are the same). It seems that ack() and nextTuple() are always called on the same thread. That means that there is an inherent tradeoff: either nextTuple doesn't sleep (and then the ShellSpout would serialize a lot of nextTuple messages), or nextTuple sleeps a few ms, but then the ack is delayed. Is there a way around this limitation? Itai
-- *From:* Itai Frenkel i...@forter.com *Sent:* Thursday, July 17, 2014 9:42 PM *To:* user@storm.incubator.apache.org *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
Hello, I have noticed that an ack takes 5 seconds to pass from the bolt to the spout (see the debug log below). It is a simple topology with 1 spout, 1 bolt and 1 acker, all running on the same worker. The spout and the bolt are a ShellSpout and ShellBolt respectively. It looks like the message is delayed in the LMAX disruptor queue. How can I reduce this delay to ~1ms? Regards, Itai

2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
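A sketch of the suggested pattern, with a hypothetical fetchFromSource() standing in for the blocking IO:

    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class QueueFedSpout extends BaseRichSpout {
        private transient LinkedBlockingQueue<String> buffer;
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            buffer = new LinkedBlockingQueue<String>(1000);  // bounded: the producer blocks, not the spout
            Thread producer = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            buffer.put(fetchFromSource());  // blocking IO lives here, off the spout thread
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            producer.setDaemon(true);
            producer.start();
        }

        @Override
        public void nextTuple() {
            String msg = buffer.poll();  // non-blocking: returns null if nothing is ready
            if (msg != null) {
                collector.emit(new Values(msg));
            }
            // else emit nothing and let Storm's spout wait strategy back off
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }

        private String fetchFromSource() {
            return "";  // placeholder: replace with a real blocking read from your broker/service
        }
    }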
Re: Acking is delayed by 5 seconds (in disruptor queue ?)
1) Let's say we have it set at a limit of 100 items. The LBQ currently has 97 items in it. The SQS client runs again, pulls 10 messages, and successfully inserts 3. The other 7 are blocked waiting for the queue to clear out. No new HTTP requests are made to SQS while this LBQ is full (essentially this is just called in a loop). nextTuple() eventually comes around and drains an item, leaving 6 blocked for insertion, etc. etc., until all the blocked messages are inserted into the LBQ. At this point, we call out to the SQS client again to fetch another 10 (and hopefully the LBQ has not completely drained by the time the SQS client returns another 1-10 messages). This model isn't picky about which SQS client is grabbing messages. SQS doesn't guarantee order (or single delivery) anyway. In one of our topologies, we have 8 spout instances consuming the same SQS queue for throughput purposes. Maybe I've misunderstood your question though. We normally don't have multiple topologies consuming the same queue, but depending on the data there's no reason we couldn't. Also in this model, we don't use a blocking poll method; if the LBQ is empty, we skip emission and let Storm handle backoff if it wants to (see below).

2) By backoff I mean: your spout hasn't emitted in a while, so Storm is going to slow down on calling nextTuple() to not busy-wait your entire CPU. If you're at the maxSpoutPending limit, nextTuple is also not called until an ack or fail has been received (which is one of the reasons it's so important to not block in the spout as much as possible). Storm will use the https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java when you don't emit. By default, it waits only 1ms before calling again (not exponential, just fixed), which is enough to prevent 10-100k/s polling behavior but not significantly increase latency -- if you're ever not emitting, you can probably afford to sleep for 1ms. We actually have it set to 10ms, given that 99.9% of the time we'll have a message ready for processing. An alternative to this is the https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java if you do see an impact on your throughput -- but I've never needed this.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel i...@forter.com wrote:
Thanks for the thorough and quick answer. Two follow-up questions. The context is performing low-latency stream manipulation (hopefully processing a message the moment it arrives, maybe a few milliseconds later). 1. What happens if the LBQ contains 10 items while the Storm topology does not call nextTuple because of backoffs? Wouldn't it be better for another Amazon SQS client to handle those items? Or are you assuming a single Storm topology is the sole handler of these items? 2. If by backoff you mean the Storm topology cannot handle any more messages, or maxSpoutPending is reached, then ignore this question. If by backoff you mean exponential backoff, then I am worried about a message arriving at the queue and nextTuple not being called for a long time (more than a few milliseconds). Regards, Itai
-- *From:* Michael Rose mich...@fullcontact.com *Sent:* Friday, July 18, 2014 11:27 PM *To:* user@storm.incubator.apache.org *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
I have no experience with multilang spouts; however, my impression from the docs is that you should be handling your own multiplexing if you're writing a ShellSpout. Otherwise, if you block for 5 seconds emitting a tuple, you cannot process an ack until that's done. I'd experiment with that: if you change the sleep.spout.wait time to 500ms and you don't block in your spout (instead returning sync), it should back off just as it does with a normal spout (see https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java, sync is a no-op). The post you linked to was mine, and for a long time that was true (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do automatic backoffs when no tuples are emitted. The only time I've intentionally blocked in a spout after 0.8.0 is to control throughput (e.g. only allow 10/s during development). I've never built a multilang spout before. Spouts, like bolts, run in a single-threaded context, so blocking at all prevents acks/fails/emits from being done until the thread is unblocked. That is why it's best to have another thread dealing with IO and asynchronously feeding a concurrent data structure the spout can utilize. For example, in our internal Amazon SQS client our IO thread continuously fetches up to 10 messages per get and shoves them into a LinkedBlockingQueue (until full
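For reference, the wait-strategy knobs mentioned above are topology-level config; a minimal sketch:

    import backtype.storm.Config;

    Config conf = new Config();
    // SleepSpoutWaitStrategy is the default; this raises its fixed sleep from
    // 1ms to 10ms for when nextTuple()/ack() produce nothing.
    conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
    // Or swap the strategy class entirely, e.g. the busy-spin NothingEmptyEmitStrategy:
    // conf.put(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, "backtype.storm.spout.NothingEmptyEmitStrategy");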
Re: Acking is delayed by 5 seconds (in disruptor queue ?)
Worth clarifying for anyone else in this thread that an LBQ separating production from consumption is not a default thing in Storm; it's something we cooked up to prefetch elements from slow/batching resources.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel i...@forter.com wrote:
Got it! thanks.
-- *From:* Michael Rose mich...@fullcontact.com *Sent:* Saturday, July 19, 2014 12:10 AM *To:* user@storm.incubator.apache.org *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?) [...]
Re: Distribute Spout output among all bolts
Maybe we can help with your topology design if you let us know what you're doing that requires you to shuffle half of the whole stream output to each of the two different types of bolts. If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two different types) as above, there's no point to doing this. Setting the parallelism will make sure that data is partitioned across machines (by default, setting parallelism sets tasks = executors = parallelism). Unfortunately, I don't know of any way to do this other than shuffling the output to a new bolt, e.g. bolt b0, a 'RouterBolt', then having bolt b0 round-robin the received tuples between two streams, then having b1 and b2 shuffle over those streams instead.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor andreas.gramme...@gmail.com wrote:
Hi Tomas, As I said in my previous mail, the grouping is for a bolt *task*, not for the actual number of spawned bolts; for example, let's say you have two bolts that have a parallelism hint of 3 and these two bolts are wired to the same spout. If you set the bolts as such:

tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint */).shuffleGrouping("spout1");
tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint */).shuffleGrouping("spout1");

Then each of the tasks will receive half of the spout tuples, but each actual spawned bolt will receive all of the tuples emitted from the spout. This is more evident if you set up a counter in the bolt counting how many tuples it has received and test this with no parallelism hint, as such:

tb.setBolt("b1", new ExampleBolt()).shuffleGrouping("spout1");
tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1");

Now you will see that both bolts will receive all tuples emitted by spout1. Hope this helps. Andrew.

On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna tomas.mazu...@gmail.com wrote:
Andrew, when you connect your bolt to your spout you specify the grouping. If you use shuffle grouping, then any free bolt gets the tuple - in my experience, even in lightly loaded topologies the distribution amongst bolts is pretty even. If you use all grouping, then all bolts receive a copy of the tuple. Use shuffle grouping and each of your bolts will get about 1/3 of the workload. Tomas

On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor andreas.gramme...@gmail.com wrote:
Hi, I am trying to distribute the spout output to its subscribed bolts evenly; let's say that I have a spout that emits tuples and three bolts that are subscribed to it. I want each of the three bolts to receive 1/3rd of the output (or emit a tuple to each one of these bolts in turn). Unfortunately, as far as I understand, all bolts will receive all of the emitted tuples of that particular spout regardless of the grouping defined (as grouping, from my understanding, is for bolt *tasks*, not actual bolts). I've searched a bit and I can't seem to find a way to accomplish that... is there a way to do that or am I searching in vain? Thanks. -- Tomas Mazukna 678-557-3834
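A sketch of the 'RouterBolt' idea from the reply, alternating tuples across two declared streams (the class and stream names are illustrative):

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;

    public class RouterBolt extends BaseRichBolt {
        private OutputCollector collector;
        private boolean flip = false;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Round-robin tuples between the two named streams.
            collector.emit(flip ? "left" : "right", tuple, tuple.getValues());
            flip = !flip;
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("left", new Fields("msg"));
            declarer.declareStream("right", new Fields("msg"));
        }
    }

Wired up, b1 and b2 then shuffle over the two streams instead of the raw spout output:

    tb.setBolt("b0", new RouterBolt()).shuffleGrouping("spout1");
    tb.setBolt("b1", new ExampleBolt()).shuffleGrouping("b0", "left");
    tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("b0", "right");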
Re: Distribute Spout output among all bolts
It doesn't say so, but if you have 4 workers, the 4 executors will be shared evenly over the 4 workers. Likewise, 16 will partition 4 each. The only case where a worker will not get a specific executor is when there are fewer executors than workers (e.g. 8 workers, 4 executors): 4 of the workers will receive an executor, but the others will not. It sounds like for your case, shuffle+parallelism is more than sufficient.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor andreas.gramme...@gmail.com wrote:
Hey Stephen, Michael, Yea, I feared as much... searching the docs and API did not surface any reliable and elegant way of doing that unless you had a RouterBolt. If setting the parallelism of a component is enough for load-balancing the processes across different machines that are part of the Storm cluster, then this would suffice in my use case. Although here https://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html the documentation says executors are threads, and it does not explicitly say anywhere that threads are spawned across different nodes of the cluster... I want to avoid the possibility of these threads only spawning locally and not in a distributed fashion among the cluster nodes. Andrew.

On Thu, Jul 17, 2014 at 2:46 AM, Michael Rose mich...@fullcontact.com wrote: [...]
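Putting the numbers from this exchange into code; a sketch, with ExampleSpout/ExampleBolt and the topology name as stand-ins:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    Config conf = new Config();
    conf.setNumWorkers(4);  // 4 worker JVMs across the cluster

    TopologyBuilder tb = new TopologyBuilder();
    tb.setSpout("spout1", new ExampleSpout(), 4);                       // 4 executors -> 1 per worker
    tb.setBolt("b1", new ExampleBolt(), 16).shuffleGrouping("spout1");  // 16 executors -> 4 per worker
    StormSubmitter.submitTopology("example", conf, tb.createTopology());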
Re: Does Storm support JDK7 ??
You need only run the existing releases on JDK 7 or 8.

On Jul 14, 2014 7:15 AM, Haralds Ulmanis hara...@evilezh.net wrote:
Actually, I've now customized Storm a bit and recompiled it, as I needed some changes. But initially I just downloaded and ran it.

On 14 July 2014 14:02, Adrianos Dadis add...@gmail.com wrote:
Hi Haralds, Have you built it with JDK8 and run it with JDK8, or did you just download Storm (which is built with JDK6) and run it with JDK8?

On Mon, Jul 14, 2014 at 3:24 PM, Haralds Ulmanis hara...@evilezh.net wrote:
Don't know about JDK7; I'm running on JDK8 and it seems fine.

On 14 July 2014 13:11, Veder Lope add...@gmail.com wrote:
Storm is a great project!!! We are trying to build a project where Storm holds a crucial role in our architecture. As I see in pom.xml (in maven-compiler-plugin), source and target are set to Java 1.6. 1) Is Storm compatible with JDK7? 2) I know we can download Storm (built for JDK6) and run it using JDK7, but there are a few incompatibilities* between JDK7 and JDK6. Will these incompatibilities affect Storm or not? 3) Do you plan to move to JDK7? 4) What is the restriction that holds us back to JDK6? (we are now stuck with JDK6 at compile time and runtime because of Storm) 5) Can we just build Storm with JDK7 (alter both source and target in pom.xml) and then use JDK7 for runtime, or not? Have you seen any errors down this road? *incompatibilities: check this: http://www.oracle.com/technetwork/java/javase/compatibility-417013.html#incompatibilities Regards, Adrianos Dadis.
Re: Choosing where your tasks run in Storm
You can make it happen with a custom scheduler; see this article (sorry for mangling, getting this link through SpamAssassin on the group was a nightmare): http xumingming dot sinaapp dotcom slash 885/twitter-storm-how-to-develop-a-pluggable-scheduler/ But it's nothing I've seriously attempted before; the existing schedulers are in Clojure. It's not impossible to do for sure, but like Andrew said, it might well just be easier to have separate clusters that share ZK clusters.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, Jul 4, 2014 at 10:28 AM, Andrew Montalenti and...@parsely.com wrote:
I don't think this is possible right now, though I have thought about the same thing before. It *might* be true that Storm's support for YARN could eventually lead to this kind of thing, but I don't know much about it. For now, you're best off having separate Storm clusters for different classes of machines. You could consider putting Kafka queues between them to ensure cross-topology message reliability guarantees (e.g. have your I/O-bound topology read from Kafka and write to Kafka, and have your CPU-bound topology read from the Kafka topic produced by the first one). --- Andrew Montalenti Co-Founder & CTO http://parse.ly

On Fri, Jul 4, 2014 at 7:59 AM, jeff saremi jeffsar...@hotmail.com wrote:
I'm wondering if this concept applies to Storm and if there's a way to do this. I'd like to limit the machines that certain spouts or bolts run on. There are many reasons for this, but for one, let's assume that I have a bolt that is just a proxy for some legacy service. I want to monitor that service by way of the bolt and use it in my topology. Another way of looking at it is that I want to have a topology that spans different classes of machines. Let's say I have 3 classes of machines: small, medium, and large. Some topologies are limited to only one class of machines; however, some other topologies need to span two or more classes of machines. How can I do this in Storm? Thanks, Jeff
Re: Storm trident, multiple workers nothing happens
In a single worker, you don't incur serialization or network overhead.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Tue, Jun 17, 2014 at 11:09 PM, Romain Leroux leroux@gmail.com wrote:
Still, Netty performance was clearly better for me (when using only 1 worker, since multiple workers didn't work with Netty).

2014-06-18 1:33 GMT+09:00 Danijel Schiavuzzi dani...@schiavuzzi.com:
I had a similar problem: with Netty, my Trident transactional topology was getting stuck after several occurrences of the Kafka spout restarting due to Kafka SocketTimeouts (I can reproduce this bug by blocking access to Kafka from the supervisor machines with iptables; only a few tries are needed to reproduce it). Reverted to ZMQ and now it works flawlessly. I'll prepare a reproducible test case and file a JIRA bug report ASAP.

On Jun 17, 2014 3:51 PM, Romain Leroux leroux@gmail.com wrote:
As I read in different topics, here also simply switching back to ZeroMQ solved the issue...

2014-06-13 21:58 GMT+09:00 Romain Leroux leroux@gmail.com:
After tuning a Trident topology (kafka-storm-cassandra) to run on 1 worker (so on 1 server), it works really well. I tried to deploy it using 2 workers on 1 server, or 2 workers on 2 servers. The result is the same: nothing happens, no tuples are emitted, and there are no messages in the logs. A quick profiling showed me that 77% of CPU time is in:

main-SendThread(a.zookeeper.hostname:2181)
  org.apache.zookeeper.ClientCnxn$SendThread.run()
    sun.nio.ch.SelectorImpl.select()

The rest mainly comes from 2 "New I/O" threads:

  org.jboss.netty.channel.socket.nio.SelectorUtil.select()
    sun.nio.ch.SelectorImpl.select()

Therefore I am wondering if the problem can come from one of the following: - The Zookeeper cluster version is 3.4.6, which is different from the 3.3.x used by Storm 0.9.1-incubating? But that is strange, because there are absolutely no problems when using the same settings but with only 1 worker. - The communication layer is Netty, which may not work well with my hardware? (is this possible?) In the case of only 1 worker, Netty seems not to be much involved (no inter-worker communication). Maybe changing to ZeroMQ? Has someone faced a similar issue? Any pointer? Or anything in particular to monitor / profile?
RE: Extracting Performance Metrics
What kind of issues does Metrics have that lead you to recommend HdrHistogram?

On Jun 16, 2014 6:57 PM, Dan dcies...@hotmail.com wrote:
Be careful when using Coda Hale's Metrics package when measuring latency. Consider using Gil Tene's High Dynamic Range Histogram instead: http://hdrhistogram.github.io/HdrHistogram/ -Dan
-- From: and...@parsely.com Date: Mon, 16 Jun 2014 18:20:11 -0400 Subject: Re: Extracting Performance Metrics To: user@storm.incubator.apache.org
Also, I came across this presentation by Visible Measures which actually walks through a lot of great options covering most of what you want to know about: http://files.meetup.com/5809742/storm%20monitoring.pdf One other thing to be aware of is that in Storm 0.9.2 (the forthcoming release), there is a new REST API used by the Storm UI for gathering some of these metrics: https://github.com/apache/incubator-storm/pull/101 https://issues.apache.org/jira/browse/STORM-205

On Mon, Jun 16, 2014 at 6:13 PM, Andrew Montalenti and...@parsely.com wrote:
I haven't used it yet, but a lot of people get pointed to metrics_storm: https://github.com/ooyala/metrics_storm along with this blog post that discusses it: http://engineering.ooyala.com/blog/open-sourcing-metrics-storm Michael Noll also has a nice blog post about streaming Storm 0.9 metrics to Graphite: http://www.michael-noll.com/blog/2013/11/06/sending-metrics-from-storm-to-graphite/ Currently, when we use Storm, we do a lot of custom metrics in Graphite using statsd, as described in this post (not about Storm, but about Graphite/statsd): http://codeascraft.com/2011/02/15/measure-anything-measure-everything/

On Mon, Jun 16, 2014 at 4:37 PM, Anis Nasir aadi.a...@gmail.com wrote:
Dear all, I am running a cluster with 1 Kafka + 1 Nimbus + 10 supervisor + 1 Zookeeper nodes. I am executing multiple topologies on the cluster, and I want to extract the different metrics mentioned below. Can someone help me by recommending tools that I can use to extract this information?

Per topology:
- Throughput
- Latency

Per spout or bolt:
- Throughput
- Latency
- Execution time
- Queuing time
- Number of messages processed

Regards, Anis
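For anyone comparing the two: the usual concern with Metrics' default reservoir sampling is that it can under-report tail percentiles, which HdrHistogram's full-range recording avoids. A minimal sketch of recording latency with HdrHistogram (process() is a hypothetical unit of work):

    import org.HdrHistogram.Histogram;

    // Track latencies from 1 ns up to 1 hour with 3 significant digits.
    Histogram histogram = new Histogram(3600000000000L, 3);

    long start = System.nanoTime();
    process(message);  // hypothetical unit of work being measured
    histogram.recordValue(System.nanoTime() - start);

    // The full value range is kept, so tail percentiles are exact.
    System.out.println("p99.9 (ns): " + histogram.getValueAtPercentile(99.9));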
Re: Non-DRPC topologies and number of assigned workers
A topology can run in as many workers as you assign at launch time, DRPC or not. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Wed, Jun 11, 2014 at 1:08 PM, Nima Movafaghrad nima.movafagh...@oracle.com wrote: Hi Community, Wondering if a non-drpc topology can run in multiple workers or is it just gonna run within one? Thanks, Nima
Re: resource aware task scheduling in Storm cluster
Out of curiosity, what kind of changes have you been making?

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Tue, Jun 10, 2014 at 9:36 PM, Jason Jackson jasonj...@gmail.com wrote:
Hi Alex, We're using a slightly modified version of https://github.com/nathanmarz/storm-mesos in production. Currently there's no one supporting storm-mesos, nor have we been open-sourcing our changes to this project at this point in time. // Jason

On Mon, Mar 3, 2014 at 3:00 PM, Alexander S. Klimov alexk...@microsoft.com wrote:
Hi guys, I found this solution for resource-aware task scheduling in Storm: https://github.com/nathanmarz/storm-mesos What is the status of this project? Is it still supported? Would it be recommended for use in production? Thanks, Alex
Re: Order of Bolt definition, catching that subscribes from non-existent component [ ...]
You can have a loop on a different stream. It's not always the best thing to do (deadlock possibilities from buffers), but we have a production topology with that kind of pattern. In our case, one bolt acts as a coordinator for a recursive search.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, Jun 6, 2014 at 2:28 PM, Abhishek Bhattacharjee abhishek.bhattacharje...@gmail.com wrote:
I am sorry for the late reply. Yes, you can't have a loop. You can have a chain though (which doesn't close upon itself!). Thanks :-)

On Wed, May 7, 2014 at 12:50 PM, shahab shahab.mok...@gmail.com wrote:
Thanks Abhishek. But this also implies that we cannot have a loop (of message processing stages) using Storm, right? best, /Shahab

On Mon, May 5, 2014 at 9:45 PM, Abhishek Bhattacharjee abhishek.bhattacharje...@gmail.com wrote:
I don't think what you are trying to do is achievable. Data in Storm always moves forward, so you can't give it back to the bolt from which it originated. That is, a bolt can subscribe only to bolts which were created before its creation. So I think you can create another object of the A bolt, say D, and then assign the output of C to D.

On Mon, May 5, 2014 at 8:11 PM, shahab shahab.mok...@gmail.com wrote:
Hi, I am trying to define a topology as follows:

S: is a spout
A, B, C: are bolts
-->: means emitting messages

S --> A
A --> B
B --> C
C --> A

I am declaring the spouts and bolts in the above order in my Java code: first S, then A, B and finally C. I am using globalGrouping(BoltName, StreamID) for the messages to be collected by each bolt. The problem is that I receive an error while defining bolt A, saying that it "subscribes from non-existent component [C]". I guess the error is happening because component C is not defined yet! But what could be the solution to this? best, /Shahab
-- *Abhishek Bhattacharjee* *Pune Institute of Computer Technology*
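A sketch of the loop-on-a-different-stream wiring, using the S/A/B/C names from the question ("loopback" is an illustrative stream name; the bolt classes are stand-ins):

    import backtype.storm.topology.TopologyBuilder;

    TopologyBuilder tb = new TopologyBuilder();
    tb.setSpout("S", new SSpout());
    // A subscribes to both the spout and C's dedicated feedback stream.
    tb.setBolt("A", new ABolt()).shuffleGrouping("S")
                                .shuffleGrouping("C", "loopback");
    tb.setBolt("B", new BBolt()).shuffleGrouping("A");
    tb.setBolt("C", new CBolt()).shuffleGrouping("B");

    // CBolt declares the extra stream in declareOutputFields:
    //   declarer.declareStream("loopback", new Fields("msg"));

Tuples routed back on "loopback" must eventually stop recirculating (and every hop must be acked), or the loop can fill the internal buffers and deadlock, per the caveat above.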
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
You don't have to include a specific bolt for init code. It's not difficult to push your init code into a separate class and call it from your bolts: lock on that class, run init, and then allow other instances to skip over it. Without changing bolt/spout code, I've taken to including a task hook for init code (e.g. properties / Guice). Check out BaseTaskHook; it's easily extensible and can be included pretty easily too:

stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:
Yes.. if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle. I'd have to include a spout or bolt just for initializing my invariant code... I'd rather do that when the topology is activated on the worker.. so this seems like a good use for an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But if there is no such method, I will make do with what is there. Thanks for your response. chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:
The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs, or were you thinking of something different? Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked, but I could find no such API. Maybe I should submit an enhancement request? Thanks in advance for your responses, - Chris [if anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production.. but during testing/development it helps catch bugs].

-- Chris Bedford Founder Lead Lackey Build Lackey Labs: http://buildlackey.com Go Grails!: http://blog.buildlackey.com
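A skeleton of such a hook; BaseTaskHook's prepare() runs once per task in the worker, so one-time work needs a guard (the exact guard is shown in the follow-up below):

    import java.util.Map;
    import backtype.storm.hooks.BaseTaskHook;
    import backtype.storm.task.TopologyContext;

    public class MyTaskHook extends BaseTaskHook {
        @Override
        public void prepare(Map conf, TopologyContext context) {
            // Called for every task in this worker JVM.
            // Do one-time init here (load properties, build the Guice injector, ...),
            // guarded so it executes only once per JVM -- see the next message.
        }
    }

Registered via the config line above, Storm instantiates the hook for each task without any bolt or spout changes.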
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
No, it will be called per bolt instance. That's why init code needs to be guarded behind a double-checked lock to guarantee it only executes once per JVM, e.g.:

private static volatile boolean initialized = false;
...
if (!initialized) {
    synchronized (MyInitCode.class) {
        if (!initialized) {
            // do stuff
            initialized = true;
        }
    }
}

Until there's a set of lifecycle hooks, that's about as good as I've cared to make it.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 8:27 PM, Chris Bedford ch...@buildlackey.com wrote:
This looks promising, thanks. Hope you don't mind one more question -- if I create my own implementation of ITaskHook and add it to the config as you illustrated in the previous message, will the prepare() method of my implementation be called exactly once, shortly after the StormTopology is deserialized by the worker node process? - chris

On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com wrote: [...]
-- Chris Bedford Founder Lead Lackey Build Lackey Labs: http://buildlackey.com Go Grails!: http://blog.buildlackey.com
Re: Which daemon tool for Storm nodes?
We use upstart. Supervisord would also work. Just anything to keep an eye on it and restart it if it dies (a very rare occurrence).

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Wed, May 21, 2014 at 7:09 PM, Connie Yang cybercon...@gmail.com wrote:
Hi, Is there a recommended supervisor or daemon tool for Storm nodes? I'm using Supervisor for Storm and Zookeeper. I've heard some concerns about using Supervisor, but I haven't researched the matter. Any comments on this? Thanks, Connie
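For reference, a minimal upstart sketch; the paths and the storm user are assumptions about a typical install, not details from this thread:

    # /etc/init/storm-supervisor.conf
    description "storm supervisor"
    start on runlevel [2345]
    stop on runlevel [!2345]
    respawn
    setuid storm
    chdir /opt/storm
    exec /opt/storm/bin/storm supervisor

The respawn stanza is what restarts the daemon if it dies; an analogous supervisord [program:storm-supervisor] section with autorestart=true achieves the same thing.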
Re: Storm with video/audio streams
No reason why you couldn't do it, but as far as I know it hasn't been done before. You can send any kind of serializable data into a topology. You'd probably need to emit frames from the spout.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, May 19, 2014 at 6:13 AM, Suparno Datta suparno.da...@gmail.com wrote:
Hi, I just started playing around with Storm a few days back. I worked with some basic examples using the Twitter streaming API. I was wondering if Storm can also be used with a live video/audio streaming API (the YouTube streaming API, for example). I wanted to develop an app which does something like detecting and tracking faces in a live video stream using Storm. But I am not sure if this is even feasible. Most of the examples I see deal with text streams. Has anyone here already tried/thought of something like this? Thanks, -- Suparno Datta
Re: How resilient is Storm w/o supervision
In practice, we rarely have issues with our Storm processes. We have Nimbus/supervisors that go months without restarting. But when they do have issues, it's very nice to have them under supervision. Sometimes it's not even things under their control (e.g. an OOM during high-memory-usage scenarios). For development, there shouldn't be any issues foregoing supervision.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Fri, May 2, 2014 at 12:41 PM, P. Taylor Goetz ptgo...@gmail.com wrote:
I don’t think you need root to run supervisord: http://supervisord.org/running.html If you’re just testing something out, and don’t mind your cluster going down, then running without supervision is okay. But I would NEVER suggest someone run Storm’s daemons without supervision in a production environment. - Taylor

On May 2, 2014, at 2:29 PM, Albert Chu ch...@llnl.gov wrote:
I'm attempting to run Storm on a platform that I don't have root on, so I won't be able to run it under Red Hat's supervisord that's already installed. How resilient are the Storm daemons by themselves? Are they reasonably resilient, or are they programmed to not handle even relatively simple errors? I should probably say, this probably wouldn't be run in a production environment. Just trying to understand if the documentation writers are saying "you should really do this for production" or "it won't work if you don't do this". Thanks, Al -- Albert Chu ch...@llnl.gov Computer Scientist High Performance Systems Division Lawrence Livermore National Laboratory
Re: Machine specs
In AWS, we're fans of c1.xlarges, m3.xlarges, and c3.2xlarges, but we have seen Storm run successfully on cheaper hardware. Our Nimbus server is usually bored on an m1.large.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Wed, Apr 30, 2014 at 9:48 PM, Cody A. Ray cody.a@gmail.com wrote:
We use m1.larges in EC2 (http://aws.amazon.com/ec2/instance-types/) for both Nimbus and supervisor machines (though the m1 family has been deprecated in favor of m3). Our use case is to do some pre-aggregation before persisting the data in a store. (The main bottleneck in this setup is the downstream datastore, but memory is the primary constraint on the worker machines due to the in-memory cache which wraps the Trident state.) For what it's worth, Infochimps suggests (https://github.com/infochimps-labs/big_data_for_chimps/blob/master/25-storm%2Btrident-tuning.asciidoc) c1.xlarge or m3.xlarge machines: "Using the Amazon cloud machines as a reference, we like to use either the c1.xlarge machines (7GB ram, 8 cores, $424/month, giving the highest CPU-performance-per-dollar) or the m3.xlarge machines (15 GB ram, 4 cores, $365/month, the best balance of CPU-per-dollar and RAM-per-dollar). You shouldn't use fewer than four worker machines in production, so if your needs are modest feel free to downsize the hardware accordingly." Not sure what others would recommend. -Cody

On Wed, Apr 30, 2014 at 5:57 PM, Software Dev static.void@gmail.com wrote:
What kind of specs are we looking at for 1) Nimbus 2) Workers? Any recommendations?

-- Cody A. Ray, LEED AP cody.a@gmail.com 215.501.7891
Re: Is it a must to have /etc/hosts mapping or a DNS in a multinode setup?
We don't use /etc/hosts mappings; we only use hostnames / IPs.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Tue, Apr 29, 2014 at 8:29 AM, Derek Dagit der...@yahoo-inc.com wrote:
I have not tried it, but there is a config for this purpose: https://github.com/apache/incubator-storm/blob/dc4de425eef5701ccafe0805f08eeb011baae0fb/storm-core/src/jvm/backtype/storm/Config.java#L122-L131 -- Derek

On 4/29/14, 0:41, Sajith wrote:
Hi all, Is it a must to have an /etc/hosts mapping or a DNS in a multinode Storm cluster? Can't supervisors talk to each other through ZooKeeper or Nimbus using IP addresses directly?
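The config Derek links is storm.local.hostname, set per node in storm.yaml so each daemon advertises an explicit address instead of relying on /etc/hosts or reverse DNS; the address below is an example value:

    # storm.yaml on each node
    storm.local.hostname: "10.16.0.12"   # this node's routable IP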
Re: PDF processing use case in storm!!
I'd be inclined to say that while you can make it work, your unit of work (a whole PDF) makes it unsuitable for Storm. In general, the more you can break down each operation, the better Storm will work for you. You're likely to get worse latency out of Storm due to the essentially random delegation of tuples. You could really abuse Storm if you wanted and use it as a distributed application container with threadpools; I've done it. But you're really going to see a better experience out of a webservice if it's live-mode requests.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Apr 28, 2014 at 8:23 AM, Deepak Sharma deepakmc...@gmail.com wrote:
We need parallelism here, as many users may be using the service at the same time. It may be for different files or the same file. Thanks, Deepak

On Mon, Apr 28, 2014 at 7:41 PM, Marc Vaillant vaill...@animetrics.com wrote:
I think it's important to know whether or not some form of parallelism (other than throughput) is required; otherwise a standard webservice seems sufficient for this use case.

On Mon, Apr 28, 2014 at 07:46:35AM -0400, Andrew Perepelytsya wrote:
You can build request-response type topologies via DRPC. However, unless we're talking about processing numerous PDFs at once - bad fit, IMO. If parallelism is required, you might be better off with a custom YARN app - looks like YAYA makes it tolerable to write. Andrew

On Apr 28, 2014 2:41 AM, Deepak Sharma deepakmc...@gmail.com wrote:
Hi All, Just wanted to check if this can be a valid Storm use case. I want to write 1 simple Storm topology which can read a PDF file, process it, make some changes like converting it to doc, and save the new file. I know this can be easily done in batch mode using Hadoop, but we want to do it in real time, i.e. when the user demands it. We already do it using some Java APIs, but it takes a lot of time to do all the conversions. Can this be achieved in Storm? If yes, is there any pointer to any examples similar to this use case? -- Thanks Deepak www.bigdatabig.com

-- Thanks Deepak www.bigdatabig.com www.keosha.net
Re: Passing command line arguments to storm
Yes. Ultimately, that runs the main method of MyTopology, so just like any other main method you get String[] args.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Thu, Apr 17, 2014 at 5:36 PM, Software Dev static.void@gmail.com wrote:
Is it possible to pass arguments that adhere to the CommandLine interface? storm jar topology.jar com.test.MyTopology --config foo --remote
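If "CommandLine interface" means Apache commons-cli (an assumption; the question doesn't name the library), a sketch of parsing those flags in the topology's main, with option names mirroring the example invocation:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Options;

    public class MyTopology {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            options.addOption("c", "config", true, "config name");
            options.addOption("r", "remote", false, "submit to the cluster");

            CommandLine cmd = new GnuParser().parse(options, args);
            String config = cmd.getOptionValue("config", "default");

            if (cmd.hasOption("remote")) {
                // StormSubmitter.submitTopology(...) using 'config'
            } else {
                // new LocalCluster().submitTopology(...) for development
            }
        }
    }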
Re: Server load - Topology optimization
messageId is any unique identifier of the message such that, when ack is called on your spout, you're given back the identifier and can then mark the work as complete in the source, in the case that it supports replay.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Wed, Mar 19, 2014 at 10:18 AM, David Crossland da...@elastacloud.com wrote:
Where is this messageId derived from? I note there is a tuple.getMessageId(), but there does not appear to be an overload of emit that accepts this. There is an overload emit(java.lang.String streamId, java.util.List<java.lang.Object> tuple) (http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html), but I don't think this is what you are referring to. My bolt execute method looks like this (sorry if I'm being obtuse, this is still very new..):

public void execute(Tuple tuple) {
    String msg = (String) tuple.getValue(0);
    if (msg == null) {
        logger.log(Level.ERROR, "Message is null");
        // acknowledge that the tuple has failed and return
        _collector.fail(tuple);
        return;
    }
    MessageProcessor processor = new MessageProcessor();
    EventMessage message = processor.processMessage(msg);
    if (message == null) {
        logger.log(Level.DEBUG, "Message did not conform to a known event");
        logger.log(Level.DEBUG, msg);
        // acknowledge the tuple, but don't do anything with it; we don't have to emit
        _collector.ack(tuple);
    }
    if (message instanceof MonetiseEvent) {
        logger.log(Level.DEBUG, "received monetise message");
        _collector.emit(new Values(message));
        _collector.ack(tuple);
    }
}

D

*From:* Sean Zhong clock...@gmail.com *Sent:* Wednesday, 19 March 2014 15:39 *To:* user@storm.incubator.apache.org
Hi David, As I said, to get the ack count for the spout, you need to add a message id when emitting, like this:

collector.emit(new Values(msg), messageId)
                              ^ here

If the message id is null, then there will be no ack message sent to the spout. Sean

On Wed, Mar 19, 2014 at 4:49 PM, David Crossland da...@elastacloud.com wrote:
I had noticed the ack count never increased for the spout. The spout is a BaseRichSpout; I presumed the underlying code would handle the ack and display appropriate metrics. But you can see from the first bolt that all messages have been acked.. I don't know if this is an issue. I've been doing a bit of reading around the service bus; if I understand correctly, it can push 20 messages per second per connection (I need to confirm this with a colleague who has experience with this..) If that's the case then it tallies with the throughput I've been seeing (approx…). I suspect the problem isn't so much the topology but the service bus itself. I'll come back if this doesn't pan out. D

*From:* Sean Zhong clock...@gmail.com *Sent:* Wednesday, 19 March 2014 00:45 *To:* user@storm.incubator.apache.org
The spout is suspicious. From the screenshots, you are using a non-acked spout; there may also be a GC issue there. Here is the suggestion: check whether you are making the connection in the context of nextTuple. If so, it means a large latency. You can check the spout latency by enabling an acked spout (when emitting, add a messageId, e.g. emit(new Values(msg), messageId)), and check the latency. If this is the case, you need to create a separate thread for the data connection in the spout, and use a queue to buffer messages; then nextTuple just polls the queue.
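A sketch of the spout side Sean describes: emit with a messageId and override ack/fail (the queue of [id, body] pairs is a stand-in for however messages are fed in):

    import java.util.Map;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class AckingSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private transient ConcurrentLinkedQueue<String[]> queue;  // [id, body] pairs, fed elsewhere

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            queue = new ConcurrentLinkedQueue<String[]>();
        }

        @Override
        public void nextTuple() {
            String[] m = queue.poll();
            if (m != null) {
                collector.emit(new Values(m[1]), m[0]);  // the messageId makes the tuple tracked
            }
        }

        @Override
        public void ack(Object msgId) {
            // tuple tree completed: safe to delete/complete the message at the source
        }

        @Override
        public void fail(Object msgId) {
            // timed out or failed downstream: re-enqueue or let the source redeliver
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }
    }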
On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose mich...@fullcontact.com wrote: [...]
Re: Server load - Topology optimization
Well, I see you have 30 spout instances and 3 bolt instances. Doesn't seem like it's a huge bottleneck, but it is having to send over the wire much of the time. Something to keep in mind for the future. I'd be most suspicious of your spouts. All spouts on a single worker are run single-threaded (in an event loop calling nextTuple()), so if you have ANY blocking work that'll kill throughput. If AzureServiceBus is anything like SQS, response times are not instant. We tend to have background threads feeding a queue in our spouts for that reason, as we use SQS for many of our topologies. Given 12 workers, you'd have ~3 spouts per machine running single-threaded. With spouts, you should attempt to maintain as little blocking work as possible...if you're using a queue, you should be using Queue#poll() to either return a value OR null, and only emit a tuple if an event is available (and meets your criteria). Storm will handle rate limiting the spouts with sleeps. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Tue, Mar 18, 2014 at 5:14 PM, David Crossland da...@elastacloud.com wrote: Perhaps these screenshots might shed some light? I don't think there is much of a latency issue. I'm really starting to suspect there is some consumption rate issue from the topic. I set the spout to a high parallelism value as it did seem to improve throughput.. But if there is anything you can spot that would be grand Thanks David *From:* Nathan Leung ncle...@gmail.com *Sent:* Tuesday, 18 March 2014 21:14 *To:* user@storm.incubator.apache.org It could be bolt 3. What is the latency like between your worker and your Redis server? Increasing the number of threads for bolt 3 will likely increase your throughput. Bolt 1 and 2 are probably CPU bound, but bolt 3 is probably restricted by your network access. Also I've found that localOrShuffleGrouping can improve performance due to reduced network communications. On Tue, Mar 18, 2014 at 3:55 PM, David Crossland da...@elastacloud.com wrote: A bit more information, then. There are 4 components: Spout - This is reading from an Azure Service Bus topic/subscription. A connection is created in the open() method of the spout, nextTuple does a peek on the message, and invokes the following code; StringWriter writer = new StringWriter(); IOUtils.copy(message.getBody(), writer); String messageBody = writer.toString(); It then deletes the message from the queue. Overall nothing all that exciting.. Bolt 1 - Filtering Parses the message body (JSON string) and converts it to an object representation. Filters out anything that isn't a monetise message. It then emits the monetise message object to the next bolt. Monetise messages account for ~ 0.03% of the total message volume. Bolt 2 - Transformation Basically extracts from the monetise object the values that are interesting and constructs a string which it emits Bolt 3 - Storage Stores the transformed string in Redis using the current date/time as key. - Shuffle grouping is used with the topology I ack every tuple irrespective of whether I emit the tuple or not. It should not be attempting to replay tuples. - I don't think Bolt 2/3 are the cause of the bottleneck. They don't have to process much data at all tbh. I can accept that perhaps there is something inefficient with the spout, perhaps it just can't read from the service bus quickly enough. I will do some more research on this and have a chat with the colleague who wrote this component.
I suppose I'm just trying to identify if I've configured something incorrectly with respect to storm, whether I'm correct to relate the total number of executors and tasks to the total number of cores I have available. I find it strange that I get a better throughput when I choose an arbitrarily large number for the parallelism hint than if I constrain myself to a maximum that equates to the number of cores. D *From:* Nathan Leung ncle...@gmail.com *Sent:* Tuesday, 18 March 2014 18:38 *To:* user@storm.incubator.apache.org In my experience storm is able to make good use of CPU resources, if the application is written appropriately. You shouldn't require too much executor parallelism if your application is CPU intensive. If your bolts are doing things like remote DB/NoSQL accesses, then that changes things and parallelizing bolts will give you more throughput. Not knowing your application, the best way to pin down the problem is to simplify your topology. Cut out everything except for the Spout. How is your filtering done? If you return without emitting, the latest versions of storm will sleep before trying again. It may be worthwhile to loop in the spout until you receive a valid message, or the bus
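A minimal sketch of the background-feeder pattern Michael describes above, assuming Storm 0.9.x; ServiceBusClient and its receive() method are hypothetical stand-ins for whatever blocking client you actually use:

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueFedSpout extends BaseRichSpout {
        private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<String>(10000);
        private SpoutOutputCollector collector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            final ServiceBusClient client = new ServiceBusClient(); // hypothetical blocking client
            Thread feeder = new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            String msg = client.receive();  // blocking call stays OFF the spout thread
                            if (msg != null) buffer.put(msg); // blocks when full: natural backpressure
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            });
            feeder.setDaemon(true);
            feeder.start();
        }

        public void nextTuple() {
            String msg = buffer.poll();                  // non-blocking: null when nothing is buffered
            if (msg != null) collector.emit(new Values(msg));
            // returning without emitting lets Storm sleep briefly before calling again
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }
    }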
Re: Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.
Depending on what you need, you could use the Thrift interface to Nimbus to grab the statistics. The UI project does do some calculations on the raw metrics, so it's not quite the same. The algorithms it uses aren't too difficult to replicate. We ended up building something very similar to what Ooyala did, customized for our specific needs, it's an excellent pattern. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Mon, Mar 17, 2014 at 6:21 PM, Chris Bedford ch...@buildlackey.com wrote: Hi, I'm interested in getting metrics via JMX on not only container-level factors (such as # of garbage collects, heap usage, etc.), but also metrics that describe how spouts and bolts are performing (e.g., # of tuples emitted, # transferred -- the same kind of stuff that the storm UI shows.) I ran across this project: https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala I was just wondering if this is the best bet to pursue. Does anyone have any concrete experience with this that they can share? thanx! chris
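For the raw numbers, a minimal sketch against the Nimbus Thrift API, assuming Storm 0.9.x class names; the UI's derived metrics (capacity, windowed rates) would still need to be computed on top of these:

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.Nimbus;
    import backtype.storm.generated.TopologySummary;
    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;
    import java.util.Map;

    public class NimbusStats {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();   // reads storm.yaml / defaults.yaml
            Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();
            ClusterSummary cluster = client.getClusterInfo();  // raw cluster-level numbers
            for (TopologySummary t : cluster.get_topologies()) {
                // per-executor spout/bolt stats come from client.getTopologyInfo(t.get_id())
                System.out.println(t.get_name() + " uptime=" + t.get_uptime_secs()
                    + "s workers=" + t.get_num_workers());
            }
        }
    }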
Re: Storm stream grouping examples
What kind of comparisons are you looking for? How they functionally work? Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Wed, Mar 5, 2014 at 9:52 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello folks, I was unable to find any complete example (or, better, related work in the scientific literature) in which (almost) all the *stream grouping policies* have been used and compared. Do you have any reference you could please share with me? Thank you and best regards, Roberto Coluccio
Re: Storm stream grouping examples
+1, localOrShuffle will be a winner, as long as it's evenly distributing work. If one tuple could, say, produce a variable 1-100 resultant tuples (and these results were expensive enough to process, e.g. IO), it might well be worth shuffling vs. localShuffling. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Wed, Mar 5, 2014 at 11:19 AM, Nathan Leung ncle...@gmail.com wrote: In my experience on a 1 Gb network localOrShuffleGrouping was a clear winner in terms of performance. But I haven't tested with 10 Gb, and if you have substantial business logic then that becomes a bigger factor than serializing/transferring data on the network. I think the performance of any given grouping is too dependent on your business logic; it will be difficult to quantify how well it performs in a canned benchmark. And sometimes your business logic will define a grouping for you (e.g. fields grouping) whether it's the best performer or not. On Wed, Mar 5, 2014 at 1:05 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello Michael, thanks for your feedback. I'm looking for a performance comparison. I know that not all the policies are really comparable, but even obvious comparisons all listed together could be a useful reference. Roberto On Wed, Mar 5, 2014 at 6:58 PM, Michael Rose mich...@fullcontact.com wrote: What kind of comparisons are you looking for? How they functionally work? Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Wed, Mar 5, 2014 at 9:52 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello folks, I was unable to find any complete example (or, better, related work in the scientific literature) in which (almost) all the *stream grouping policies* have been used and compared. Do you have any reference you could please share with me? Thank you and best regards, Roberto Coluccio
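For concreteness, a sketch of wiring the two groupings with a TopologyBuilder (Storm 0.9.x API; the spout/bolt classes are illustrative):

    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

    public class GroupingExample {
        public static StormTopology build() {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("events", new EventSpout(), 4);
            // localOrShuffle: prefer tasks in the same worker, skipping serialization and the network
            builder.setBolt("filter", new FilterBolt(), 8).localOrShuffleGrouping("events");
            // plain shuffle: spread tuples evenly across ALL tasks -- better when one tuple
            // fans out into many expensive downstream tuples, as Michael notes above
            builder.setBolt("enrich", new EnrichBolt(), 8).shuffleGrouping("filter");
            return builder.createTopology();
        }
    }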
Re: Tuning and nimbus at 99%
Otis, I'm a fan of SPM for Storm, but there's other debugging that needs to be done here if the process quits constantly. Sean, Since you're using storm-deploy, I assume the processes are running under supervisor. It might be worth killing the supervisor by hand, then running it yourself (ssh as storm, cd storm/daemon, supervise .) and seeing what kind of errors you see. Are your disks perhaps filled? Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Mon, Mar 3, 2014 at 6:49 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi Sean, I don't think you can see the metrics you need to see with AWS CloudWatch. Have a look at SPM for Storm. You can share graphs from SPM directly if you want, so you don't have to grab and attach screenshots manually. See: http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/ and http://sematext.com/spm/ My bet is that you'll see GC metrics spikes. Otis -- Performance Monitoring * Log Analytics * Search Analytics Solr Elasticsearch Support * http://sematext.com/ On Mon, Mar 3, 2014 at 8:21 PM, Sean Solbak s...@solbak.ca wrote: I just created a brand new cluster with the storm-deploy command. lein deploy-storm --start --name storm-dev --commit 1bcc169f5096e03a4ae117efc65c0f9bcfa2fa22 I had a meeting, did nothing to the box, no topologies were run. I came back 2 hours later and nimbus was at 100% cpu. I'm running on an m1-small on the following ami - ami-58a3cf68. I'm unable to get a thread dump as the process is getting killed and restarted too fast. I did attach a 3-hour snapshot of the ec2 monitors. Any guidance would be much appreciated. Thanks, S On Sun, Mar 2, 2014 at 9:11 PM, Sean Solbak s...@solbak.ca wrote: The only error in the logs, which happened over 10 days ago, was: 2014-02-22 01:41:27 b.s.d.nimbus [ERROR] Error when processing event java.io.IOException: Unable to delete directory /mnt/storm/nimbus/stormdist/test-25-1393022928. at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:981) ~[commons-io-1.4.jar:1.4] at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381) ~[commons-io-1.4.jar:1.4] at backtype.storm.util$rmr.invoke(util.clj:442) ~[storm-core-0.9.0.1.jar:na] at backtype.storm.daemon.nimbus$do_cleanup.invoke(nimbus.clj:819) ~[storm-core-0.9.0.1.jar:na] at backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto5529$fn__5534.invoke(nimbus.clj:896) ~[storm-core-0.9.0.1.jar:na] at backtype.storm.timer$schedule_recurring$this__3019.invoke(timer.clj:77) ~[storm-core-0.9.0.1.jar:na] at backtype.storm.timer$mk_timer$fn__3002$fn__3003.invoke(timer.clj:33) ~[storm-core-0.9.0.1.jar:na] at backtype.storm.timer$mk_timer$fn__3002.invoke(timer.clj:26) ~[storm-core-0.9.0.1.jar:na] at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na] at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_27] It's fine. I can rebuild a new cluster. Storm deploy makes it pretty easy. Thanks for your help on this! As for my other question: if my trident batch interval is 500ms and I keep the spout pending and batch size small enough, will I be able to get real time results (i.e. sub-2 seconds)? I've played with the various metrics (I literally have a spreadsheet of parameters to results) and haven't been able to get it. Am I just doing it wrong? What would the key parameters be? The complete latency is 500 ms but trident seems to be way behind despite none of my bolts having a capacity over 0.6.
This may have to do with nimbus being throttled so I will report back. But if there are people out there who have done this kind of thing, I'd like to know if I'm missing an obvious parameter or something. Thanks, S On Sun, Mar 2, 2014 at 8:09 PM, Michael Rose mich...@fullcontact.com wrote: The fact that the process is being killed constantly is a red flag. Also, why are you running it as a client VM? Check your nimbus.log to see why it's restarting. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 7:50 PM, Sean Solbak s...@solbak.ca wrote: uintx ErgoHeapSizeLimit = 0 {product} uintx InitialHeapSize := 27080896 {product} uintx LargePageHeapSizeThreshold= 134217728 {product} uintx MaxHeapSize := 698351616 {product} so an initial size of ~25 MB and a max of ~666 MB. It's a client process (not server, i.e. the command is java -client -Dstorm.options...). The process gets killed and restarted continuously with a new PID (which makes getting the PID tough to get stats on). I don't have VisualVM but if I run jstat -gc PID, I get
Re: Zookeepr on different ports
I'd recommend just using one Zookeeper instance if they're on the same physical host. There's no reason why a development ZK ensemble needs 3 nodes. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 10:15 AM, Arun Sethia sethia.a...@gmail.com wrote: Hi, We have setup three zookeeper instances on one virtual machine, running on different ports (2181, 2182, 2183). Eventually in production we will have each instance on a separate virtual machine and we can use the same port (2181). We have seen we can configure multiple zookeeper instances (a cluster) using storm.zookeeper.servers, and we can use storm.zookeeper.port to define a port. But since we have zookeeper on one machine on different ports (2181, 2182, 2183), we are not able to configure the different ports using storm.zookeeper.port. Any help will be great for us. Regards, Arun
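The limitation Arun is hitting is that storm.zookeeper.port is a single value applied to every host listed in storm.zookeeper.servers, so per-server ports can't be expressed. A minimal storm.yaml sketch for the one-instance development setup suggested above (the hostname is illustrative):

    storm.zookeeper.servers:
        - "zk-dev-host"          # a single ZK instance is plenty for development
    storm.zookeeper.port: 2181   # one port, shared by every server listed above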
Re: Tuning and nimbus at 99%
The fact that the process is being killed constantly is a red flag. Also, why are you running it as a client VM? Check your nimbus.log to see why it's restarting. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 7:50 PM, Sean Solbak s...@solbak.ca wrote: uintx ErgoHeapSizeLimit = 0 {product} uintx InitialHeapSize := 27080896 {product} uintx LargePageHeapSizeThreshold= 134217728 {product} uintx MaxHeapSize := 698351616 {product} so an initial size of ~25 MB and a max of ~666 MB. It's a client process (not server, i.e. the command is java -client -Dstorm.options...). The process gets killed and restarted continuously with a new PID (which makes getting the PID tough to get stats on). I don't have VisualVM but if I run jstat -gc PID, I get S0C S1C S0U S1U EC EU OC OU PC PU YGC YGCT FGC FGCT GCT 832.0 832.0 0.0 352.9 7168.0 1115.9 17664.0 1796.0 21248.0 16029.6 5 0.268 0 0.000 0.268 At this point I'll likely just rebuild the cluster. It's not in prod yet as I still need to tune it. I should have written 2 separate emails :) Thanks, S On Sun, Mar 2, 2014 at 7:10 PM, Michael Rose mich...@fullcontact.com wrote: I'm not seeing too much to substantiate that. What size heap are you running, and is it near filled? Perhaps attach VisualVM and check for GC activity. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 6:54 PM, Sean Solbak s...@solbak.ca wrote: Here it is. Appears to be some kind of race condition. http://pastebin.com/dANT8SQR On Sun, Mar 2, 2014 at 6:42 PM, Michael Rose mich...@fullcontact.com wrote: Can you do a thread dump and pastebin it? It's a nice first step to figure this out. I just checked on our Nimbus and while it's on a larger machine, it's using 1% CPU. Also look in your logs for any clues. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 6:31 PM, Sean Solbak s...@solbak.ca wrote: No, they are on separate machines. It's a 4-machine cluster - 2 workers, 1 nimbus and 1 zookeeper. I suppose I could just create a new cluster but I'd like to know why this is occurring to avoid future production outages. Thanks, S On Sun, Mar 2, 2014 at 6:19 PM, Michael Rose mich...@fullcontact.com wrote: Are you running Zookeeper on the same machine as the Nimbus box? Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak s...@solbak.ca wrote: This is the first step of 4. When I save to db I'm actually saving to a queue, (just using db for now). In the 2nd step we index the data and in the 3rd we do aggregation/counts for reporting. The last is a search that I'm planning on using drpc for. Within step 2 we pipe certain datasets in real time to the clients it applies to. I'd like this and the drpc to be sub 2s, which should be reasonable. You're right that I could speed up step 1 by not using trident, but our requirements seem like a good use case for the other 3 steps. With many results per second, batching should affect performance a ton if the batch size is small enough. What would cause nimbus to be at 100% CPU with the topologies killed? Sent from my iPhone On Mar 2, 2014, at 5:46 PM, Sean Allen s...@monkeysnatchbanana.com wrote: Is there a reason you are using trident?
If you don't need to handle the events as a batch, you are probably going to get better performance without it. On Sun, Mar 2, 2014 at 2:23 PM, Sean Solbak s...@solbak.ca wrote: I'm writing a fairly basic trident topology as follows: - 4 spouts of events - merges into one stream - serializes the object as an event in a string - saves to db I split the serialization task away from the spout as it was CPU intensive, to speed it up. The problem I have is that after 10 minutes there are over 910k tuples emitted/transferred but only 193k records are saved. The overall load of the topology seems fine. - 536.404 ms complete latency at the topology level - The highest capacity of any bolt is 0.3, which is the serialization one. - each bolt task has sub 20 ms execute latency and sub 40 ms process latency. So it seems trident has all the records internally, but I need these events as close to realtime as possible. Does anyone have any guidance as to how to increase the throughput? Is it simply a matter of tweaking max spout pending and the batch size? I'm running it on 2 m1
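A sketch of the two knobs in question, assuming Storm 0.9.x config keys; note that with Trident, max spout pending counts in-flight batches, not individual tuples:

    import backtype.storm.Config;

    public class TridentTuning {
        public static Config lowLatencyConf() {
            Config conf = new Config();
            conf.setMaxSpoutPending(2);   // batches in flight at once; small values favor latency
            // how often Trident emits a batch, in ms; this bounds end-to-end latency from below
            conf.put("topology.trident.batch.emit.interval.millis", 500);
            return conf;
        }
    }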
Re: JDBC Connections
We generally instantiate a pool per JVM, where the maxActive is (# of bolts using JDBC / num workers + 1) (e.g. 64 bolts, 4 workers, 17 conns/JVM). Pooling is our preferred strategy as the pool will shrink (and thus use less memory on the corresponding SQL server) if it's not being utilized. In either case, it's pushing the management, verification, and reestablishment of broken connections into the pool (which is also why we have 1 extra conn -- for when a conn is tied up running a validation query or is being reestablished). Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Thu, Feb 20, 2014 at 8:28 AM, Richards Peter hbkricha...@gmail.com wrote: Hi Pablo, Your approach is fine. The alternative approach proposed by Brian can be used if you can group the relevant bolt instance(s), which are communicating with the database, into the appropriate worker process(es). However, with the alternative approach you shouldn't end up creating separate pools in all the workers used by the topology. Hope this helps. Richards Peter. On Thu, Feb 20, 2014 at 8:33 PM, Pablo Acuña pa...@guud.tv wrote: I keep one connection per bolt and for now it works just fine with many bolts. I would also be interested in hearing from someone else and sharing experiences. For now, I open the connection in the prepare method (and close it in cleanup), but to be completely honest, I'm not 100% sure if this is the best approach. cheers, Pablo. On Wed, Feb 19, 2014 at 4:41 PM, Brian O'Neill b...@alumni.brown.edu wrote: Yes. We use JDBI to access MySQL. We've had success with a shared connection pool (one per JVM). The bolts in the JVM share the pool. But either approach should work (pool per bolt, or connection per bolt). It just depends at what level you want to manage your connections. We do it at the worker level. (n connections per worker) -brian --- Brian O'Neill Chief Technology Officer *Health Market Science* *The Science of Better Results* 2700 Horizon Drive * King of Prussia, PA * 19406 M: 215.588.6024 * @boneill42 http://www.twitter.com/boneill42 * healthmarketscience.com From: Klausen Schaefersinho klaus.schaef...@gmail.com Reply-To: user@storm.incubator.apache.org Date: Wednesday, February 19, 2014 at 5:58 AM To: user@storm.incubator.apache.org Subject: JDBC Connections Hi, one of my bolts will need to write to a MySQL database. Does anybody have some experience with this? What are the best practices? Use a connection pool? Or keep one connection open per bolt? Cheers, klaus
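A sketch of the pool-per-JVM pattern, using Commons DBCP 1.x names; the holder class and the SELECT 1 validation query are illustrative:

    import javax.sql.DataSource;
    import org.apache.commons.dbcp.BasicDataSource;

    public final class SharedPool {
        private static volatile BasicDataSource pool;  // one pool per worker JVM, shared by every bolt

        public static DataSource get(String url, String user, String pass, int maxActive) {
            if (pool == null) {
                synchronized (SharedPool.class) {      // double-checked: the first prepare() call builds it
                    if (pool == null) {
                        BasicDataSource ds = new BasicDataSource();
                        ds.setUrl(url);
                        ds.setUsername(user);
                        ds.setPassword(pass);
                        ds.setMaxActive(maxActive);         // e.g. bolts-using-JDBC / workers + 1
                        ds.setValidationQuery("SELECT 1");  // lets the pool replace broken connections
                        pool = ds;
                    }
                }
            }
            return pool;
        }
    }

Bolts then borrow a connection per execute() and close it (returning it to the pool), rather than holding one open for their whole lifetime.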
Re: Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans
You may need to configure your cluster to give it more time to start up. Additionally, knowing how long it can take to load the Stanford NLP models, make sure you're only doing it in a single bolt instance (e.g. static initializer or double-checked synchronization) and sharing it between all your bolt instances. supervisor.worker.start.timeout.secs 120 supervisor.worker.timeout.secs 60 I'd try tuning your worker start timeout here. Try setting it up to 300s and (again) ensuring your prepare method only initializes expensive resources once, then shares them between instances in the JVM. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Tue, Feb 18, 2014 at 1:45 PM, Eddie Santos easan...@ualberta.ca wrote: Hi all, How do you get bolts that take a ludicrously long time to load (we're talking minutes here) to cooperate with Zookeeper? I may not be understanding my problem properly, but on my test cluster (**not** in local mode!) my bolt keeps getting restarted in the middle of its prepare() method -- which may take up to two minutes to return. The problem seems to be the "Client session timed out", but I'm not knowledgeable enough with Zookeeper to really know how to fix this. Here's a portion of logs from the affected supervisor. The STDIO messages come from a poorly-coded third party library that I have to use. 2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 2747ms for sessionid 0x143a22eb4060078, closing socket connection and attempting reconnect 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702} 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702} 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered. 2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper. 2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper. 2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec]. 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner 2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz 2014-01-17 23:19:28 STDIO [ERROR] ... 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702} 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702} 2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper/192.168.50.3:2181 ^-- This is where the bolt gets restarted in its initialization. Thanks, Eddie
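A sketch of the share-one-copy pattern for the Stanford pipeline seen in the logs (StanfordCoreNLP and its Properties-based constructor are CoreNLP's real API; the holder class is illustrative):

    import edu.stanford.nlp.pipeline.StanfordCoreNLP;
    import java.util.Properties;

    public final class SharedPipeline {
        private static volatile StanfordCoreNLP pipeline;

        public static StanfordCoreNLP get() {
            if (pipeline == null) {
                synchronized (SharedPipeline.class) {  // only the first bolt's prepare() pays the cost
                    if (pipeline == null) {
                        Properties props = new Properties();
                        props.setProperty("annotators", "tokenize, ssplit, pos, lemma, ner");
                        pipeline = new StanfordCoreNLP(props);  // the minutes-long model load runs once per JVM
                    }
                }
            }
            return pipeline;
        }
    }

Each bolt calls SharedPipeline.get() from prepare(); combined with a larger supervisor.worker.start.timeout.secs, the worker should stop being killed mid-initialization.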
Re: http-client version conflict
We've done this with SLF4j and Guava as well without issues. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene m...@evertrue.com wrote: We had this problem as well. We modified our chef cookbook to just replace the older version with the newer one and storm didn't complain or have any other issues as a result. On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz ptgo...@gmail.com wrote: Your best bet is probably to use the shade plugin to relocate the http-client package so it doesn't conflict with the version storm uses. Storm does this with the libthrift dependency in storm-core: https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220 (You can ignore the clojure transformer in that config, unless you have non-AOT clojure code that uses the http-client library). More information on using the shade plugin to do package relocations can be found here: http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html - Taylor On Feb 4, 2014, at 4:27 PM, Vinay Pothnis vinay.poth...@gmail.com wrote: Hello, I am using storm version 0.9.0.1. My application depends on apache http-client version 4.3.2 - but storm depends on http-client version 4.1.1. What is the best way to override this dependency? Thanks Vinay
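A sketch of the relocation Taylor describes, as maven-shade-plugin configuration; the shaded prefix is arbitrary:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <relocations>
              <relocation>
                <!-- move your http-client 4.3.2 classes out of the package Storm's 4.1.1 occupies -->
                <pattern>org.apache.http</pattern>
                <shadedPattern>shaded.org.apache.http</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>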
Re: ZeroMQ Exception causes Topology to get killed
What version of ZeroMQ are you running? You should be running 2.1.7 with Nathan's provided fork of JZMQ. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Fri, Jan 10, 2014 at 9:09 PM, Gaurav Sehgal gsehg...@gmail.com wrote: Hi, I am getting the following exception in the cluster. These exceptions happen in the worker log, but they eventually cause the topology to die. Can anyone please share some inputs? java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.destroy()V at org.zeromq.ZMQ$Socket.destroy(Native Method) at org.zeromq.ZMQ$Socket.close(ZMQ.java:432) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) at backtype.storm.messaging.zmq.ZMQConnection.close(zmq.clj:45) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) at backtype.storm.daemon.worker$mk_refresh_connections$this__4293.invoke(worker.clj:252) at backtype.storm.daemon.worker$mk_refresh_connections$this__4293.invoke(worker.clj:218) at backtype.storm.timer$schedule_recurring$this__1776.invoke(timer.clj:69) at backtype.storm.timer$mk_timer$fn__1759$fn__1760.invoke(timer.clj:33) at backtype.storm.timer$mk_timer$fn__1759.invoke(timer.clj:26) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:662) Cheers! Gaurav
Re: Workers elasticity
Each machine can support a configurable number of workers. If a machine goes away, it'll attempt to reassign the orphaned workers to other machines using the fair scheduler. 1) Should you not have enough slots (worker slots), your topology will be in a 'broken' state. However, if you allow 4 workers per machine and run your topology with 8 workers, you can run 2 minimum machines and still support the topology. If you are to auto-scale your workers, you'll need a long cooldown time between changes. A rebalance isn't instant and must allow the topology to drain before reshuffling workers. A rebalance will wait {TUPLE_TIMEOUT_TIME} before rebalancing. Additionally, when adding workers you'll need to trigger a rebalance. 2) If workers > slots, the topology will attempt to function but ultimately freeze as the send buffers to that worker fill. I'm not sure what you mean by 'round-robin fashion to distribute load' -- the ShuffleGrouping will partition work across tasks on an even basis. 3) Yes, in storm.yaml, supervisor.slots.ports. By default it'll run with 4 slots per machine. See https://github.com/nathanmarz/storm/blob/master/conf/defaults.yaml#L77 Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Mon, Jan 6, 2014 at 11:08 AM, Spico Florin spicoflo...@gmail.com wrote: Hello! I'm a newbie to storm and also to Amazon Cloud. I have the following scenario: 1. I have a topology that runs on 3 workers on EC2. 2. Due to the increasing load, EC2 instantiates 2 new instances and I have to rebalance to 5 workers. 3. After the resource demand, EC2 released 2 instances and I'm forgetting to decrease the number of workers to 3. Questions: 1. So, in this case what is the behavior of the application? Will it signal an error that there are more workers allocated than existing machines? Or will it continue to run as if nothing has happened? 2. More generally, what is the behavior of an application that declares more workers than the number of instances? Do we have a round robin fashion to distribute load among the workers? 3. Can I declare more workers on the same machine? If yes, how? I look forward to your answers. Regards, Florin
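For question 3, a storm.yaml sketch; these four ports are the defaults from defaults.yaml, and each listed port is one worker slot on that machine:

    supervisor.slots.ports:     # one worker slot per listed port
        - 6700
        - 6701
        - 6702
        - 6703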
Re: storm over WAN and NAT
Generally speaking, I don't know of many services that work exceedingly well over a WAN. Can you not do processing at each location and forward it on with a queue that isn't averse to WAN links? On Dec 29, 2013 10:03 AM, Derrick Karimi derrick.kar...@gmail.com wrote: Hello, I have a requirement for real time data processing where data sources are spread over a wide geographic area on multiple networks. Are there any known previous deployments, or any foreseen issues with deploying a storm cluster where machines are behind firewalls/using NAT, and spread out such that latency issues and network reliability issues will be noticeable? -- --Derrick
Re: Guaranteeing message processing on storm fails
What spout are you using? Guarantees must be enforced by the spout. For instance, the RabbitMQ spout doesn't ack an AMQP message until ack() is called for that message tag (or rejects if fail() is called). The spout must pass along a unique message identifier to enable this behavior. If a tuple goes missing somewhere along the line, fail() will be called after a timeout. If you kill the acker that tuple was tracked with, it's then up to the message queue or other impl to be able to replay that message. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Sun, Dec 29, 2013 at 11:39 PM, Michal Singer mic...@leadspace.com wrote: But what I see is that some of the tuples are not replayed. I print out all the tuples and some don't arrive when I kill a worker process. Thanks, Michal *From:* Michael Rose [mailto:mich...@fullcontact.com] *Sent:* Sunday, December 29, 2013 7:26 PM *To:* user@storm.incubator.apache.org *Subject:* Re: Guaranteeing message processing on storm fails You are not guaranteed that tuples make it, only that if they go missing or processing fails they will be replayed from the spout, for at-least-once execution. On Dec 29, 2013 4:47 AM, Michal Singer mic...@leadspace.com wrote: Hi, I am trying to test the guaranteeing of messages on storm: I have two nodes: 1. Node-A contains: supervisor, ui, zookeeper, nimbus 2. Node-B contains: supervisor I have 1 spout and 2 bolts (I am actually running the word counter test) I send about 17 messages. The first bolt is on Node-B. The second bolt is divided into 4 executors which are divided between the nodes. I kill the worker on node-B and I expect the worker on the other node to get the data which was supposed to be sent to Node-B. But the worker is raised on node-B and the data is sent there except for one tuple which is missing. 1. According to the UI – it is very difficult to see what is going on because I guess there are messages resent and it is hard to know exactly what happened. 2. According to my output – one message is missing which was supposed to go to node-B, whose worker I killed. I defined anchors and I sent acks. So what am I missing? Why is there data loss? Thanks, Michal
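For reference, a minimal sketch of the anchoring Michal mentions, assuming Storm 0.9.x; the point is that every emit is anchored to the input tuple and every code path ends in ack() or fail():

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.Map;

    public class AnchoredBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            try {
                String word = tuple.getString(0);
                collector.emit(tuple, new Values(word, 1)); // anchored: a downstream failure fails the whole tree
                collector.ack(tuple);                       // this bolt's work is done
            } catch (Exception e) {
                collector.fail(tuple);                      // triggers replay from the spout
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }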
Re: Spring bolts
Make a base spring bolt, in your prepare method inject the members. That's the best I've come up with, as prepare happens server side whereas topology config and static initializers happen at deploy time client side. On Dec 25, 2013 7:51 AM, Michal Singer mic...@leadspace.com wrote: Hi, I am trying to understand how to use beans in spring as bolts/spouts. If I have the definition in spring which is initialized once the bolt or spout is initialized. But when creating a topology I need to do: new Bolt()…. And cannot get it from spring. So what is the right way to do this? Thanks, Michal
RE: Spring bolts
Yes, you'll need a Spring context in prepare. Given you have multiple bolts per JVM, it's worth ensuring only one creates it in prepare and then shares that context. We do this with Guice injectors and double-checked locks. Each bolt uses the singleton injector to inject its members. I imagine Spring has a similar concept once you have a context. The life cycle of bolts is quite strange in Storm, given they're made before deployment and serialized. There are quite a few gotchas. Bolt constructors can't be trusted, thus prepare. There may be a spring storm example out there somewhere. Merry Christmas! On Dec 25, 2013 8:17 AM, Michal Singer mic...@leadspace.com wrote: I am not sure I understand. Spring beans are defined in the spring configuration files. How can I inject them in the members? What I thought to do is that the bolts will not be spring beans, and in the prepare method I will initialize the spring context. This way, the bolts will call other spring beans which are not bolts and are initialized in spring. But of course this is a very limited solution. *From:* Michael Rose [mailto:mich...@fullcontact.com] *Sent:* Wednesday, December 25, 2013 5:06 PM *To:* user@storm.incubator.apache.org *Subject:* Re: Spring bolts Make a base spring bolt, in your prepare method inject the members. That's the best I've come up with, as prepare happens server side whereas topology config and static initializers happen at deploy time client side. On Dec 25, 2013 7:51 AM, Michal Singer mic...@leadspace.com wrote: Hi, I am trying to understand how to use beans in spring as bolts/spouts. If I have the definition in spring which is initialized once the bolt or spout is initialized. But when creating a topology I need to do: new Bolt()…. And cannot get it from spring. So what is the right way to do this? Thanks, Michal
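A sketch of the pattern Michael describes, translated to Spring (ClassPathXmlApplicationContext and getBean are real Spring API; the context file name and MessageService bean are illustrative):

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public final class SpringHolder {
        private static volatile ApplicationContext context;

        public static ApplicationContext get() {
            if (context == null) {
                synchronized (SpringHolder.class) {   // double-checked: one context per worker JVM
                    if (context == null) {
                        context = new ClassPathXmlApplicationContext("applicationContext.xml");
                    }
                }
            }
            return context;
        }
    }

A bolt's prepare() then does this.service = SpringHolder.get().getBean(MessageService.class); the constructor stays empty because it runs client-side at submit time and the instance is serialized.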
Re: Emitted ≠ Acked + Failed
The statistics are sampled, but in general should be within +/- 20 tuples of where they should be. Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Mon, Dec 23, 2013 at 9:53 PM, churly lin chury...@gmail.com wrote: Is it possible that this is because the statistics storm-ui provides are not very precise? 2013/12/23 churly lin chury...@gmail.com Hi all: I have a question about a running topology's statistics. I found that, for a Spout, the number the spout emitted is unequal to the number the spout acked plus the number the spout failed. Actually, the emitted count is lower than acked plus failed. Is this normal? If so, why are there some emitted tuples that are neither acked nor failed? Thanks.
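The sampling Michael refers to is controlled by topology.stats.sample.rate (0.05 by default in Storm 0.9.x, i.e. 1 tuple in 20 is counted and multiplied back up). A sketch of trading some CPU for exact counts:

    import backtype.storm.Config;

    public class ExactStats {
        public static Config conf() {
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1.0d);  // count every tuple; UI numbers become exact
            return conf;
        }
    }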
Re: Example of bolt emitting more than one stream
https://gist.github.com/Xorlev/8058947 This is the...gist...of it. :) Hope this helps! Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com On Fri, Dec 20, 2013 at 10:36 AM, Pete Carlson p...@tetraconcepts.com wrote: I read on the Storm concepts wiki (i.e., https://github.com/nathanmarz/storm/wiki/Concepts) that a bolt can emit more than one stream. Can anyone point me to an example that declares multiple streams using the declareStream method of OutputFieldsDeclarer, and specifies which stream to emit with the emit method on OutputCollector? Thanks, Pete
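In the spirit of the gist above, a minimal sketch of a bolt with two declared streams (stream and field names are illustrative; Storm 0.9.x API):

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.Map;

    public class SplittingBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            int value = tuple.getInteger(0);
            if (value % 2 == 0) {
                collector.emit("evens", tuple, new Values(value)); // first argument selects the stream
            } else {
                collector.emit("odds", tuple, new Values(value));
            }
            collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("evens", new Fields("value"));
            declarer.declareStream("odds", new Fields("value"));
        }
    }

Downstream consumers subscribe per stream, e.g. builder.setBolt("evens-handler", new EvenBolt()).shuffleGrouping("splitter", "evens").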