Re: Implications of no ackers -- and queue explosion

2013-11-10 Thread Michael Rose
As far as we can tell, if you run without ackers, the Linux OOM killer is
what eventually prevents Storm from flooding the topology anymore. ZMQ will
happily accept tuples from upstream and buffer them off-heap until the Java
component can drain the tuples (which may never happen). Same applies for
any kind of topology WITH ackers but with too much geometric tuple growth.

It's true that disabling ackers will improve performance, but at that point
it's kind of just up to you ensuring that downstream components are fast
enough to keep up with upstream traffic. We aren't trying to send 300k
tuple/s/node, so we find the overhead of ackers to be a more than
acceptable cost.

Michael

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Nov 10, 2013 at 12:15 PM, Philip O'Toole  wrote:

> This is a question about Storm 0.82.
>
> What are the implications of running without any acking? I realise that
> reliability is then not guaranteed, but without ackers I understand that
> the "topology.max.spout.pending" doesn't apply. But the Storm docs state
> that it is highly-desirable to set this value to prevent "queue explosion".
>
>
> https://github.com/nathanmarz/storm/wiki/Running-topologies-on-a-production-cluster
>
> Yet the docs also say that by disabling acking, one can improve
> performance.
>
> https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
>
> So, is running without ackers something to be considered for production
> systems? And if so, what throttles the Storm system from flooding the
> topology with too many tuples (which I presumes eats up memory, heap,
> whatever)?
>
> Thanks,
>
> Philip
>


Re: Future of Logback?

2013-11-11 Thread Michael Rose
I believe when Jon says "log4j" he refers to log4j2. Log4j2 is yet another
successor to log4j, which claims to solve issues in logback. I wasn't able
to discern a difference without log4j's usage of Disruptor (3.x).

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Nov 11, 2013 at 2:41 PM, Patricio Echagüe wrote:

> Logback if I'm not wrong was created by the creators of log4j and it's
> more efficient and modern. What would be the rationale to switch to log4j?
>
> Sent from my Nexus 4.
> On Nov 11, 2013 6:14 PM, "P. Taylor Goetz"  wrote:
>
>> No, and I doubt there would be any pressure to switch to log4j. If you
>> look at the logback website[1] there are many Apache projects using it.
>>
>> [1] http://logback.qos.ch
>>
>>
>> On Nov 11, 2013, at 4:09 PM, Jon  wrote:
>>
>> > With the switch of Storm to Apache, are there any plans to move away
>> from Logback?...mainly pressure from Apache to switch to Apache Log4j2?
>> Just curious as this sort of pressures our application to use one logger or
>> another for non-Storm components.
>>
>>


Re: Question on storm topology, bolts with multiple threads

2013-11-26 Thread Michael Rose
Why 'strongly advise against'? We've found a lot of good reasons for
running a thread pool, especially when attempting to control latency and
queue delay.

For us, it really depends on the variability in latency of the web request.
If it's roughly even, it's worth just using more bolt instances. If your
IOs can be a little more variable, we've found it better to have a pool per
bolt and run less bolts. This way, an IO that's 3x longer won't slow down
other tuples. Of course, this is predicated on not saturating your
downstream service and a desire to more evenly spread load.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Nov 26, 2013 at 4:22 PM, Randy Buck  wrote:

> I would strongly advise against running a threadpool within a storm task
> (bolt or spout).  It would be much simpler to add more bolts or spouts to
> the topology.  See this page for more details on parallelism within a storm
> topology:
> https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology
> .
>
>
> --
> Randy Buck
>
>
> On Tue, Nov 26, 2013 at 2:05 PM, Mike Chen  wrote:
>
>> Hi,
>>
>> I am considering using storm for a finance app I'm writing. I have a
>> queue of stock tickers (~10,000) as a spout, and I would like to process
>> them in parallel (process first involves a web request, then subsequent
>> tasks can be executed on bolts down the pipeline). This question is mainly
>> for the first web request that has to happen for each ticker. Is it better
>> to
>>
>> - have a single bolt, that has its own thread pool, that processes
>> tickers asynchronously (I'm guessing this is not the case)
>> - have n bolts, assign 10,000 / n tickers to each bolt (evenly) and have
>> a threadpool of size 10,000 / n on each bolt so that worst case, it can
>> process all those tickers asynchronously?
>> - something else I am not seeing?
>>
>> If its the middle choice, how do you find the appropriate number for n?
>>
>> Thanks,
>> -Mike
>>
>
>


Re: Metric_Storm

2013-12-13 Thread Michael Rose
You add it as a task hook, e.x.

Scala:
config.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
List(classOf[MetricsStormHooks].getName).asJava)

Java:
List> taskHooks = new ArrayList<>();
taskHooks.add(MetricsStormHooks.class.getName());
config.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, taskHooks);

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Dec 13, 2013 at 8:40 AM, Patricio Echagüe wrote:

> It seems to be that you just instantiate MetricsStorm and call
> initiateWebConsole(port).
>
> I also posted a while ago something similar.
> http://patricioechague.blogspot.com/2013/10/different-strategies-to-monitor-storm.html
>
> Sent from my Nexus 4.
> On Dec 13, 2013 3:33 AM, "Cheng Xuntao"  wrote:
>
>> Hi, all,
>>
>> I am a starter. I want to test Storm using the metrics (
>> https://github.com/ooyala/metrics_storm). But I found no guide which I
>> do need! How to integrate the metrics into Storm or does the latest Storm
>> already included it? My question should be silly but I do appreciate your
>> help!!
>>
>> Thanks!!
>>
>> Best Regards,
>> Cheng Xuntao
>>
>>


Re: Proper way to ACK in a chain of bolts

2013-12-16 Thread Michael Rose
Hi Adrian,

The way everything works, you don't need to worry about acking the spout
tuple in bolt 2, as bolt 1 acks that AFTER bolt 1 emits anchored to bolt 2.
Anchoring (emit(tuple, new Values(...)) tells the acker task to predicate
the success of the spout tuple on the results of bolt 2. Acking sends an
update to the acker task.

Storm generates a "tuple tree" if you will as you emit from bolt to bolt,
so until you've acked the entire "tree" (in your case, bolt 2), it won't
ack the spout tuple. That's the brilliance of Storm, each bolt only needs
to worry about emitting tuples and acking if its particular function was
successful.

I strongly suggest you read up on
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processingand
how this actually works. It also gets into advanced topics such as
streaming joins where N tuples become 1 tuple and how that works under the
anchoring scheme.

Your spout also needs a unique ID attached to each spout tuple to implement
ack/fail semantics. Generally this is a message ID of some sort. Anything
that can be used to reference a particular message or offset.

To update your example:



Spout (IRichSpout / BaseRichSpout):

val someVal = queue.dequeue()

 _collector.emit(new Values(someVal), uniqueIdGenerator(someVal))   >
acker knows that bolt 1 must ack to complete the tree

  ^ You need a
unique ID (this can be a message ID from your queue) for reliability to be
enabled



Bolt1 (IRichBolt / BaseRichBolt):

def execute(tuple: Tuple) {

_collector.emit(tuple, new Values("stuff")) ---> acker knows that bolt
1 and bolt 2 must ack to complete

_collector.ack(tuple) > acker knows bolt 1 completed, bolt 2 outstanding

}



Bolt2 (IRichBolt / BaseRichBolt):

def execute(tuple2: Tuple) {

_collector.emit(tuple2, new Values("foo")) ---> If this is the end, you
don't need this. Otherwise, acker knows that boltN must ack to complete

_collector.ack(tuple2) --> acker knows bolt 2 completed, boltN still
outstanding

}




Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Dec 16, 2013 at 3:43 PM, Adrian Mocanu wrote:

>  Hi
>
> Just want to make sure I got how Ack-ing works in Storm.
>
> I have 1 spout and 2 bolts chained together. Spout emits tuple to Bolt1
> which in turn will emit a tuple to Bolt 2. I want Bolt 2 to ack the initial
> tuple sent from Spout and I'm not sure how.
>
>
>
> In order to guarantee fault tolerance (ie: tuples are resent) I want to
> ack in bolt 2 the tuple emitted by Spout just in case it fails somewhere in
> the process so it can be resent.
>
>
>
> Consider this example:
>
>
>
> Spout:
>
>  _collector.emit(new Values(queue.dequeue())
>
>
>
> Bolt1:
>
> def execute(tuple: Tuple) {
>
> _collector.emit(tuple, new Values("stuff"))
>
> }
>
>
>
> At this point tuple is the tuple sent by the spout. I can ack it here w no
> probs. Now add another bolt which listens in on tuples emitted by Bolt1.
>
>
>
> Bolt2:
>
> def execute(tuple2: Tuple) {
>
> _collector.emit(tuple2, new Values("foo"))
>
> }
>
>
>
> At this point the tuple in tuple2 is the tuple sent from Bolt1 (the one
> that has string "stuff" in it).
>
> So if I send an ack in Bolt2 this will ack the tuple from Bolt1 not the
> one sent from Spout. Correct?
>
>
>
> How can I ack the tuple that was sent from the spout? Should I piggy back
> the initial spout on all the other spouts so I can retrieve it in the last
> Bolt and ack it?
>
>
>
> I read Nathan's tutorials and I got the impression that I could ack the
> tuple received in Bolt1 (from Spout) right there after emitting tuple2.
> This would link the newly emitted tuple2 to the original tuple sent by
> Spout so when Bolt2 acks tuple 2 it actually acks the original tuple from
> the Spout. Is this true?
>
>
>
> Let me know if I'm missing something in my explanation.
>
> Thanks a lot!
>
>
>
>
>


Re: Bigger or Smaller Workers?

2013-12-18 Thread Michael Rose
It really comes down to your use case, perhaps you can comment on what
you're doing.

Personally, we run smaller workers and more of them. Mainly because having
more JVMs helps us avoid internal contention on strangely-locked JVM
internals. We have on average 2-3 workers per machine with moderately sized
heaps.

I can't say I'd think the overhead is too much more to have extra workers
if you're doing shuffles or fields grouping most of the time anyways.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Dec 18, 2013 at 8:54 PM, Jon Logan  wrote:

> Has anyone done much experimenting on optimal worker sizes? I'm basically
> unsure if it is better to run with more, smaller workers, or fewer, larger
> workers. Right now, I'm using ~3GB workers, and around 5 or so per machine.
> Would it be better to reduce this number?
>
> The main issues that come to mind are
>
> If larger workers
> - if one crashes, more data is lost
> - more GC issues for larger heap sizes
>
> if smaller workers
> - more overhead
> - more threads used
> - less local shuffling capability
> - more load on ZK/nimbus(?)
>
>
> Thoughts?
>


Re: Example of bolt emitting more than one stream

2013-12-20 Thread Michael Rose
https://gist.github.com/Xorlev/8058947

This is the...gist...of it. :) Hope this helps!

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Dec 20, 2013 at 10:36 AM, Pete Carlson wrote:

>
> I read on the Storm concepts wiki i.e.,
> https://github.com/nathanmarz/storm/wiki/Concepts
>
> that a bolt can emit more than one stream.
>
> Can anyone point me to an example that declares multiple streams using the
> declareStream method of OutputFieldsDeclarer, and specifies which stream to
> emit with the emit method on OutputCollector?
>
> Thanks,
>
> Pete
>
>


RE: Tick Tuple

2013-12-23 Thread Michael Rose
The issue is you've multiplied ticktuplems by 1000 vs dividing. So you're
probably not waiting long enough ;)

If you need an alternate spout, see storm contrib. There's a halfway decent
one.
On Dec 23, 2013 7:58 AM, "Adrian Mocanu"  wrote:

>  If anyone is interested, I’ve decided to make my own Tick Spout.
>
>
>
> *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com]
> *Sent:* December-20-13 4:39 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Tick Tuple
>
>
>
> Hi everybody,
>
> I’m trying to have sent tick tuples to a bolt I have.
>
>
>
> In the bolt I have these methods
>
> def execute(tuple: Tuple){
>
> isTickTuple(tuple) match {
>
>   case true =>
>
> println("TICK TUPLE FOUND")
>
>   case false =>
>
> println("regular TUPLE FOUND")
>
> }
>
> }
>
>
>
>   override def getComponentConfiguration(): java.util.Map[String,AnyRef] =
> {
>
> val conf = new Config()
>
> val tickFrequencyInSeconds = sendPeriodMs * 1000
>
> conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
> Long.box(tickFrequencyInSeconds))
>
> conf
>
> }
>
>
>
> def isTickTuple(tuple:Tuple):Boolean = {
>
> tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
>
>   tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)
>
> }
>
>
>
> The code never prints “TICK TUPLE FOUND” but in the log I do see that a
> tick tuple was received
>
>   executor [INFO] Processing received message source: __system:-1, stream:
> __tick, id: {}, [30]
>
>
>
> Why can’t I find these tick tuples when running, but they appear in the
> log? Do tick tuples  have some special logic which makes them transparent
> to the bolt?
>
> I use storm 0.8.X
>
>
>
> -Adrian
>
>
>


Re: Emitted ≠ Acked + Failed

2013-12-23 Thread Michael Rose
The statistics are sampled, but in general should be +/- 20 tuples of where
they should be.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Dec 23, 2013 at 9:53 PM, churly lin  wrote:

> Is it possible that because the statistics storm-ui provided is not very
> precise?
>
>
> 2013/12/23 churly lin 
>
>> Hi all:
>> I have a question about the running topology's statistics.
>> I found that, for Spout, the number spout emitted is unequal to the
>> number spout acked plus the number spout failed.Actually, the emitted is
>> lower than acked plus failed.
>> Is this normal? If so, why there is some emitted tuples that neither
>> being acked nor being faild?
>> Thanks.
>>
>
>


Re: Spring bolts

2013-12-25 Thread Michael Rose
Make a base spring bolt, in your prepare method inject the members. That's
the best I've come up with, as prepare happens server side whereas topology
config and static initializers happen at deploy time client side.
On Dec 25, 2013 7:51 AM, "Michal Singer"  wrote:

> Hi, I am trying to understand how to use beans in spring as bolts/spouts.
>
> If I have the definition in spring which is initialized once the bolt or
> spout is initialized.
>
> But when creating a topology I need to do: new Bolt()….
>
> And cannot get it from spring.
>
> So what is the right way to do this?
>
>
>
> Thanks, Michal
>


RE: Spring bolts

2013-12-25 Thread Michael Rose
Yes, you'll need a Spring context in prepare. Given you have multiple bolts
per JVM, its worth ensuring only one creates it in prepare then shares that
context.

We do this with Guice injectors and double checked locks.

Each bolt uses the singleton injector to inject its members. I imagine
Spring has a similar concept once you have a context.

Life cycle of bolts is quite strange in Storm given they're made before
deployment and serialized. There's quite a few gotchas. Bolt constructors
can't be trusted, thus prepare.

There may be a spring storm example out there somewhere.

Merry Christmas!
On Dec 25, 2013 8:17 AM, "Michal Singer"  wrote:

> I am not sure I understand.
>
> Spring beans are defined in the spring configuration files. How can I
> inject them in the members.
>
> What I thought to do is that the bolts will not be spring beans and in the
> prepare method I will initialize the spring context.
>
> This way, the bolts will call other spring beans which are not bolts and
> initialized in spring. But of course this is a very limited solution.
>
>
>
>
>
> *From:* Michael Rose [mailto:mich...@fullcontact.com]
> *Sent:* Wednesday, December 25, 2013 5:06 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Spring bolts
>
>
>
> Make a base spring bolt, in your prepare method inject the members. That's
> the best I've come up with, as prepare happens server side whereas topology
> config and static initializers happen at deploy time client side.
>
> On Dec 25, 2013 7:51 AM, "Michal Singer"  wrote:
>
> Hi, I am trying to understand how to use beans in spring as bolts/spouts.
>
> If I have the definition in spring which is initialized once the bolt or
> spout is initialized.
>
> But when creating a topology I need to do: new Bolt()….
>
> And cannot get it from spring.
>
> So what is the right way to do this?
>
>
>
> Thanks, Michal
>


Re: How to verify the statue of running topology / Storm+Graphite not working / Logs attached

2013-12-28 Thread Michael Rose
Check firewall settings and hosts.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Dec 27, 2013 at 6:51 AM, 程训焘  wrote:

> Hi, all,
>
> I am having a problem about verifying the storm's running statue and let
> it work properly with my Graphite.
>
> Previously, my zeromq and jzmq installed have a word mismatch problem and
> then I upgraded my JVM to the 64bit version. No mismatch errors any more.
> But, now, although the debug is enable, the worker logs only have
> information about the zookeeper and has no info about the running topology
> which is the storm.starter.WordCountTopology.
>
> And the topology reports metrics normally to Carbon-aggregator in local
> mode but the Carbon can receive nothing when the topology runs on the
> cluster. So my Graphite shows no data.
>
> What are possible directions shall I consider? What are the possible
> problems???
>
> I attached my logs and configs. Any help will be appreciated!! Thanks in
> advance!!!
>
> Regards,
> Cheng Xuntao
>


Re: Guaranteeing message processing on strom fails

2013-12-29 Thread Michael Rose
You are not guaranteed that tuples make it, only that if they go missing or
processing fails it will replayed from the spout "at least once execution"
On Dec 29, 2013 4:47 AM, "Michal Singer"  wrote:

> Hi, I am trying to test the guaranteeing of messages on storm:
>
>
>
> I have two nodes:
>
> 1.   Node-A contains: supervisor, ui, zookeeper, nimbus
>
> 2.   Node-B contains: supervisor
>
> I have 1 spout and 2 bolts (I am actually running the word counter test)
>
>
>
> I send about 17 messages.
>
> The first bolt is on Node-B
>
> The second bolt is divided to 4 executors which are divided between the
> nodes.
>
>
>
> I kill the worker on node-B and I expect the worker on the other node to
> get the data which was supposed to be sent to Node-B.
>
> But the worker is raised on node-B and the data is sent there accept for
> one touple which is missing.
>
>
>
> 1.   According to the ui – it is very difficult to see what is going
> on cause I guess there are messages resent and it is hard to know exactly
> what happened.
>
> 2.   According to my output – one message is missing which was
> supposed to go to node-B which I killed it’s worker.
>
>
>
> I defined anchors and I sent acks.
>
>
>
> So what am I missing? Why is there data loss?
>
>
>
> Thanks, Michal
>
>
>


Re: storm over WAN and NAT

2013-12-29 Thread Michael Rose
Generally speaking, I don't know of many services that work exceedingly
well over a WAN.

Can you not do processing at each location and forward it on with a queue
that isn't adverse to WAN links?
On Dec 29, 2013 10:03 AM, "Derrick Karimi"  wrote:

> Hello,
>
> I have a requirement for real time data processing where data sources are
> spread over a wide geographic area on  multiple networks.  Are there any
> known previous deployments, or any forseen issues with deploying a storm
> cluster where machines are behind firewalls/using nat, and spread out such
> that latency issues and network reliability issues will be noticeable?
>
> --
> --Derrick
>


Re: Guaranteeing message processing on strom fails

2013-12-29 Thread Michael Rose
What spout are you using? Guarantees must be enforced by spout. For
instance, the RabbitMQ spout doesn't ack an AMQP message until ack() is
called for that message tag (or rejects if fail() is called).

The spout must pass along a unique message identifier to enable this
behavior.

If a tuple goes missing somewhere along the line, fail() will be called
after a timeout. If you kill the acker that tuple was tracked with, it's
then up to the message queue or other impl to be able to replay that
message.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Dec 29, 2013 at 11:39 PM, Michal Singer wrote:

> But what I see is that some of the tuples are not replayed. I print out
> all the tuples and some don’t arrive when I kill a worker process.
>
> Thanks, Michal
>
>
>
> *From:* Michael Rose [mailto:mich...@fullcontact.com]
> *Sent:* Sunday, December 29, 2013 7:26 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Guaranteeing message processing on strom fails
>
>
>
> You are not guaranteed that tuples make it, only that if they go missing
> or processing fails it will replayed from the spout "at least once
> execution"
>
> On Dec 29, 2013 4:47 AM, "Michal Singer"  wrote:
>
> Hi, I am trying to test the guaranteeing of messages on storm:
>
>
>
> I have two nodes:
>
> 1.   Node-A contains: supervisor, ui, zookeeper, nimbus
>
> 2.   Node-B contains: supervisor
>
> I have 1 spout and 2 bolts (I am actually running the word counter test)
>
>
>
> I send about 17 messages.
>
> The first bolt is on Node-B
>
> The second bolt is divided to 4 executors which are divided between the
> nodes.
>
>
>
> I kill the worker on node-B and I expect the worker on the other node to
> get the data which was supposed to be sent to Node-B.
>
> But the worker is raised on node-B and the data is sent there accept for
> one touple which is missing.
>
>
>
> 1.   According to the ui – it is very difficult to see what is going
> on cause I guess there are messages resent and it is hard to know exactly
> what happened.
>
> 2.   According to my output – one message is missing which was
> supposed to go to node-B which I killed it’s worker.
>
>
>
> I defined anchors and I sent acks.
>
>
>
> So what am I missing? Why is there data loss?
>
>
>
> Thanks, Michal
>
>
>


Re: Workers elasticity

2014-01-06 Thread Michael Rose
Each machine can support a configurable number of workers. If a machine
goes away, it'll attempt to reassign the orphaned workers to other machines
using the fair scheduler.

1) Should you not have enough slots (worker slots), your topology will be a
'broken' state. However if you allow 4 workers per machine and run your
topology with 8 workers, you can run 2 minimum machines and still support
the topology.

If you are to auto-scale your workers, you'll need a long cooldown time
between changes. A rebalance isn't instant and must allow the topology to
drain before reshuffling workers. A rebalance will wait
{TUPLE_TIMEOUT_TIME} before rebalancing. Additionally, when adding workers
you'll need to trigger a rebalance.

2) If workers > slots, the topology will attempt to function but ultimately
freeze as the send buffers to that worker fill. I'm not sure what you mean
'round-robin fashion to distribute load' -- the ShuffleGrouping will
partition work across tasks on an even basis.

3) Yes, in storm.yaml, supervisor.slots.ports. By default it'll run with 4
slots per machine. See
https://github.com/nathanmarz/storm/blob/master/conf/defaults.yaml#L77

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jan 6, 2014 at 11:08 AM, Spico Florin  wrote:

> Hello!
>  I'm newbie to storm and also to Amazon Cloud. I have the following
> scenario:
>
>   1. I have topology that runs on 3 workers on EC2.
>   2. Due to the increasing load, EC2 intantiates 2 new instances and I
> have to rebalance to 5 workers.
>   3. After the resource demand, EC2 released 2 instances and  I'm
> forgetting to decrease the number of workers to 3.
>
> Questions:
> 1.So, in this case what is the behavior of the application? Will sign an
> error that there are more workers allocated then existing machines? Or will
> continue to run as nothing has happened?
>
> 2.More generally, what is the behavior of the application that declares
> more workers than the number of instances? Do we have a round robin fashion
> to distribute load among the workers?
>
> 3.Can I declare more workers on the same machine? If yes how?
>
> I look forward for your answers.
>
> Regards,
>   Florin
>
>


Re: Storm Performance

2014-01-10 Thread Michael Rose
Post your code.  Even Dev mode is far faster for us.
On Jan 10, 2014 8:44 AM, "Klausen Schaefersinho" 
wrote:

> > I've benched storm at 1.8 million tuples per second on a big (24 core)
> box using local or shuffle grouping between a spout and bolt.
> Production or development mode?
>
> > you're only seeing 10 events per second make sure you don't have any
> sleeps
> Yeah I checked  for sleeps etc. and stripped down my code. Now my bolts do
> nothing and the spout just creates random data...
>
>
>
>
> On Fri, Jan 10, 2014 at 4:37 PM, Nathan Leung  wrote:
>
>> I've benched storm at 1.8 million tuples per second on a big (24 core)
>> box using local or shuffle grouping between a spout and bolt. If you're
>> only seeing 10 events per second make sure you don't have any sleeps
>> (whether in your code or elsewhere e.g. a library or triggered due to lack
>> of data in the storm spout). Also note that the default sleep period in the
>> spout when there is no data (in storm 0.9) is I believe 1ms so even if
>> you're hitting this condition you should see much more than 10 events per
>> second.
>>  On Jan 10, 2014 10:19 AM, "Jon Logan"  wrote:
>>
>>> Why would you benchmark local mode? It's intended for debugging and
>>> development purposes, not actual production use...
>>>
>>>
>>> per the website, Storm itself is  benchmarked at > 1 million tuples per
>>> second per node.
>>>
>>>
>>> On Fri, Jan 10, 2014 at 8:40 AM, Klausen Schaefersinho <
>>> klaus.schaef...@gmail.com> wrote:
>>>
 Hi,

 how is the performance of a real cluster compared to a local
 development cluster? I was trying to benchmark Storm now and found  that it
 only manages to consume 10 events per second in a simple topology which
 only consists out of i spout and one bolt where the spout creates random
 objects and the bolt acknowledge the tuple.

 10 events terrible slow, and I am not sure if this is related to the
 develop mode.

 Cheers,

 Klaus

>>>
>>>
>


Re: ZeroMQ Exception causes Topology to get killed

2014-01-10 Thread Michael Rose
What version of ZeroMQ are you running?

You should be running 2.1.7 with nathan's provided fork of JZMQ.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jan 10, 2014 at 9:09 PM, Gaurav Sehgal  wrote:

> Hi,
>  I am getting the following exception in the cluster. These exceptions
> happen in the worker log; but they eventually cause to topology to die. Can
> anyone please share some inputs.
>
>
> java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.destroy()V
> at org.zeromq.ZMQ$Socket.destroy(Native Method)
> at org.zeromq.ZMQ$Socket.close(ZMQ.java:432)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298)
> at backtype.storm.messaging.zmq.ZMQConnection.close(zmq.clj:45)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298)
> at
> backtype.storm.daemon.worker$mk_refresh_connections$this__4293.invoke(worker.clj:252)
> at
> backtype.storm.daemon.worker$mk_refresh_connections$this__4293.invoke(worker.clj:218)
> at
> backtype.storm.timer$schedule_recurring$this__1776.invoke(timer.clj:69)
> at backtype.storm.timer$mk_timer$fn__1759$fn__1760.invoke(timer.clj:33)
> at backtype.storm.timer$mk_timer$fn__1759.invoke(timer.clj:26)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:662)
>
>
> Cheers!
> Gaurav
>
>
>


Re: http-client version conflict

2014-02-06 Thread Michael Rose
We've done this with SLF4j and Guava as well without issues.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene  wrote:

> We had this problem as well. We modified our chef cookbook to just replace
> the older version with the newer one and storm didn't complain or have any
> other issues as a result.
>
>
> On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz wrote:
>
>> Your best bet is probably  to use the shade plugin to relocate the
>> http-client package so it doesn't conflict with the version storm uses.
>>
>> Storm does this with the libtrhift dependency in storm-core:
>>
>>
>> https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220
>>
>> (You can ignore the clojure transformer in that config, unless you have
>> non-AOT clojure code that uses the http-client library).
>>
>> More information on using the shade plugin to do package relocations can
>> be found here:
>>
>>
>> http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
>>
>> - Taylor
>>
>> On Feb 4, 2014, at 4:27 PM, Vinay Pothnis 
>> wrote:
>>
>> > Hello,
>> >
>> > I am using storm version 0.9.0.1.
>> > My application depends on apache http-client version 4.3.2 - but storm
>> depends on http-client version 4.1.1.
>> >
>> > What is the best way to override this dependency?
>> >
>> > Thanks
>> > Vinay
>>
>>
>


Re: Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans

2014-02-18 Thread Michael Rose
You may need to configure your cluster to give it more time to start up.
Additionally, knowing how long it can take to load the Stanford NLP models,
make sure you're only doing it in a single bolt instance (e.g. static
initializer or double-check synch) and sharing it between all your bolt
instances.

supervisor.worker.start.timeout.secs 120
supervisor.worker.timeout.secs 60

I'd try tuning your worker start timeout here. Try setting it up to 300s
and (again) ensuring your prepare method only initializes expensive
resources once, then shares them between instances in the JVM.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Feb 18, 2014 at 1:45 PM, Eddie Santos  wrote:

> Hi all,
>
> How do you get bolts that take a ludicrously long time to load (we're
> talking minutes here) to cooperate with Zookeeper?
>
> I may not be understanding my problem properly, but on my test cluster
> (**not** in local mode!) my bolt keeps getting restarted in the middle of
> its prepare() method -- which may take up to two minutes to return.
>
> The problem seems to be the " Client session timed out", but I'm not
> knowledgable enough with Zookeeper to really know how to fix this.
>
> Here's a portion of logs from the supervisor affected. The STDIO messages
> come from a poorly-coded third party library that I have to use.
>
> 2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out,
> have not heard from server in 2747ms for sessionid 0x143a22eb4060078,
> closing socket connection and attempting reconnect
> 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
> #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
> :storm-id "nlptools-test-1-139740", :executors #{[3 3] [6 6] [-1 -1]},
> :port 6702}
> 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
> #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
> :storm-id "nlptools-test-1-139740", :executors #{[3 3] [6 6] [-1 -1]},
> :port 6702}
> 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State
> change: SUSPENDED
> 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are
> no ConnectionStateListeners registered.
> 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
> :disconnected::none: with disconnected Zookeeper.
> 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
> :disconnected::none: with disconnected Zookeeper.
> 2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
> 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
> 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
> 2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from
> edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz
> 2014-01-17 23:19:28 STDIO [ERROR] ...
> 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
> #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
> :storm-id "nlptools-test-1-139740", :executors #{[3 3] [6 6] [-1 -1]},
> :port 6702}
> 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
> #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
> :storm-id "nlptools-test-1-139740", :executors #{[3 3] [6 6] [-1 -1]},
> :port 6702}
> 2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection
> to server zookeeper/192.168.50.3:2181
>
>   ^-- This is where the bolt gets restarted in its initialization.
>
> Thanks,
> Eddie
>


Re: JDBC Connections

2014-02-20 Thread Michael Rose
We generally instantiate a pool per JVM, where the maxActive is (# of bolts
using JDBC/num workers+1) (e.g. 64 bolts, 4 workers, 17 conns/JVM).

Pooling is our preferred strategy as the pool will shrink (and thus use
less memory on the corresponding SQL server) if it's not being utilized. In
either case, t's pushing the management, verification, and reestablishment
of broken connections into the pool (which is also why we have 1 extra conn
-- for when a conn is tied up running a validation query or is being
reestablished).

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Feb 20, 2014 at 8:28 AM, Richards Peter wrote:

> Hi Pablo,
>
> Your approach is fine. Alternative approach proposed by Brian can be used
> if you can group the relevant bolt instance/es, which are communicating to
> the database, into the appropriate worker process/es. However with the
> alternative approach you shouldn't end up creating separate pools in all
> the workers used by the topology.
>
> Hope this helps.
>
> Richards Peter.
>
>
>
>
> On Thu, Feb 20, 2014 at 8:33 PM, Pablo Acuña  wrote:
>
>> I keep one connection per bolt and for now it works just fine with many
>> bolts. I would also be interested in hearing from someone else and share
>> experiences.
>>
>> For now, I open the connection in the method prepare (and close it in
>> cleanup), but to be completely honest, I'm not 100% sure if this is the
>> best approach.
>>
>> cheers,
>> Pablo.
>>
>>
>> On Wed, Feb 19, 2014 at 4:41 PM, Brian O'Neill wrote:
>>
>>> Yes.  We use JDBI to access MySQL.
>>>
>>> We've had success with a shared connection pool. (one per JVM)
>>> The bolts in the JVM share the pool.
>>>
>>> But either approach should work (pool per bolt, or connection per bolt).
>>> It just depends at what level you want to manage your connections.
>>>
>>> We do it at the worker level.  (n connections per worker)
>>>
>>> -brian
>>>
>>> ---
>>>
>>> Brian O'Neill
>>>
>>> Chief Technology Officer
>>>
>>>
>>> *Health Market Science*
>>>
>>> *The Science of Better Results*
>>>
>>> 2700 Horizon Drive * King of Prussia, PA * 19406
>>>
>>> M: 215.588.6024 * @boneill42 <http://www.twitter.com/boneill42>  *
>>>
>>> healthmarketscience.com
>>>
>>>
>>> This information transmitted in this email message is for the intended
>>> recipient only and may contain confidential and/or privileged material. If
>>> you received this email in error and are not the intended recipient, or the
>>> person responsible to deliver it to the intended recipient, please contact
>>> the sender at the email above and delete this email and any attachments and
>>> destroy any copies thereof. Any review, retransmission, dissemination,
>>> copying or other use of, or taking any action in reliance upon, this
>>> information by persons or entities other than the intended recipient is
>>> strictly prohibited.
>>>
>>>
>>>
>>>
>>> From: Klausen Schaefersinho 
>>> Reply-To: 
>>> Date: Wednesday, February 19, 2014 at 5:58 AM
>>> To: 
>>> Subject: JDBC Connections
>>>
>>> Hi,
>>>
>>> one of my bolt will need to write to a MySql data base. Does anybody has
>>> some experience with this? What are the best practices? Use an connection
>>> pool? Or keep one connection open per bolt?
>>>
>>> Cheers,
>>>
>>> klaus
>>>
>>
>>
>


Re: Tuning and nimbus at 99%

2014-03-02 Thread Michael Rose
Are you running Zookeeper on the same machine as the Nimbus box?

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak  wrote:

> This is the first step of 4. When I save to db I'm actually saving to a
> queue, (just using db for now).  The 2nd step we index the data and 3rd we
> do aggregation/counts for reporting.  The last is a search that I'm
> planning on using drpc for.  Within step 2 we pipe certain datasets in real
> time to the clients it applies to.  I'd like this and the drpc to be sub 2s
> which should be reasonable.
>
> Your right that I could speed up step 1 by not using trident but our
> requirements seem like a good use case for the other 3 steps.  With many
> results per second batching should effect performance a ton if the batch
> size is small enough.
>
> What would cause nimbus to be at 100% CPU with the topologies killed?
>
> Sent from my iPhone
>
> On Mar 2, 2014, at 5:46 PM, Sean Allen 
> wrote:
>
> Is there a reason you are using trident?
>
> If you don't need to handle the events as a batch, you are probably going
> to get performance w/o it.
>
>
> On Sun, Mar 2, 2014 at 2:23 PM, Sean Solbak  wrote:
>
>> Im writing a fairly basic trident topology as follows:
>>
>> - 4 spouts of events
>> - merges into one stream
>> - serializes the object as an event in a string
>> - saves to db
>>
>> I split the serialization task away from the spout as it was cpu
>> intensive to speed it up.
>>
>> The problem I have is that after 10 minutes there is over 910k tuples
>> emitted/transfered but only 193k records are saved.
>>
>> The overall load of the topology seems fine.
>>
>> - 536.404 ms complete latency at the topolgy level
>> - The highest capacity of any bolt is 0.3 which is the serialization one.
>> - each bolt task has sub 20 ms execute latency and sub 40 ms process
>> latency.
>>
>> So it seems trident has all the records internally, but I need these
>> events as close to realtime as possible.
>>
>> Does anyone have any guidance as to how to increase the throughput?  Is
>> it simply a matter of tweeking max spout pending and the batch size?
>>
>> Im running it on 2 m1-smalls for now.  I dont see the need to upgrade it
>> until the demand on the boxes seems higher.  Although CPU usage on the
>> nimbus box is pinned.  Its at like 99%.  Why would that be?  Its at 99%
>> even when all the topologies are killed.
>>
>> We are currently targeting processing 200 million records per day which
>> seems like it should be quite easy based on what Ive read that other people
>> have achieved.  I realize that hardware should be able to boost this as
>> well but my first goal is to get trident to push the records to the db
>> quicker.
>>
>> Thanks in advance,
>> Sean
>>
>>
>
>
> --
>
> Ce n'est pas une signature
>
>


Re: Netty Errors, chain reaction, topology breaks down

2014-03-02 Thread Michael Rose
Right now we're having slow, off-heap memory leaks, unknown if these are
linked to Netty (yet). When the workers inevitably get OOMed, the topology
will rarely recover gracefully with similar Netty timeouts. Sounds like
we'll be heading back to 0mq.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 5:44 PM, Sean Allen wrote:

> We have the same issue and after attempting a few fixes, we switched back
> to using 0mq for now.
>
>
> On Sun, Mar 2, 2014 at 2:46 PM, Drew Goya  wrote:
>
>> Hey All, I'm running a 0.9.0.1 storm topology in AWS EC2 and I
>> occasionally run into a strange and pretty catastrophic error.  One of my
>> workers is either overloaded or stuck and gets killed and restarted.  This
>> usually works fine but once in a while the whole topology breaks down, all
>> the workers are killed and restarted continually.  Looking through the logs
>> it looks like some netty errors on initialization kill the Async Loop.  The
>> topology is never able to recover, I have to kill it manually and relaunch
>> it.
>>
>> Is this something anyone else has come across?  Any tips? Config settings
>> I could change?
>>
>> This is a pastebin of the errors:  http://pastebin.com/XXZBsEj1
>>
>
>
>
> --
>
> Ce n'est pas une signature
>


Re: Zookeepr on different ports

2014-03-02 Thread Michael Rose
I'd recommend just using one Zookeeper instance if they're on the same
physical host. There's no reason why a development ZK ensemble needs 3
nodes.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 10:15 AM, Arun Sethia  wrote:

> Hi,
>
> We have setup three zookeeper instances on one virtual machine, they
> are running on different ports (2181,2182,2183).
>
> Eventually in production we will have each instance on separate
> virtual machine and we can have same port (2181).
>
> We have seen we can configure multiple zookeeper instance (cluster)
> using storm.zookeeper.servers, and we can use storm.zookeeper.port to
> define a port.
>
> Since we have zookeeper on one machine on different ports
> (2181,2182,2183), but not able to configure different ports using
> storm.zookeeper.port.
>
> Any help will be great for us.
>
> Regards,
> Arun
>


Re: Tuning and nimbus at 99%

2014-03-02 Thread Michael Rose
Can you do a thread dump and pastebin it? It's a nice first step to figure
this out.

I just checked on our Nimbus and while it's on a larger machine, it's using
<1% CPU. Also look in your logs for any clues.


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 6:31 PM, Sean Solbak  wrote:

> No, they are on seperate machines.  Its a 4 machine cluster - 2 workers, 1
> nimbus and 1 zookeeper.
>
> I suppose I could just create a new cluster but Id like to know why this
> is occurring to avoid future production outages.
>
> Thanks,
> S
>
>
>
> On Sun, Mar 2, 2014 at 6:19 PM, Michael Rose wrote:
>
>> Are you running Zookeeper on the same machine as the Nimbus box?
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak  wrote:
>>
>>> This is the first step of 4. When I save to db I'm actually saving to a
>>> queue, (just using db for now).  The 2nd step we index the data and 3rd we
>>> do aggregation/counts for reporting.  The last is a search that I'm
>>> planning on using drpc for.  Within step 2 we pipe certain datasets in real
>>> time to the clients it applies to.  I'd like this and the drpc to be sub 2s
>>> which should be reasonable.
>>>
>>> Your right that I could speed up step 1 by not using trident but our
>>> requirements seem like a good use case for the other 3 steps.  With many
>>> results per second batching should effect performance a ton if the batch
>>> size is small enough.
>>>
>>> What would cause nimbus to be at 100% CPU with the topologies killed?
>>>
>>> Sent from my iPhone
>>>
>>> On Mar 2, 2014, at 5:46 PM, Sean Allen 
>>> wrote:
>>>
>>> Is there a reason you are using trident?
>>>
>>> If you don't need to handle the events as a batch, you are probably
>>> going to get performance w/o it.
>>>
>>>
>>> On Sun, Mar 2, 2014 at 2:23 PM, Sean Solbak  wrote:
>>>
>>>> Im writing a fairly basic trident topology as follows:
>>>>
>>>> - 4 spouts of events
>>>> - merges into one stream
>>>> - serializes the object as an event in a string
>>>> - saves to db
>>>>
>>>> I split the serialization task away from the spout as it was cpu
>>>> intensive to speed it up.
>>>>
>>>> The problem I have is that after 10 minutes there is over 910k tuples
>>>> emitted/transfered but only 193k records are saved.
>>>>
>>>> The overall load of the topology seems fine.
>>>>
>>>> - 536.404 ms complete latency at the topolgy level
>>>> - The highest capacity of any bolt is 0.3 which is the serialization
>>>> one.
>>>> - each bolt task has sub 20 ms execute latency and sub 40 ms process
>>>> latency.
>>>>
>>>> So it seems trident has all the records internally, but I need these
>>>> events as close to realtime as possible.
>>>>
>>>> Does anyone have any guidance as to how to increase the throughput?  Is
>>>> it simply a matter of tweeking max spout pending and the batch size?
>>>>
>>>> Im running it on 2 m1-smalls for now.  I dont see the need to upgrade
>>>> it until the demand on the boxes seems higher.  Although CPU usage on the
>>>> nimbus box is pinned.  Its at like 99%.  Why would that be?  Its at 99%
>>>> even when all the topologies are killed.
>>>>
>>>> We are currently targeting processing 200 million records per day which
>>>> seems like it should be quite easy based on what Ive read that other people
>>>> have achieved.  I realize that hardware should be able to boost this as
>>>> well but my first goal is to get trident to push the records to the db
>>>> quicker.
>>>>
>>>> Thanks in advance,
>>>> Sean
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Ce n'est pas une signature
>>>
>>>
>>
>
>
> --
> Thanks,
>
> Sean Solbak, BsC, MCSD
> Solbak Technologies Inc.
> 780.893.7326 (m)
>


Re: Tuning and nimbus at 99%

2014-03-02 Thread Michael Rose
I'm not seeing too much to substantiate that. What size heap are you
running, and is it near filled? Perhaps attach VisualVM and check for GC
activity.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 6:54 PM, Sean Solbak  wrote:

> Here it is.  Appears to be some kind of race condition.
>
> http://pastebin.com/dANT8SQR
>
>
> On Sun, Mar 2, 2014 at 6:42 PM, Michael Rose wrote:
>
>> Can you do a thread dump and pastebin it? It's a nice first step to
>> figure this out.
>>
>> I just checked on our Nimbus and while it's on a larger machine, it's
>> using <1% CPU. Also look in your logs for any clues.
>>
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Sun, Mar 2, 2014 at 6:31 PM, Sean Solbak  wrote:
>>
>>> No, they are on seperate machines.  Its a 4 machine cluster - 2 workers,
>>> 1 nimbus and 1 zookeeper.
>>>
>>> I suppose I could just create a new cluster but Id like to know why this
>>> is occurring to avoid future production outages.
>>>
>>> Thanks,
>>> S
>>>
>>>
>>>
>>> On Sun, Mar 2, 2014 at 6:19 PM, Michael Rose wrote:
>>>
>>>> Are you running Zookeeper on the same machine as the Nimbus box?
>>>>
>>>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>> mich...@fullcontact.com
>>>>
>>>>
>>>> On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak  wrote:
>>>>
>>>>> This is the first step of 4. When I save to db I'm actually saving to
>>>>> a queue, (just using db for now).  The 2nd step we index the data and 3rd
>>>>> we do aggregation/counts for reporting.  The last is a search that I'm
>>>>> planning on using drpc for.  Within step 2 we pipe certain datasets in 
>>>>> real
>>>>> time to the clients it applies to.  I'd like this and the drpc to be sub 
>>>>> 2s
>>>>> which should be reasonable.
>>>>>
>>>>> Your right that I could speed up step 1 by not using trident but our
>>>>> requirements seem like a good use case for the other 3 steps.  With many
>>>>> results per second batching should effect performance a ton if the batch
>>>>> size is small enough.
>>>>>
>>>>> What would cause nimbus to be at 100% CPU with the topologies killed?
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> On Mar 2, 2014, at 5:46 PM, Sean Allen 
>>>>> wrote:
>>>>>
>>>>> Is there a reason you are using trident?
>>>>>
>>>>> If you don't need to handle the events as a batch, you are probably
>>>>> going to get performance w/o it.
>>>>>
>>>>>
>>>>> On Sun, Mar 2, 2014 at 2:23 PM, Sean Solbak  wrote:
>>>>>
>>>>>> Im writing a fairly basic trident topology as follows:
>>>>>>
>>>>>> - 4 spouts of events
>>>>>> - merges into one stream
>>>>>> - serializes the object as an event in a string
>>>>>> - saves to db
>>>>>>
>>>>>> I split the serialization task away from the spout as it was cpu
>>>>>> intensive to speed it up.
>>>>>>
>>>>>> The problem I have is that after 10 minutes there is over 910k tuples
>>>>>> emitted/transfered but only 193k records are saved.
>>>>>>
>>>>>> The overall load of the topology seems fine.
>>>>>>
>>>>>> - 536.404 ms complete latency at the topolgy level
>>>>>> - The highest capacity of any bolt is 0.3 which is the serialization
>>>>>> one.
>>>>>> - each bolt task has sub 20 ms execute latency and sub 40 ms process
>>>>>> latency.
>>>>>>
>>>>>> So it seems trident has all the records internally, but I need these
>>>>>> events as close to realtime as possible.
>>>>>>
>>>>>> Does anyone have any guidance as to how to increase the throughput?
>>>>>>  Is it simply a matter of tweeking max spout pending and the batch size?
>>>>>>
>>>>>> Im running it on 2 m1-smalls for now.  I dont see the need to upgrade
>>>>>> it until the demand on the boxes seems higher.  Although CPU usage on the
>>>>>> nimbus box is pinned.  Its at like 99%.  Why would that be?  Its at 99%
>>>>>> even when all the topologies are killed.
>>>>>>
>>>>>> We are currently targeting processing 200 million records per day
>>>>>> which seems like it should be quite easy based on what Ive read that 
>>>>>> other
>>>>>> people have achieved.  I realize that hardware should be able to boost 
>>>>>> this
>>>>>> as well but my first goal is to get trident to push the records to the db
>>>>>> quicker.
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Sean
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ce n'est pas une signature
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>>
>>> Sean Solbak, BsC, MCSD
>>> Solbak Technologies Inc.
>>> 780.893.7326 (m)
>>>
>>
>>
>
>
> --
> Thanks,
>
> Sean Solbak, BsC, MCSD
> Solbak Technologies Inc.
> 780.893.7326 (m)
>


Re: Tuning and nimbus at 99%

2014-03-02 Thread Michael Rose
The fact that the process is being killed constantly is a red flag. Also,
why are you running it as a client VM?

Check your nimbus.log to see why it's restarting.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Mar 2, 2014 at 7:50 PM, Sean Solbak  wrote:

>   uintx ErgoHeapSizeLimit = 0
> {product}
> uintx InitialHeapSize  := 27080896
>  {product}
> uintx LargePageHeapSizeThreshold= 134217728
> {product}
> uintx MaxHeapSize  := 698351616
> {product}
>
>
> so initial size of ~25mb and max of ~666 mb
>
> Its a client process (not server ie the command is "java -client
> -Dstorm.options...").  The process gets killed and restarted continously
> with a new PID (which makes getting the PID tough to get stats on).  I dont
> have VisualVM but if I run
>
> jstat -gc PID, I get
>
>  S0CS1CS0US1U  EC   EUOC OU   PC
>   PUYGC YGCTFGCFGCT GCT
> 832.0  832.0   0.0   352.9   7168.0   1115.9   17664.0 1796.0
> 21248.0 16029.6  50.268   0  0.0000.268
>
> At this point I'll likely just rebuild the cluster.  Its not in prod yet
> as I still need to tune it.  I should have wrote 2 separate emails :)
>
> Thanks,
> S
>
>
>
>
> On Sun, Mar 2, 2014 at 7:10 PM, Michael Rose wrote:
>
>> I'm not seeing too much to substantiate that. What size heap are you
>> running, and is it near filled? Perhaps attach VisualVM and check for GC
>> activity.
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Sun, Mar 2, 2014 at 6:54 PM, Sean Solbak  wrote:
>>
>>> Here it is.  Appears to be some kind of race condition.
>>>
>>> http://pastebin.com/dANT8SQR
>>>
>>>
>>> On Sun, Mar 2, 2014 at 6:42 PM, Michael Rose wrote:
>>>
>>>> Can you do a thread dump and pastebin it? It's a nice first step to
>>>> figure this out.
>>>>
>>>> I just checked on our Nimbus and while it's on a larger machine, it's
>>>> using <1% CPU. Also look in your logs for any clues.
>>>>
>>>>
>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>> mich...@fullcontact.com
>>>>
>>>>
>>>> On Sun, Mar 2, 2014 at 6:31 PM, Sean Solbak  wrote:
>>>>
>>>>> No, they are on seperate machines.  Its a 4 machine cluster - 2
>>>>> workers, 1 nimbus and 1 zookeeper.
>>>>>
>>>>> I suppose I could just create a new cluster but Id like to know why
>>>>> this is occurring to avoid future production outages.
>>>>>
>>>>> Thanks,
>>>>> S
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Mar 2, 2014 at 6:19 PM, Michael Rose 
>>>>> wrote:
>>>>>
>>>>>> Are you running Zookeeper on the same machine as the Nimbus box?
>>>>>>
>>>>>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>> mich...@fullcontact.com
>>>>>>
>>>>>>
>>>>>> On Sun, Mar 2, 2014 at 6:16 PM, Sean Solbak  wrote:
>>>>>>
>>>>>>> This is the first step of 4. When I save to db I'm actually saving
>>>>>>> to a queue, (just using db for now).  The 2nd step we index the data and
>>>>>>> 3rd we do aggregation/counts for reporting.  The last is a search that 
>>>>>>> I'm
>>>>>>> planning on using drpc for.  Within step 2 we pipe certain datasets in 
>>>>>>> real
>>>>>>> time to the clients it applies to.  I'd like this and the drpc to be 
>>>>>>> sub 2s
>>>>>>> which should be reasonable.
>>>>>>>
>>>>>>> Your right that I could speed up step 1 by not using trident but our
>>>>>>> requirements seem like a good use case for the other 3 steps.  With many
>>>>>>> results per se

Re: Tuning and nimbus at 99%

2014-03-03 Thread Michael Rose
Otis,

I'm a fan of SPM for Storm, but there's other debugging that needs to be
done here if the process quits constantly.

Sean,

Since you're using storm-deploy, I assume the processes are running under
supervisor. It might be worth killing the supervisor by hand, then running
it yourself (ssh as storm, cd storm/daemon, supervise .) and seeing what
kind of errors you see.

Are your disks perhaps filled?

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Mar 3, 2014 at 6:49 PM, Otis Gospodnetic  wrote:

> Hi Sean,
>
> I don't think you can see the metrics you need to see with AWS CloudWatch.
>  Have a look at SPM for Storm.  You can share graphs from SPM directly if
> you want, so you don't have to grab and attach screenshots manually. See:
>
> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/+
> http://sematext.com/spm/
>
> My bet is that you'll see GC metrics spikes
>
> Otis
> --
> Performance Monitoring * Log Analytics * Search Analytics
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Mar 3, 2014 at 8:21 PM, Sean Solbak  wrote:
>
>> I just created a brand new cluster with storm-deploy command.
>>
>> lein deploy-storm --start --name storm-dev --commit
>> 1bcc169f5096e03a4ae117efc65c0f9bcfa2fa22
>>
>> I had a meeting, did nothing to the box, no topologies were run.  I came
>> back 2 hours later and nimbus was at 100% cpu.
>>
>> I'm running on an m1-small on the following ami - ami-58a3cf68. Im
>> unable to get a threaddump as the process is getting killed and restarted
>> too fast.  I did attach a 3 hour snapshot of the ec2 monitors.  Any
>> guidance would be much appreciated.
>>
>> Thanks,
>> S
>>
>>
>>
>>
>>
>>
>> On Sun, Mar 2, 2014 at 9:11 PM, Sean Solbak  wrote:
>>
>>> The only error in the logs is which happened over 10 days ago was.
>>>
>>> 2014-02-22 01:41:27 b.s.d.nimbus [ERROR] Error when processing event
>>> java.io.IOException: Unable to delete directory
>>> /mnt/storm/nimbus/stormdist/test-25-1393022928.
>>> at
>>> org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:981)
>>> ~[commons-io-1.4.jar:1.4]
>>> at
>>> org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
>>> ~[commons-io-1.4.jar:1.4]
>>> at backtype.storm.util$rmr.invoke(util.clj:442)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at
>>> backtype.storm.daemon.nimbus$do_cleanup.invoke(nimbus.clj:819)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at
>>> backtype.storm.daemon.nimbus$fn__5528$exec_fn__1229__auto5529$fn__5534.invoke(nimbus.clj:896)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at
>>> backtype.storm.timer$schedule_recurring$this__3019.invoke(timer.clj:77)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at
>>> backtype.storm.timer$mk_timer$fn__3002$fn__3003.invoke(timer.clj:33)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at backtype.storm.timer$mk_timer$fn__3002.invoke(timer.clj:26)
>>> ~[storm-core-0.9.0.1.jar:na]
>>> at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
>>> at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_27]
>>>
>>> Its fine.  I can rebuild a new cluster.  Storm deploy makes it pretty
>>> easy.
>>>
>>> Thanks for you help on this!
>>>
>>> As for my other question.
>>>
>>> If my trident batch interval is 500ms and I keep the spout pending and
>>> batch size small enough, will I be able to get real time results (ie sub 2
>>> seconds)?  I've played with the various metrics (I literally have a
>>> spreadsheet of parameters to results) and haven't been able to get it.  Am
>>> I just doing it wrong?  What would the key parameters be?  The complete
>>> latency is 500 ms but trident seems to be way behind despite non of my
>>> bolts having a capacity > 0.6.  This may have to do with nimbus being
>>> throttled so I will report back.  But if there are people out there who
>>> have done this kind of thing, Id like to know if Im missing an obvious
>>> parameter or something.
>>>
>>> Thanks,
>>> S
>>>
>>>
>>>
>>> On Sun, Mar 2, 2014 at 8:09 PM, Michael Rose wrote:
>>>
>>>> The fact that the process is being killed constantly is a red

Re: Storm stream grouping examples

2014-03-05 Thread Michael Rose
What kind of comparisons are you looking for? How they functionally work?

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Mar 5, 2014 at 9:52 AM, Roberto Coluccio  wrote:

> Hello folks,
>
> I was unable to find any complete example (or, better, related work in the
> scientific literature) in which (almost) all the *stream grouping
> policies* have been used and compared. Do you have any reference you
> could please share with me?
>
> Thank you and best regards,
>
> Roberto Coluccio
>


Re: Storm stream grouping examples

2014-03-05 Thread Michael Rose
+1, localOrShuffle will be a winner, as long as it's evenly distributing
work. If 1 tuple could say produce a variable 1-100 resultant tuples (and
these results were expensive enough to process, e.g. IO), it might well be
worth shuffling vs. localShuffling.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Mar 5, 2014 at 11:19 AM, Nathan Leung  wrote:

> In my experience on a 1 Gb network localOrShuffleGrouping was a clear
> winner in terms of performance.  But I haven't tested with 10 Gb, and if
> you have substantial business logic then that becomes a bigger factor than
> serializing/transferring data on the network.  I think the performance of
> any given grouping is too dependent on your business logic; it will be
> difficult to quantify how well it performs in a canned benchmark.  And
> sometimes your business logic will define a grouping for you (e.g. fields
> grouping) whether it's the best performer or not.
>
>
> On Wed, Mar 5, 2014 at 1:05 PM, Roberto Coluccio <
> roberto.coluc...@gmail.com> wrote:
>
>> Hello Michael, thanks for your feedback.
>>
>> I'm looking for a performance comparison. I know that not all the
>> policies are "really comparable", but even obvious comparisons all listed
>> together could be a useful reference.
>>
>> Roberto
>>
>>
>> On Wed, Mar 5, 2014 at 6:58 PM, Michael Rose wrote:
>>
>>> What kind of comparisons are you looking for? How they functionally work?
>>>
>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>> mich...@fullcontact.com
>>>
>>>
>>> On Wed, Mar 5, 2014 at 9:52 AM, Roberto Coluccio <
>>> roberto.coluc...@gmail.com> wrote:
>>>
>>>> Hello folks,
>>>>
>>>> I was unable to find any complete example (or, better, related work in
>>>> the scientific literature) in which (almost) all the *stream grouping
>>>> policies* have been used and compared. Do you have any reference you
>>>> could please share with me?
>>>>
>>>> Thank you and best regards,
>>>>
>>>> Roberto Coluccio
>>>>
>>>
>>>
>>
>


Re: Processing ack'd Tuples?

2014-03-05 Thread Michael Rose
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/hooks/BaseTaskHook.java

Override this guy, apply the task hook in either your Storm Conf or in the
bolt its self.

List listOfHooks = Lists.newArrayList(MyTaskHook.class.getName());
stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, listOfHooks);

OR

In prepare(), topologyContext.addTaskTook()

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Mar 5, 2014 at 1:12 PM, Brian O'Neill  wrote:

>
> Sorry, we may have missed something obvious, but is there a way to count
> ack'd tuples?  We are looking for a hook that would give us access to tuple
> that has been fully processed.
>
> I'm guessing we could override the Spout implementation, but that seems a
> bit messy.
>
> We could make it the last thing in the topology, but if tuples were
> filtered along the way, we wouldn't see them.
>
> Has anyone done something similar?
>
> -brian
>
> --
> Brian ONeill
> CTO, Health Market Science (http://healthmarketscience.com)
> mobile:215.588.6024
> blog: http://brianoneill.blogspot.com/
> twitter: @boneill42
>


Re: Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.

2014-03-17 Thread Michael Rose
Depending on what you need, you could use the Thrift interface to Nimbus to
grab the statistics. The UI project does do some calculations on the raw
metrics, so it's not quite the same. The algorithms it uses aren't too
difficult to replicate.

We ended up building something very similar to what Ooyala did, customized
for our specific needs, it's an excellent pattern.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Mar 17, 2014 at 6:21 PM, Chris Bedford wrote:

>
>
> Hi, I'm interested in getting metrics via JMX on not only   " container
> level " factors (such as # of garbage collects, heap usage,etc.),   but
> also metrics that describe how spouts and bolts are performing  (e.g., # of
> tuples emitted, # transferred -- the same kind of stuff that the storm UI
> shows.)
>
> I ran across this project :
>  :
> https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala
>
>
> I was just wondering if this is the best best to pursue.  Does anyone have
> any concrete experience with this that they can share?
> thanx!
>
>  chris
>
>


Re: Server load - Topology optimization

2014-03-18 Thread Michael Rose
Well, I see you have 30 spouts instances and 3 bolt instances. Doesn't seem
like it's a huge bottleneck, but it is having to send over the wire much of
the time. Something to keep in mind for the future.

I'd be most suspicious of your spouts. All spouts on a single worker are
run single-threaded (in an event loop calling nextTuple()), so if you have
ANY blocking work that'll kill throughput. If AzureServiceBus is anything
like SQS, response times are not instant. We tend to have background
threads feeding a queue in our spouts for that reason, as we use SQS for
many of our topologies. Given 12 workers, you'd have ~3 spouts per machine
running single-threaded.

With spouts, you should attempt to maintain as little blocking work as
possible...if you're using a queue, you should be using Queue#poll() to
either return a value OR null, and only emit a tuple if an event is
available (and meets your criteria). Storm will handle rate limiting the
spouts with sleeps.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Mar 18, 2014 at 5:14 PM, David Crossland wrote:

>  Perhaps these screenshots might shed some light? I don't think there is
> much of a latency issue.  I'm really starting to suspect there is some
> consumption rate issue from the topic.
>
>  I set the spout to a high parallelism value as it did seem to improve
> throughput..
>
>  But if there is anything you can spot that would be grand
>
>  Thanks
>  David
>
>   *From:* Nathan Leung 
> *Sent:* ‎Tuesday‎, ‎18‎ ‎March‎ ‎2014 ‎21‎:‎14
> *To:* user@storm.incubator.apache.org
>
>  It could be bolt 3.  What is the latency like between your worker and
> your redis server?  Increasing the number of threads for bolt 3 will likely
> increase your throughput.  Bolt 1 and 2 are probably CPU bound, but bolt 3
> is probably restricted by your network access.  Also I've found that
> localOrShuffleGrouping can improve performance due to reduced network
> communications.
>
>
> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland wrote:
>
>>  A bit more information then
>>
>>  There are 4 components
>>
>>  Spout - This is reading from an azure service bus topic/subscription.
>> A connection is created in the open() method of the spout, nextTuple does a
>> peek on the message, and invokes the following code;
>>
>> StringWriter writer = new StringWriter();
>> IOUtils.copy(message.getBody(), writer);
>> String messageBody = writer.toString();
>>
>>  It then deletes the message from the queue.
>>
>>  Overall nothing all that exciting..
>>
>>  Bolt 1 - Filtering
>>
>>  Parses the message body (json string) and converts it to an object
>> representation.  Filters out anything that isn't a monetise message.  It
>> then emits the monetise message object to the next bolt.  Monetise messages
>> account for ~ 0.03% of the total message volume.
>>
>>  Bolt 2 - transformation
>>
>>  Basically extracts from the monetise object the values that are
>> interesting and contracts a string which it emits
>>
>>  Bolt 3 - Storage
>>
>>  Stores the transformed string in Redis using the current date/time as
>> key.
>>
>>  -
>>
>>  Shuffle grouping is used with the topology
>>
>>  I ack every tuple irrespective of whether I emit the tuple or not.  It
>> should not be attempting to replay tuple.
>>
>>  -
>>
>>  I don't think Bolt 2/3 are the cause of the bottleneck.  They don't
>> have to process much data at all tbh.
>>
>>  I can accept that perhaps there is something inefficient with the
>> spout, perhaps it just can't read from the service bus quickly enough. I
>> will do some more research on this and have a chat with the colleague who
>> wrote this component.
>>
>>  I suppose I'm just trying to identify if I've configured something
>> incorrectly with respect to storm, whether I'm correct to relate the total
>> number of executors and tasks to the total number of cores I have
>> available.  I find it strange that I get a better throughput when I choose
>> an arbitrary large number for the parallelism hint than if I constrain
>> myself to a maximum that equates to the number of cores.
>>
>>  D
>>
>>   *From:* Nathan Leung 
>> *Sent:* ‎Tuesday‎, ‎18‎ ‎March‎ ‎2014 ‎18‎:‎38
>> *To:* user@storm.incubator.apache.org
>>
>>  In my experience storm is able to make good use of CPU resources, if
&

Re: Server load - Topology optimization

2014-03-19 Thread Michael Rose
messageId is any unique identifier of the message, such that when ack is
called on your spout you're returned the identifier to then mark the work
as complete in the source in the case it supports replay.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Mar 19, 2014 at 10:18 AM, David Crossland wrote:

>  Where is this messageId derived from?
>
>  I note there is a tuple.getMessageId() but there does not appear to be
> an overload to emit that accepts this?
>
>  There is an overload
>
>  
> *emit*<http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#emit(java.lang.String,%20java.util.List)>(java.lang.String
>  streamId,
> java.util.List tuple)
>
>  I don't think this is what you are referring to however.
>
>  My bolt execute method looks like this;  (sorry if I'm being obtuse,
> this is still very new.. )
>
> public void execute(Tuple tuple) {
> String msg = (String)tuple.getValue(0);
> if(msg == null)
> {
> logger.log(Level.ERROR, "Message is null");
> //acknowledge the tuple has failed and return
> _collector.fail(tuple);
> return;
> }
>
>  MessageProcessor processor = new MessageProcessor();
> EventMessage message = processor.processMessage(msg);
>
>  if(message == null)
> {
> logger.log(Level.DEBUG, "Message did not conform to a known
> event");
> logger.log(Level.DEBUG, msg);
> //acknowlege the tuple, but dont do anything with it, we dont
> have to emit
> _collector.ack(tuple);
> }
>
>  if(message instanceof MonetiseEvent)
> {
> logger.log(Level.DEBUG, "recieved monetise message");
> _collector.emit(new Values(message));
> _collector.ack(tuple);
> }
> }
>
>
>  D
>
>   *From:* Sean Zhong 
> *Sent:* ‎Wednesday‎, ‎19‎ ‎March‎ ‎2014 ‎15‎:‎39
> *To:* user@storm.incubator.apache.org
>
>  Hi David,
>
>
>  As I said, to get the ack count for spout, you need to add a message Id
> when emitting. Likes this,
> collector.emit(new Values(msg), messageId)
>   ^| here
>
>  If the message Id is null, then there will be ack message sent to spout.
>
>
>  Sean
>
>
>
> On Wed, Mar 19, 2014 at 4:49 PM, David Crossland wrote:
>
>>  I had noticed the ack count never increased for the spout.
>>
>>  The spout is a BaseRichSpout, I presumed the underlying code should
>> handle the ack and display appropriate metrics. But you can see from the
>> first bolt that all messages have been asked.. I don't know if this is an
>> issue.
>>
>>  Ive been doing a bit of reading around the service bus, if I understand
>> correctly it can push 20 messages per second per connection (I need to
>> confirm this with a colleague who has the experience with this..) If that's
>> the case then it tallies with the throughput I've been seeing (approx…).  I
>> suspect the problem isn't so much the topology, but the service bus
>> itself.
>>
>>  I'll come back if this doesn't pan out.
>>
>>  D
>>
>>   *From:* Sean Zhong 
>> *Sent:* ‎Wednesday‎, ‎19‎ ‎March‎ ‎2014 ‎00‎:‎45
>> *To:* user@storm.incubator.apache.org
>>
>>  The Spout is suspicous.
>>
>>  From the screenshots, you are using no-acked spout, there may also be
>> GC issue there.
>> Here is the suggestion:
>>
>>  Check whether you are making connection in the context of nextTuple. If
>> that is true, it means a large latency. You can check the spout latency by
>> enabling acked spout(when emmit, add a messageId. e.g. emit(new
>> Values(msg), messageId), and check the latency.
>> If this is the case, you need to create a seperate thread for data
>> connection in spout, and use a queue to buffer messages, then nextTuple
>> just pol the queue.
>>
>>
>>
>> On Wed, Mar 19, 2014 at 8:06 AM, Michael Rose wrote:
>>
>>>  Well, I see you have 30 spouts instances and 3 bolt instances. Doesn't
>>> seem like it's a huge bottleneck, but it is having to send over the wire
>>> much of the time. Something to keep in mind for the future.
>>>
>>>  I'd be most suspicious of your spouts. All spouts on a single worker
>>> are run single-threaded (in an event loop calling nextTuple()), so if you
&g

Re: Working with JSON

2014-03-31 Thread Michael Rose
It's much more efficient to deserialize it once then pass around POJOs.
JSON serialization is slow compared to Kryo. Our topologies tend to take in
JSON, then emit JSON to external systems at later phases, but all
intermediate stages are POJOs.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Mar 31, 2014 at 11:18 AM, Cody A. Ray  wrote:

> We have one spout which always emits JSON of the same form (this is a
> metric represented as a JSON object with timestamp, name, and value
> fields). Note this is the OpaqueTridentKafkaSpout from storm-kafka-0.8-plus
> which just emits a "bytes" array field, so we first convert it to a string.
>
> Stream stream = topology.newStream("spout1", spout)
> .each(new Fields("bytes"), new *BinaryToString*(), new Fields(
> "string"))
> .each(new Fields("string"), new *MetricJsonParser*(), new Fields(
> "timestamp", "metric", "value"))
> ...
>
> Each of these two functions are pretty simple:
> https://gist.github.com/codyaray/9897217. I'm using Codehaus' 
> Jettison<http://jettison.codehaus.org/>to parse the JSON.
>
> -Cody
>
>
> On Sat, Mar 29, 2014 at 5:30 PM, Tyson Norris  wrote:
>
>> We have a similar setup, and based on routing needs we plan to pass the
>> original JSON, plus some extra fields that will simplify routing via
>> fieldsGrouping.
>> e.g.
>> spout -> content (JSONObject)
>> bolt1 -> receives all spout tuples; execute() uses JSONObject ->
>> emit(JSONObject, String, String) (String values are parsed out of
>> JSONObject)
>> bolt2 -> receives bolt1 tuples based on fieldsGrouping; execute uses
>> JSONObject, String, String to perform some operation -> emit(JSONObject,
>> String) (String values are based on some logic)
>>
>> So while you could go either extreme (continually pass JSONObject value
>> as tuple, or parse the JSONObject and pass only decomposed values as
>> tuple), you can also do both, which would allow you to use fieldGrouping,
>> in case that is important (it is important for our case).
>>
>> Tyson
>>
>>
>> On Mar 29, 2014, at 1:44 PM, Software Dev 
>> wrote:
>>
>> > We actually have a spout that emits just 1 JSON string per tuple.
>> > Wondering what should be down downstream after we have the JSON string
>> >
>> > On Sat, Mar 29, 2014 at 12:20 PM, Andrew Neilson 
>> wrote:
>> >> my team's project has successfully used Jackson
>> >> (https://github.com/FasterXML/jackson) to deserialize a spout of JSON
>> arrays
>> >> into tuples, and I can recommend it. Though I'll warn you that it
>> takes a
>> >> little bit of work beyond the most basic usage (i.e.
>> mapper.readValue(json,
>> >> List.class)) to avoid dealing with type ambiguity.
>> >>
>> >>
>> >> On Sat, Mar 29, 2014 at 12:06 PM, Software Dev <
>> static.void@gmail.com>
>> >> wrote:
>> >>>
>> >>> Say we are receiving tuples of JSON from a spout. Should we just keep
>> >>> passing around the JSON string and deserialize it in each bolt or Is
>> >>> it best to break apart the JSON object into a bunch of fields that can
>> >>> be passed around.
>> >>>
>> >>> I'm thinking in terms of performance the latter may be "better"
>> >>> although it will slightly make the rest of the topology more complex.
>> >>>
>> >>> Also, what is a good JSON library to work with?
>> >>>
>> >>> Thanks
>> >>
>> >>
>>
>>
>
>
> --
> Cody A. Ray, LEED AP
> cody.a@gmail.com
> 215.501.7891
>


Re: Passing command line arguments to storm

2014-04-17 Thread Michael Rose
Yes. Ultimately, that runs the main method of MyTopology, so just like any
other main method you get String[] args.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Apr 17, 2014 at 5:36 PM, Software Dev wrote:

> Is it possible to pass arguments that adhere to the CommandLine interface?
>
> storm jar topology.jar com.test.MyTopology --config foo --remote
>


Re: Passing command line arguments to storm

2014-04-17 Thread Michael Rose
You could set the args in the StormConfig, then they'll show up in the UI
per-topology.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Apr 17, 2014 at 7:16 PM, Cody A. Ray  wrote:

> On a related note, is there any way to see the args with which an
> already-running topology was created? I've looked at the properties on the
> ui and dug through the storm directory on the nimbus to no avail.
>
> I could obviously log them out (but will need some identifier since we're
> multi-tenant topologies on this cluster). Just wanted to know if I was
> missing something built-in.
> On Apr 17, 2014 7:31 PM, "Michael Rose"  wrote:
>
>> Yes. Ultimately, that runs the main method of MyTopology, so just like
>> any other main method you get String[] args.
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Thu, Apr 17, 2014 at 5:36 PM, Software Dev 
>> wrote:
>>
>>> Is it possible to pass arguments that adhere to the CommandLine
>>> interface?
>>>
>>> storm jar topology.jar com.test.MyTopology --config foo --remote
>>>
>>
>>


Re: sending the same stream to two different bolts, possible?

2014-04-25 Thread Michael Rose
That's correct, the stream will be duplicated in the above case.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Apr 25, 2014 at 12:44 PM, David Novogrodsky <
david.novogrod...@gmail.com> wrote:

> All,
>
> This may be a simple question.  If I have a spout and two
> bolts, printerBolt and secondPrinterBolt;if I do something like this:
>
> builder.setBolt("secondPrinterBolt", new SecondPrinterBolt())
> .fieldsGrouping("criticalSeverityFilter", new
> Fields("severity"));
> builder.setBolt("printerBolt", new PrinterBolt())
> .fieldsGrouping("criticalSeverityFilter", new
> Fields("severity"));
> will both printer bolts be processing the same stream?  I mean if a tuple
> is sent to one printer bolt is it also send to the other printer bolt?
>
> David Novogrodsky
> david.novogrod...@gmail.com
> http://www.linkedin.com/in/davidnovogrodsky
>


Re: PDF processing use case in storm!!

2014-04-28 Thread Michael Rose
I'd be inclined to say that while you can make it work, your unit of work
(a whole PDF) makes it unsuitable to Storm. In general, the more you can
break down each operation the better Storm will work for you. You're likely
to get worse latency out of Storm due to the essentially random delegation
of tuples.

You could really abuse Storm if you wanted and use it as a distributed
application container with threadpools, I've done it. But you're really
going to see a better experience out of a webservice if it's live-mode
requests.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Apr 28, 2014 at 8:23 AM, Deepak Sharma wrote:

> We need parallelism here.
> As lot of users may be using the service at the same time.It may be for
> different file or the same file.
>
> Thanks
> Deepak
>
>
> On Mon, Apr 28, 2014 at 7:41 PM, Marc Vaillant wrote:
>
>> I think it's important to know whether or not some form of parallelism
>> (other than throughput) is required, otherwise a standard webservice
>> seems sufficient for this use case.
>>
>> On Mon, Apr 28, 2014 at 07:46:35AM -0400, Andrew Perepelytsya wrote:
>> > You can build request response type topologies via DRPC. However,
>> unless we're
>> > talking about processing numerous pdfs at once - bad fit, IMO.
>> >
>> > If there is parallelism required you might be better off with a custom
>> yarn app
>> > - looks like YAYA makes it tolerable top write.
>> >
>> > Andrew
>> >
>> > On Apr 28, 2014 2:41 AM, "Deepak Sharma"  wrote:
>> >
>> > Hi All,
>> > Just wanted to check if this can be valid storm use case.
>> > I want to write 1 simple storm topology which can read pdf file ,
>> process
>> > it , make some changes like convert it to doc and save the new file.
>> > I know this can be easily done in batch mode using hadoop.But we
>> want to do
>> > it in real time ,i.e. when the user demands it.
>> > We already do it using some java api but it takes lot of time in all
>> > conversions.
>> > Can this be achieved in Storm?If yes , Is there any pointer to any
>> examples
>> > similar to this use case?
>> >
>> >
>> > --
>> > Thanks
>> > Deepak
>> > www.bigdatabig.com
>> >
>> >
>> >
>> > CONFIDENTIALITY NOTICE
>> > NOTICE: This message is intended for the use of the individual or
>> entity to
>> > which it is addressed and may contain information that is confidential,
>> > privileged and exempt from disclosure under applicable law. If the
>> reader of
>> > this message is not the intended recipient, you are hereby notified
>> that any
>> > printing, copying, dissemination, distribution, disclosure or
>> forwarding of
>> > this communication is strictly prohibited. If you have received this
>> > communication in error, please contact the sender immediately and
>> delete it
>> > from your system. Thank You.
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Is it a must to have /etc/hosts mapping or a DNS in a multinode setup?

2014-04-29 Thread Michael Rose
We don't use /etc/hosts mapping, we only use hostnames / ips.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Apr 29, 2014 at 8:29 AM, Derek Dagit  wrote:

> I have not tried it, but there is a config for this purpose:
>
> https://github.com/apache/incubator-storm/blob/
> dc4de425eef5701ccafe0805f08eeb011baae0fb/storm-core/src/jvm/
> backtype/storm/Config.java#L122-L131
> --
> Derek
>
>
> On 4/29/14, 0:41, Sajith wrote:
>
>> Hi all,
>>
>> Is it a must to have a /etc/hosts mapping or a DNS in a multinode storm
>> cluster? Can't supervisors talk to each other through ZooKeeper or nimbus
>> using IP addresses directly ?
>>
>>


Re: Machine specs

2014-04-30 Thread Michael Rose
In AWS, we're fans of c1.xlarges, m3.xlarges, and c3.2xlarges, but have
seen Storm successfully run on cheaper hardware.

Our Nimbus server is usually bored on a m1.large.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Apr 30, 2014 at 9:48 PM, Cody A. Ray  wrote:

> We use m1.larges in EC2 <http://aws.amazon.com/ec2/instance-types/> for
> both nimbus and supervisor machines (though the m1 family have been
> deprecated in favor of m3). Our use case is to do some pre-aggregation
> before persisting the data in a store. (The main bottleneck in this setup
> is the downstream datastore, but memory is the primary constraint on the
> worker machines due to the in-memory cache which wraps the trident state.)
>
> For what its worth, Infochimps 
> suggests<https://github.com/infochimps-labs/big_data_for_chimps/blob/master/25-storm%2Btrident-tuning.asciidoc>c1.xlarge
>  or m3.xlarge machines.
>
> Using the Amazon cloud machines as a reference, we like to use either the
> c1.xlarge machines (7GB ram, 8 cores, $424/month, giving the highest
> CPU-performance-per-dollar) or the m3.xlargemachines (15 GB ram, 4 cores,
> $365/month, the best balance of CPU-per-dollar and RAM-per-dollar). You
> shouldn’t use fewer than four worker machines in production, so if your
> needs are modest feel free to downsize the hardware accordingly.
>
> Not sure what others would recommend.
>
> -Cody
>
>
> On Wed, Apr 30, 2014 at 5:57 PM, Software Dev 
> wrote:
>
>> What kind of specs are we looking at for
>>
>> 1) Nimbus
>> 2) Workers
>>
>> Any recommendations?
>>
>
>
>
> --
> Cody A. Ray, LEED AP
> cody.a@gmail.com
> 215.501.7891
>


Re: How resilient is Storm w/o supervision

2014-05-02 Thread Michael Rose
In practice, we rarely have issues with our Storm processes. We have
Nimbus/supervisors that never restart for months. But when they do have
issues, it's very nice to have them under supervision. Sometimes it's not
even things under their control (e.g. OOM during high memory usage
scenarios).

For development, there shouldn't be an issues foregoing supervision.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, May 2, 2014 at 12:41 PM, P. Taylor Goetz  wrote:

> I don’t think you need root to run supervisord:
>
> http://supervisord.org/running.html
>
> If you’re just testing something out, and don’t mind your cluster going
> down, then running without supervision is okay. But I would NEVER suggest
> someone run Storm’s daemons without supervision in a production environment.
>
> - Taylor
>
> On May 2, 2014, at 2:29 PM, Albert Chu  wrote:
>
> > I'm attempting to run Storm on a platform that I don't have root on.  So
> > I won't be able to run it under Redhat's supervisord that's already
> > installed.
> >
> > How resilient are the Storm daemons by themselves?  Are they reasonably
> > resilient or are they programmed to not handle even relatively simple
> > errors?
> >
> > I should probably say, this probably wouldn't be run in a production
> > environment.  Just trying to understand if the documentation writers are
> > saying, "you should really do this for production" or "it won't work if
> > you don't do this."
> >
> > Thanks,
> >
> > Al
> >
> > --
> > Albert Chu
> > ch...@llnl.gov
> > Computer Scientist
> > High Performance Systems Division
> > Lawrence Livermore National Laboratory
> >
> >
>
>


Re: [VOTE] Storm Logo Contest - Round 1

2014-05-16 Thread Michael Rose
#9 - 3
#5 - 2

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, May 16, 2014 at 10:51 AM, Benjamin Black  wrote:

> #11 - 5 pts
>
>
> On Fri, May 16, 2014 at 7:43 AM, Brian O'Neill wrote:
>
>> #10 - 5 pts (Logo Entry No. 10 - Jennifer Lee)
>>
>>
>> -brian
>>
>> ---
>> Brian O'Neill
>> Chief Technology Officer
>>
>> Health Market Science
>> The Science of Better Results
>> 2700 Horizon Drive € King of Prussia, PA € 19406
>> M: 215.588.6024 € @boneill42 <http://www.twitter.com/boneill42>  €
>> healthmarketscience.com
>>
>> This information transmitted in this email message is for the intended
>> recipient only and may contain confidential and/or privileged material. If
>> you received this email in error and are not the intended recipient, or
>> the person responsible to deliver it to the intended recipient, please
>> contact the sender at the email above and delete this email and any
>> attachments and destroy any copies thereof. Any review, retransmission,
>> dissemination, copying or other use of, or taking any action in reliance
>> upon, this information by persons or entities other than the intended
>> recipient is strictly prohibited.
>>
>>
>>
>>
>>
>>
>>
>> On 5/15/14, 12:28 PM, "P. Taylor Goetz"  wrote:
>>
>> >This is a call to vote on selecting the top 3 Storm logos from the 11
>> >entries received. This is the first of two rounds of voting. In the first
>> >round the top 3 entries will be selected to move onto the second round
>> >where the winner will be selected.
>> >
>> >The entries can be viewed on the storm website here:
>> >
>> >http://storm.incubator.apache.org/blog.html
>> >
>> >VOTING
>> >
>> >Each person can cast a single vote. A vote consists of 5 points that can
>> >be divided among multiple entries. To vote, list the entry number,
>> >followed by the number of points assigned. For example:
>> >
>> >#1 - 2 pts.
>> >#2 - 1 pt.
>> >#3 - 2 pts.
>> >
>> >Votes cast by PPMC members are considered binding, but voting is open to
>> >anyone.
>> >
>> >This vote will be open until Thursday, May 22 11:59 PM UTC.
>> >
>> >- Taylor
>>
>>
>>
>


Re: Storm with video/audio streams

2014-05-19 Thread Michael Rose
No reason why you couldn't do it, but as far as I know it hasn't been done
before. You can send any kind of serializable data into a topology. You'd
probably need to emit frames from the spout.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, May 19, 2014 at 6:13 AM, Suparno Datta wrote:

> Hi,
>
> I just started playing around with storm a few days back. I worked
> with some basic examples using the twitter streaming api. I was wondering
> If storm can also be used with live video/audio streaming api ( the youtube
> streaming api for example). I wanted to develop a app which does something
> like detecting and tracking faces from a live video stream using Storm. But
> i am not sure if this is even feasible. Most of the examples i see deal
> with text streams. Has any one here already tried/ thought of something
> like this ?
>
> Thanks,
>
> --
> Suparno Datta
>


Re: Which daemon tool for Storm nodes?

2014-05-21 Thread Michael Rose
We use upstart. Supervisord would also work. Just anything to keep an eye
on it and restart it if it dies (a very rare occurrence).

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, May 21, 2014 at 7:09 PM, Connie Yang  wrote:

> Hi,
>
> Is there a recommended supervisor or daemon tool for Storm nodes?  I'm
> using Supervisor for Storm and Zookeeper.
>
> I heard some concern in using Supervisor, but I haven't researched into
> the matter.
>
> Any comments on this?
>
> Thanks,
> Connie
>


Re: Workers constantly restarted due to session timeout

2014-05-29 Thread Michael Rose
Do you have GC logging turned on? With a 60GB heap I could pretty easily
see stop-the-world GCs taking longer than the session timeout.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, May 29, 2014 at 10:45 AM, Michael Dev 
wrote:

>  Derek,
>
> We are currently running with -Xmx60G and only about 20-30G of that has
> been observed to be used. I'm still observing workers restarted every 2
> minutes.
>
> What timeout is relevant to increase for the heartbeats in question? Is it
> be a config on the Zookeeper side we can increase to make our topology more
> resilient to these restarts?
>
> Michael
>
> > Date: Fri, 23 May 2014 15:50:50 -0500
> > From: der...@yahoo-inc.com
> > To: user@storm.incubator.apache.org
> > Subject: Re: Workers constantly restarted due to session timeout
>
> >
> > > 2) Is this expected behavior for Storm to be unable to keep up with
> heartbeat threads under high CPU or is our theory incorrect?
> >
> > Check your JVM max heap size (-Xmx). If you use too much, the JVM will
> garbage-collect, and that will stop everything--including the thread whose
> job it is to do the heartbeating.
> >
> >
> >
> > --
> > Derek
> >
> > On 5/23/14, 15:38, Michael Dev wrote:
> > > Hi all,
> > >
> > > We are seeing our workers constantly being killed by Storm with to the
> following logs:
> > > worker: 2014-05-23 20:15:08 INFO ClientCxn:1157 - Client session timed
> out, have not heard from the server in 28105ms for sessionid
> 0x14619bf2f4e0109, closing socket and attempting reconnect
> > > supervisor: 2014-05-23 20:17:30 INFO supervisor:0 - Shutting down and
> clearing state for id 94349373-74ec-484b-a9f8-a5076e17d474. Current
> supervisor time: 1400876250. State: :disallowed, Heartbeat:
> #backtype.storm.daemon.common.WorkerHeartbeat{{:time-secs 1400876249,
> :storm-id "test-46-1400863199", :executors #{[-1 -1]}, :port 6700}
> > >
> > > Eventually Storm decides to just kill the worker and restart it as you
> see in the supervisor log. We theorize this is the Zookeeper heartbeat
> thread and it is being choked out due to very high CPU load on the machine
> (near 100%).
> > >
> > > I have increased the connection timeouts in the storm.yaml config file
> yet Storm seems to continue to use some unknown value for the above client
> session timeout messages:
> > > storm.zookeeper.connection.timeout: 30
> > > storm.zookeeper.session.timeout: 30
> > >
> > > 1) What timeout config is appropriate for the above timeout message?
> > > 2) Is this expected behavior for Storm to be unable to keep up with
> heartbeat threads under high CPU or is our theory incorrect?
> > >
> > > Thanks,
> > > Michael
> > >
> > >
>


Re: New Committer/PPMC Member: Michael G. Noll

2014-05-29 Thread Michael Rose
Congrats!

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, May 29, 2014 at 3:01 PM, Derek Dagit  wrote:

> Welcome Michael!
>
> --
> Derek
>
>
> On 5/29/14, 15:58, P. Taylor Goetz wrote:
>
>> The Podling Project Management Committee (PPMC) for Apache Storm has
>> asked Michael G. Noll to become a committer/PPMC member and we are pleased
>> to announce that he has accepted.
>>
>> Michael has contributed to Storm in many ways, including code patches,
>> community support, and high quality documentation and blog posts. We are
>> very excited to have him on board.
>>
>> Michael’s blog can be found here: http://www.michael-noll.com
>>
>> Please join me in welcoming Michael to the team.
>>
>> - Taylor
>>
>>


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
You don't have to include a specific bolt for init code. It's not difficult
to push your init code into a separate class and call it from your bolts,
lock on that class, run init, and then allow other instances to skip over
it.

Without changing bolt/spout code, I've taken to including a task hook for
init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
extendible and can be included pretty easily too:

stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford  wrote:

> Yes.. if i used prepare or open on spouts or bolts it would work, but
> unfortunately it would be a bit brittle.  I'd have to include a spout or
> bolt just for initializing my invariant code... i'd rather do that when the
> topology is activated on the worker.. so this seems like a good use of an
>  activated()method on the StormTopology class   (where activated()
> would be called after the StormTopology is deserialized by the worker node
> process).
>
> But, if there is no such method, I will make do with what is there.
>
> thanks for your response.
>
> chris
>
>
>
> On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant 
> wrote:
>
>> The bolt base classes have a prepare method:
>>
>>
>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html
>>
>> and the spout base classes have a similar activate method:
>>
>>
>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html
>>
>> Is that sufficient for your needs or were you thinking of something
>> different?
>>
>> Marc
>>
>> On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
>> > Hi there -
>> >
>> > I would like to set up some state that spouts and bolts share, and I'd
>> like to
>> > prepare this state when the StormTopology gets 'activated' on a worker.
>> >
>> > it would be great if the StormTopology had something like a prepare or
>> open
>> > method to indicate when it is starting.  I looked but i could find no
>> such API.
>> >   Maybe I should submit an enhancement request ?
>> >
>> > Thanks in advance for your responses,
>> >   -  Chris
>> >
>> >
>> >
>> > [ if anyone is curious, the shared state is for all my application code
>> to
>> > check or not check invariants.  the invariant checking takes additional
>> time,
>> > so we don't want to do it in production.. but during
>> testing/development it
>> > helps catch bugs].
>> >
>> > --
>> > Chris Bedford
>> >
>> > Founder & Lead Lackey
>> > Build Lackey Labs:  http://buildlackey.com
>> > Go Grails!: http://blog.buildlackey.com
>> >
>> >
>>
>
>
>
> --
> Chris Bedford
>
> Founder & Lead Lackey
> Build Lackey Labs:  http://buildlackey.com
> Go Grails!: http://blog.buildlackey.com
>
>
>


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
No, it will be called per bolt instance. That's why init code needs to be
guarded behind a double-check lock to guarantee it only executes once per
JVM.

e.g.

private static volatile boolean initialized = false;

...


if (!initialized) {
   synchronized(MyInitCode.class) {
  if (!initialized) {
 // do stuff
 initialized = true;
  }
   }
}

Until there's a set of lifecycle hooks, that's about as good as I've cared
to make it.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jun 2, 2014 at 8:27 PM, Chris Bedford  wrote:

> This looks promising. thanks.
>
> hope you don't mind one more question  --  if i create my own
> implementation of ITaskHook and add it do the config as you illustrated in
> prev. msg..will the prepare() method of my implementation be called
> exactly once shortly after StormTopology is deserialized by the worker
> node process ?
>
>  - chris
>
>
>
>
> On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose 
> wrote:
>
>> You don't have to include a specific bolt for init code. It's not
>> difficult to push your init code into a separate class and call it from
>> your bolts, lock on that class, run init, and then allow other instances to
>> skip over it.
>>
>> Without changing bolt/spout code, I've taken to including a task hook for
>> init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
>> extendible and can be included pretty easily too:
>>
>> stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
>> Lists.newArrayList(MyTaskHook.class.getName()));
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford 
>> wrote:
>>
>>> Yes.. if i used prepare or open on spouts or bolts it would work, but
>>> unfortunately it would be a bit brittle.  I'd have to include a spout or
>>> bolt just for initializing my invariant code... i'd rather do that when the
>>> topology is activated on the worker.. so this seems like a good use of an
>>>  activated()method on the StormTopology class   (where activated()
>>> would be called after the StormTopology is deserialized by the worker node
>>> process).
>>>
>>> But, if there is no such method, I will make do with what is there.
>>>
>>> thanks for your response.
>>>
>>> chris
>>>
>>>
>>>
>>> On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant 
>>> wrote:
>>>
>>>> The bolt base classes have a prepare method:
>>>>
>>>>
>>>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html
>>>>
>>>> and the spout base classes have a similar activate method:
>>>>
>>>>
>>>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html
>>>>
>>>> Is that sufficient for your needs or were you thinking of something
>>>> different?
>>>>
>>>> Marc
>>>>
>>>> On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
>>>> > Hi there -
>>>> >
>>>> > I would like to set up some state that spouts and bolts share, and
>>>> I'd like to
>>>> > prepare this state when the StormTopology gets 'activated' on a
>>>> worker.
>>>> >
>>>> > it would be great if the StormTopology had something like a prepare
>>>> or open
>>>> > method to indicate when it is starting.  I looked but i could find no
>>>> such API.
>>>> >   Maybe I should submit an enhancement request ?
>>>> >
>>>> > Thanks in advance for your responses,
>>>> >   -  Chris
>>>> >
>>>> >
>>>> >
>>>> > [ if anyone is curious, the shared state is for all my application
>>>> code to
>>>> > check or not check invariants.  the invariant checking takes
>>>> additional time,
>>>> > so we don't want to do it in production.. but during
>>>> testing/development it
>>>> > helps catch bugs].
>>>> >
>>>> > --
>>>> > Chris Bedford
>>>> >
>>>> > Founder & Lead Lackey
>>>> > Build Lackey Labs:  http://buildlackey.com
>>>> > Go Grails!: http://blog.buildlackey.com
>>>> >
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> Chris Bedford
>>>
>>> Founder & Lead Lackey
>>> Build Lackey Labs:  http://buildlackey.com
>>> Go Grails!: http://blog.buildlackey.com
>>>
>>>
>>>
>>
>
>
> --
> Chris Bedford
>
> Founder & Lead Lackey
> Build Lackey Labs:  http://buildlackey.com
> Go Grails!: http://blog.buildlackey.com
>
>
>


Re: Order of Bolt definition, catching ""that subscribes from non-existent component [ ...]"

2014-06-06 Thread Michael Rose
You can have a loop on a different stream. It's not always the best thing
to do (deadlock possibilities from buffers) but we have a production
topology that has that kind of pattern. In our case, one bolt acts as a
coordinator for recursive search.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jun 6, 2014 at 2:28 PM, Abhishek Bhattacharjee <
abhishek.bhattacharje...@gmail.com> wrote:

> I am sorry for the late reply.
> Yes , you can't have a loop. You can have a chain though( which doesn't
> close upon itself ! ).
>
> Thanks :-)
>
>
>
> On Wed, May 7, 2014 at 12:50 PM, shahab  wrote:
>
>> Thanks Abhishek. But this also implies that we can not have a loop ( of
>> message processing stages) using Storm, right?
>>
>> best,
>> /Shahab
>>
>>
>> On Mon, May 5, 2014 at 9:45 PM, Abhishek Bhattacharjee <
>> abhishek.bhattacharje...@gmail.com> wrote:
>>
>>> I don't think what you are trying to do is achievable. Data in storm
>>> always move forward so you can't give it back to a bolt from which it
>>> originated. That is a bolt can subscribe from bolts which were created
>>> before it's creation. So, I think you can create another object of the A
>>> bolt say D and then assign the o/p of C to D.
>>>
>>>
>>> On Mon, May 5, 2014 at 8:11 PM, shahab  wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to define a topology as following:
>>>> S : is a spout
>>>> A,B,C : are bolts
>>>> --> : means emitting message
>>>>
>>>> S  -->A
>>>>  A  -->B
>>>> B -->C
>>>> C -->A
>>>>
>>>> I am declaring the Spouts and Bolts in the above order in my java code
>>>> , first S, then A , B and finally C.
>>>>
>>>> I am using  globalGrouping("BoltName", StreamID) for collecting
>>>> messages to be collected by each bolt,
>>>>
>>>> The problem is that I receive an error, while defining bolt "A" saying
>>>> "that subscribes from non-existent component [C]" .
>>>>
>>>> I guess the error is happening because component "C" is not defined yet!
>>>> but what could be the solution to this?
>>>>
>>>> best,
>>>> /Shahab
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *Abhishek Bhattacharjee*
>>> *Pune Institute of Computer Technology*
>>>
>>
>>
>
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>


Re: Are Real-Time Game Servers a good use case for Storm

2014-06-08 Thread Michael Rose
You could make Storm do what you want, but it's not going to work well for
you. A normal client/server is vastly more suited to the type of workload
you want.

UDP may have less overhead, but overall a stall in processing is much more
costly. In a datacenter, TCP is the way to go for reliable communications.
UDP is popular between game client & server because of packet loss's effect
on TCP RTT, and packet loss is common between consumers and game servers.
Not as much between DC nodes.

Storm's support for other languages isn't exactly anything special. You
could effect the same interface in non-Storm code. Again, Storm can do
processing in low-latency situations (<100ms), but it's not what you want.
You really, really don't want Storm for this application. A custom
application (yes, you can indeed use Netty UDP) will be much much better
for you.

If your game server is just running business logic, a totally stateless set
of servers is really the way to go.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Sun, Jun 8, 2014 at 9:07 PM, Ted Dunning  wrote:

>
> Why do you think that UDP is faster?
>
>
>
>
> On Sun, Jun 8, 2014 at 6:27 PM, joe roberts  > wrote:
>
>>  To make it faster!
>>
>>
>> On 6/8/2014 8:27 PM, Ted Dunning wrote:
>>
>>
>> On Sun, Jun 8, 2014 at 12:12 PM, joe roberts <
>> carl.roberts.zap...@gmail.com> wrote:
>>
>>> Also, it seems Storm uses TCP via ZeroMQ by default -Is that right?  And
>>> if so, can it be switched to use UDP or UDT instead, perhaps by replacing
>>> ZeroMQ with Netty?
>>>
>>
>> Why would you want that?
>>
>>
>>
>>
>


Re: [VOTE] Storm Logo Contest - Final Round

2014-06-09 Thread Michael Rose
#9 - 5pts

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jun 9, 2014 at 2:49 PM, Bryan Stone <
bryan.st...@synapse-wireless.com> wrote:

>   #9 – 5 pts
>
>  *Bryan Stone*
>  *Staff Cloud Architect*
>  *Cloud Platform Lead*
>
> 6723 Odyssey Drive
>  Huntsville, AL 35806
>
> (256) 924-3976 // direct
>  (256) 724-0113 // cell
>  www.synapse-wireless.com
>
>   On 6/9/14, 1:38 PM, "P. Taylor Goetz"  wrote:
>
>   This is a call to vote on selecting the winning Storm logo from the 3
> finalists.
>
> The three candidates are:
>
>   * [No. 6 - Alec Bartos](
> http://storm.incubator.apache.org/2014/04/23/logo-abartos.html)
>  * [No. 9 - Jennifer Lee](
> http://storm.incubator.apache.org/2014/04/29/logo-jlee1.html)
>  * [No. 10 - Jennifer Lee](
> http://storm.incubator.apache.org/2014/04/29/logo-jlee2.html)
>
> VOTING
>
> Each person can cast a single vote. A vote consists of 5 points that can
> be divided among multiple entries. To vote, list the entry number, followed
> by the number of points assigned. For example:
>
> #1 - 2 pts.
> #2 - 1 pt.
> #3 - 2 pts.
>
> Votes cast by PPMC members are considered binding, but voting is open to
> anyone. In the event of a tie vote from the PPMC, votes from the community
> will be used to break the tie.
>
> This vote will be open until Monday, June 16 11:59 PM UTC.
>
> - Taylor
>
>
> ==
>
> This e-mail, including any attachments, is intended for the exclusive use
> of the person(s) to which it is addressed and may contain proprietary,
> confidential and/or privileged information. If the reader of this e-mail is
> not the intended recipient or his or her authorized agent, any review, use,
> printing, copying, disclosure, dissemination or distribution of this e-mail
> is strictly prohibited. If you think that you have received the e-mail in
> error, please notify the sender immediately by return e-mail, delete this
> communication and destroy all copies.
>
> ==
>


Re: Storm/Hbase Bolt Performance

2014-06-09 Thread Michael Rose
Just as a side note, I've seen capacity numbers >6. The calculation for
capacity is somewhat flawed, and does not represent a true percentage
capacity, merely a relative measure next to your other bolts.

Maybe that's something we can improve, I'll log a JIRA if there isn't
already one.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jun 9, 2014 at 6:58 PM, Jon Logan  wrote:

> Are you sure you are looking at the right figure? Capacity should not be >
> 1. High values indicate that you may want to increase parallelism on that
> step. Low values indicate something else is probably bottlenecking your
> topology. If you could send a screenshot of the Storm UI that could be
> helpful.
>
>
> I've had good luck with YourKit...just remotely attach to a running worker.
>
>
> On Mon, Jun 9, 2014 at 8:53 PM, Justin Workman 
> wrote:
>
>> The capacity indicates they are being utilized. Capacity hovers around
>> .800 and busts to 1.6 or so when we see spikes of tuples or restart the
>> topology.
>>
>> Recommendations on profilers?
>>
>> Sent from my iPhone
>>
>> On Jun 9, 2014, at 6:50 PM, Jon Logan  wrote:
>>
>> Are your HBase bolts being saturated? If not, you may want to increase
>> the number of pending tuples, as that could cause things to be artificially
>> throttled.
>>
>> You should also try attaching a profiler to your bolt, and see what's
>> holding it up. Are you doing batched puts (or puts being committed on a
>> separate thread)? That could also cause substantial improvements.
>>
>>
>> On Mon, Jun 9, 2014 at 8:11 PM, Justin Workman 
>> wrote:
>>
>>> In response to a comment from P. Taylor Goetz on another thread..."I
>>> can personally verify that it is possible to process 1.2+ million
>>> (relatively small) messages per second with a 10-15 node cluster — and that
>>> includes writing to HBase, and other components (I don’t have the hardware
>>> specs handy, but can probably dig them up)."
>>>
>>> I would like to know what special knobs people are tuning in both Storm
>>> and Hbase to achieve this level of throughput. Things I would be interested
>>> in would be Hbase cluster sizes, is the cluster shared with map reduce load
>>> as well, bolt parallelism and any other knobs people have adjusted to get
>>> this level of write throughput to Hbase from Storm.
>>>
>>> Maybe this isn't the right group, but we are struggling getting more
>>> than about 2000 tuples/sec writting to Hbase. I think I know some of the
>>> bottlenecks, but would love to know what others in teh community are tuning
>>> to get this level of performance.
>>>
>>> Our messages are roughly 300-500k and we are running on a 6 node Storm
>>> cluster running on virtual machines (our first bottleneck, which we will be
>>> replacing with 10 relatively beefy physical nodes), a parallelism of 40 for
>>> our storage bolt.
>>>
>>> Any hints on Hbase or Storm optimizations that can be done to help
>>> increase the throughput to Hbase would be greatly appreciated.
>>>
>>> Thanks
>>> Justin
>>>
>>
>>
>


Re: resource aware task scheduling in Storm cluster

2014-06-10 Thread Michael Rose
Out of curiosity, what kind of changes have you been making?

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Jun 10, 2014 at 9:36 PM, Jason Jackson  wrote:

> Hi Alex,
>
> We're using a slightly modified version of https://github.com/nathanmarz/
> storm-mesos in production. Currently there's no one "supporting"
> storm-mesos, nor have we been open sourcing our changes in this project at
> this point in time.
>
> // Jason
>
>
> On Mon, Mar 3, 2014 at 3:00 PM, Alexander S. Klimov <
> alexk...@microsoft.com> wrote:
>
>>  Hi guys,
>>
>>
>>
>> I found this solution for resource aware task scheduling in Storm:
>>
>> https://github.com/nathanmarz/storm-mesos
>>
>>
>>
>> What is the status of this project? Is this still supported? Would it be
>> recommended to be used in production?
>>
>>
>>
>> Thanks,
>> Alex
>>
>
>


Re: Storm initialization on startup

2014-06-11 Thread Michael Rose
We tend to push that kind of initialization logic into the prepare() method
of our bolts. Most times, you only need one bolt to do it, so you can push
all of your initialization logic into a different class and guard the init
with double-check locks. e.g.:


@Override
public void prepare(Map config, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.statefulService = StatefulService.getInstance(config);
}

...

class StatefulService {

private static volatile StatefulService INSTANCE = null;

public StatefulService getInstance(Map config) {
if (INSTANCE == null) {
   synchronized(StatefulService.class) {
  if (INSTANCE == null) {
  INSTANCE = new StatefulService(config); // do some init
  }
   }
}

return INSTANCE;
}
}

Other patterns are pretty similar. Guard the init code, call it in all your
bolt's prepare methods. The first executor to start per JVM wins.

Storm has some gotchas (bolts are serialized, so do your init in
prepare()), but in general things that work in a normal Java application
will end up working in Storm.

Michael



Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Jun 11, 2014 at 1:23 AM, Aleksandar Stojadinovic <
aleksandar.stojadino...@nissatech.com> wrote:

> Hi,
>
> Thanks for your answer. It sounds like a good idea. I can put the
> parameters (read from database) in the configuration map in my main
> function and then read them in the configurator. Not too ideal, but a whole
> lot cleaner then any other I thought of. I'll try something like that. Yet
> I'm still open for new answers because I guess I'm not the first and only
> with this problem. There can certainly be a pattern derived.
>
> Best regards,
> Aleksandar
>
>
>
> --
> From: qincui...@gmail.com
> Subject: Re: Storm initialization on startup
> Date: Tue, 10 Jun 2014 20:07:40 +0200
> To: user@storm.incubator.apache.org
>
>
> Hi Aleksandar,
>  If I understand you correctly, you want to configure your stateful
> component. You could try to use the Config class in Storm, something like
>  "Config.put(key,value)". Then in the prepare function of bolt you can get
> the parameters you set up,like conf.get(key). Hope it will help you.
>
> Best,
> Cui Qin
>
> Sent from my iPhone
>
> On 10 Jun 2014, at 15:11, Aleksandar Stojadinovic <
> aleksandar.stojadino...@nissatech.com> wrote:
>
>  Hello,
>
> I have a topology with two bolt types (among others) which I am not
> certain about how will work. One of the bolts has a custom made stateful
> component  The other bolt type, the "configurator" configures that
> component under certain conditions and on input from a spout, prepares a
> configuration, and communicates with a database in the process. That works
> fine, but the point that bothers me is that I have to configure my stateful
> component on start-up with some values in the database, but I want to keep
> the logic in that bolt clear, without database access. So my solution is to
> access the database and read the configuration on the "configurator"
> startup and emit it. But what if the stateful bolt is not up yet? Will the
> messages get persisted (no sign of something like that in the Storm
> documents), or the prepare command is executed after the topology is set up?
>
> In short, how to handle this situation?
>
> *Best regards*
>
>


Re: Non-DRPC topologies and number of assigned workers

2014-06-11 Thread Michael Rose
A topology can run in as many workers as you assign at launch time, DRPC or
not.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Jun 11, 2014 at 1:08 PM, Nima Movafaghrad <
nima.movafagh...@oracle.com> wrote:

> Hi Community,
>
>
>
> Wondering if a non-drpc topology can run in multiple workers or is it just
> gonna run within one?
>
>
>
> Thanks,
>
> Nima
>


Re: hot swap of topology

2014-06-16 Thread Michael Rose
Hi Aaron,

We do rolling deploys of our topologies by appending the build number to
each topology.

storm-topology-1 is active
Jenkins submits storm-topology-2
Allow storm-topology-2 to become active & check health (or else halt)
Deactivate storm-topology-1 & wait a few minutes (opportunity to halt and
reactivate)
Kill storm-topology-1

This works for us as we don't have any critical in-memory state that isn't
checkpointed to a persistent store, and the vast majority of our work can
be replayed safely.

Michael

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Jun 16, 2014 at 7:32 AM, Aaron Zimmerman <
azimmer...@sproutsocial.com> wrote:

> Has anyone done any work with redeploying a topology with minimal
> downtime? I'm imagining a new storm command, or maybe a new function of the
> StormSubmitter class that:
>
> uploads the new code to the cluster,
> initializes bolts and spouts,
> turns off the old spouts,
> turns on the new spouts
> waits for old cluster to finish anything in flight
> kills the old topology.
>
> I have a deploy process in place that does this manually but this prevents
> the use of the Isolation Scheduler, since I have to rename the topology
> something different to have multiple running at once.
>
> I couldn't find a related JIRA,  but I seem to recall some discussion in
> the past and I dont want to duplicate work.
>
> Thanks,
>
> Aaron Zimmerman
>


RE: Extracting Performance Metrics

2014-06-16 Thread Michael Rose
What kind of issues does Metrics have that leads you to recommend
HdrHistogram?
On Jun 16, 2014 6:57 PM, "Dan"  wrote:

> Be careful when using Coda Hale's Metrics package when measuring latency.
> Consider using Gil Tene's
> High Dynamic Range Histogram instead:
>
> http://hdrhistogram.github.io/HdrHistogram/
>
> -Dan
>
> --
> From: and...@parsely.com
> Date: Mon, 16 Jun 2014 18:20:11 -0400
> Subject: Re: Extracting Performance Metrics
> To: user@storm.incubator.apache.org
>
> Also, I came across this presentation by Visible Measures which actually
> walks through a lot of great options covering most of what you want to know
> about:
>
> http://files.meetup.com/5809742/storm%20monitoring.pdf
>
> One other thing to be aware of is that in Storm 0.9.2 (forthcoming
> release), there is a new REST API used by the Storm UI for gathering some
> of these metrics:
>
> https://github.com/apache/incubator-storm/pull/101
> https://issues.apache.org/jira/browse/STORM-205
>
>
> On Mon, Jun 16, 2014 at 6:13 PM, Andrew Montalenti 
> wrote:
>
> I haven't used it yet, but a lot of people get pointed to metrics_storm:
>
> https://github.com/ooyala/metrics_storm
>
> With this blog post that discusses it:
>
> http://engineering.ooyala.com/blog/open-sourcing-metrics-storm
>
> Michael Noll also has a nice blog post about streaming Storm 0.9 metrics
> to Graphite:
>
>
> http://www.michael-noll.com/blog/2013/11/06/sending-metrics-from-storm-to-graphite/
>
> Currently, when we use Storm, we do a lot of custom metrics in Graphite
> using statsd, as described in this post (not about Storm, but about
> Graphite/statsd):
>
> http://codeascraft.com/2011/02/15/measure-anything-measure-everything/
>
>
>
>
> On Mon, Jun 16, 2014 at 4:37 PM, Anis Nasir  wrote:
>
> Dear all,
>
> I am running a cluster with 1 kafka + 1 nimbus + 10 supervisor + 1
> zookeeper nodes. I am executing multiple topologies on the cluster and I
> want to extract different metrics that I am mentioning below. Can someone
> help me by recommending tools that I can use to extract this information.
>
>
> Per Topology
>  - Throughput
>  - Latency
>
> Per Spout or Bolt
>  - Throughput
>  - Latency
>  - Execution Time
>  - Queuing Time
>  - Number of Messages Processed
>
> Regards
> Anis
>
>
>
>


Re: Topology Memory leaks

2014-06-17 Thread Michael Rose
I've run into similar leaks with one of our topologies. ZMQ vs. Netty
didn't make any difference for us. We'd been looking into the Netty-based
HTTP client we're using as a suspect, but maybe it is Storm.

8 workers, 1.5GB heap, CMS collector, Java 1.7.0_25-b15, Storm 0.9.0.1

What kinds of things do your topologies do?

One thing we'd observed is a bump in direct buffers. Usually starts around
100. Java can't account for the memory used, but the size & count of the
allocations as shown by pmap is suspicious.

...
7f30ac1bc000  63760K -[ anon ]
7f30b000864K rw---[ anon ]
7f30b00d8000  64672K -[ anon ]
7f30b400620K rw---[ anon ]
7f30b409b000  64916K -[ anon ]
7f30b800   1780K rw---[ anon ]
7f30b81bd000  63756K -[ anon ]
7f30bc00   1376K rw---[ anon ]
7f30bc158000  64160K -[ anon ]
7f30c000   1320K rw---[ anon ]
...

  "buffers":{
 "direct":{
"count":721,
"memoryUsed":16659150,
"totalCapacity":16659150
 },
 "mapped":{
"count":0,
"memoryUsed":0,
    "totalCapacity":0
 }
  },

Do you have a similar bump in direct buffer counts?

Michael

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Jun 17, 2014 at 11:15 AM, Indra Nath Bardhan <
indranath.bard...@gmail.com> wrote:

> Hi All,
>
> We have a topology which is running on 16 workers with 2GB heap each.
>
> However we see that the topology worker RES memory usage keeps on piling
> up i.e., starting at 1.1 G and keeps growing over and beyond the 2G mark
> till it overwhelms the entire node.
>
> This possibly indicates that
>
> 1) we either have slowly consuming bolts and thus need throttling in spout
> 2) OR a memory leak in the ZMQ buffer allocation or some of the JNI code.
>
> Based on responses in certain other discussions, we tried making our
> topology reliable and make use of the MAX_SPOUT_PENDING to throttle the
> spouts. However this did not yield us much value, trying with a value of
> 1000 & 100, we see the same growth in the memory usage, although a bit
> slower in the later case.
>
> We also did a pmap of the offending pids and did not see much memory usage
> by the native lib*so files.
>
> Is there any way to identify the source of this native leak OR fix this ?
> We need some urgent help on this.
>
> [NOTE: Using Storm - 0.9.0_wip21]
>
> Thanks,
> Indra
>
>
>
>


Re: Storm trident, multiple workers nothing happens

2014-06-17 Thread Michael Rose
In a single worker, you don't incur serialization or network overhead.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Tue, Jun 17, 2014 at 11:09 PM, Romain Leroux 
wrote:

> Still netty performances were clearly better for me (when using only 1
> worker since multiple workers didn't work with netty)
>
>
> 2014-06-18 1:33 GMT+09:00 Danijel Schiavuzzi :
>
> I had a similar problem, with Netty my Trident transactional topology was
>> getting stuck after several occurences of Kafka spout restarting due to
>> Kafka SocketTimeouts (I can reproduce this bug by blocking access to Kafka
>> from the Supervisor machines with iptables, only a few tries are needed to
>> reproduce it). Reverted to ZMQ and now it works flawlessly.
>>
>> I'll prepare a reproducible test case and fill a JIRA bug report ASAP.
>>  On Jun 17, 2014 3:51 PM, "Romain Leroux"  wrote:
>>
>>> As I read in different topics, here also simply switching back to ZeroMQ
>>> solved the issue ...
>>>
>>>
>>> 2014-06-13 21:58 GMT+09:00 Romain Leroux :
>>>
>>>> After tuning a trident topology (kafka->storm->cassandra) to run on 1
>>>> worker (so on 1 server), it works really well.
>>>>
>>>> I tried to deploy it using 2 workers on 1 server or 2 workers on 2
>>>> servers.
>>>> The result is the same, nothing happens, no tuples are emitted and no
>>>> messages in the logs.
>>>>
>>>> A quick profiling showed me that :
>>>>
>>>> 77% of CPU time is main-SendThread(a.zookeeper.hostname:2181)
>>>> org.apache.zookeeper.ClientCnx$sendThreadrun()
>>>> sun.nio.ch.SelectorImpl.select()
>>>>
>>>> The rest mainly come from 2 threads "New I/O"
>>>> org.jboss.netty.channel.socket.nio.SelectorUtil.select()
>>>> sun.nio.ch.SelectorImpl.select()
>>>>
>>>> Therefore I am wondering if the problem can come from one of the
>>>> followings :
>>>>
>>>> - Zookeeper cluster version is 3.4.6, which is different from the 3.3.x
>>>> used by Storm 0.9.1-incubating ?
>>>> But that is strange because there are absolutely no problem when using
>>>> the same settings but with only 1 worker
>>>>
>>>> - Communication layer is netty, which can be not working well with my
>>>> hardware ? (is this possible?)
>>>> In case of 1 worker only netty seems not to be too much involved (no
>>>> inter worker communication)
>>>> Maybe changing to ZeroMQ ?
>>>>
>>>> Has someone faced similar issue ? Any pointer ? Or anything in
>>>> particular to monitor / profile ?
>>>>
>>>
>>>
>


Re: Storm's performance limits to 1000 tuples/sec

2014-06-25 Thread Michael Rose
On serialization, make sure your custom classes are registered with Kryo
otherwise it may use Java serialization (slow)
On Jun 25, 2014 10:30 AM, "Robert Turner"  wrote:

> Serialisation across workers might be your problem, if you can use the
> "localOrShuffle" grouping and arrange that the number of spouts and bolts
> is a multiple of the number of workers then this will minimise the
> serialisation across workers. If there is only one counting bolt for the
> topology then tuples are serialised and sent to the worker with the single
> counting bolt. A better approach might be to have a single counting bolt
> per worker and aggregate those periodically.
>
> Regards
>Rob Turner.
>
>
> On 24 June 2014 15:10,  wrote:
>
>> Hi all,
>>
>> I face a critical problem about performance of my storm topology. I can
>> only process 1000 tuples/sec from kafka by kafkaSpout. I use standard storm
>> to set my topology(not trident), and my topology information is as follows:
>> [Machines]
>> I have 1 nimbus and 3 supervisors and each with 2-core CPU in GCE(google
>> compute engine)
>> Number of workers:12
>> Number of executers:51
>> [Topology]
>> Number of kafkaSpout: 13(fetch 13 topics from kafka brokers)
>> Number of Bolts: 12(There are 5 mysql-dumper bolt here)
>>
>> KafkaSpout(topic) emits to boltA and boltB
>> boltA(parallelism=9): parse the avro tuple from kafkaSpout
>> boltB(parallelism=1): Counting number of bolt only
>>
>> Ifound sometimes boltA's capacity is 1 or above in storm UI, and my 5
>> mysql-dumper bolt's execute latency is more than 300ms(other bolts are less
>> than 10ms). In addition, my complete latency of these kafkaspouts is more
>> than 2000ms in the beggining, but it drops to 1000ms after a while.
>>
>> I found this topology can only process 1000 tuples/s or less, but my goal
>> is to process 1 tuples/s. Is any wrong of my topology config? Actually,
>> my topology is doing simple thing like counting and dumping to mysql only.
>> It seems storm not to have a good performance as it says(million of tuples
>> in a second in 10-node cluster). Can anyone give me some suggestion?
>>
>> Thanks a lot.
>>
>> Best regards,
>> James
>
>
>
>
> --
> Cheers
>Rob.
>


Re: Choosing where your tasks run in Storm

2014-07-04 Thread Michael Rose
You can make it happen with a custom scheduler, see this article (sorry for
mangling, getting this link through SpamAssassin on the group was a
nightmare):

 xumingming  sinaapp 
 885/twitter-storm-how-to-develop-a-pluggable-scheduler/

But it's nothing I've seriously attempted before, the existing schedulers
are in Clojure. It's not impossible to do for sure, but like Andrew said it
might well just be easier to have separate clusters that share ZK clusters.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jul 4, 2014 at 10:28 AM, Andrew Montalenti 
wrote:

> I don't think this is possible right now, though I have thought about the
> same thing before. It *might* be true that Storm's support for YARN could
> eventually lead to this kind of thing, but I don't know much about it. For
> now, you're best off having separate Storm clusters for different classes
> of machines. You could consider putting Kafka queues between them to ensure
> cross-topology message reliability guarantees. (e.g. have your I/O bound
> topology read from kafka and write to kafka, and have your CPU-bound
> topology read from the Kafka topic produced by the first queue).
>
> ---
> Andrew Montalenti
> Co-Founder & CTO
> http://parse.ly
>
> On Fri, Jul 4, 2014 at 7:59 AM, jeff saremi 
> wrote:
>
>> I'm wondering if this concept applies to Storm and if there's a way to do
>> this.
>>
>> I'd like to limit the machines that certain spouts or bolts run on. There
>> are many reasons for this. But for one let's assume that I have a bolt
>> that is just a proxy for some legacy service. I want to monitor that
>> service by way of the bolt and use it in my topology.
>> Another way of looking at it is that I want to have a topology that spans
>> different "classes" of machines.
>> Let's say I have 3 classes of machines: small, medium, and large. Some
>> topologies are limited to only one class of machines however some other
>> topologies need to span two or more classes of machines.
>> How can I do this in storm?
>> Thanks
>> Jeff
>>
>
>


Re: Does Storm support JDK7 ??

2014-07-14 Thread Michael Rose
You need only run the existing releases on JDK 7 or 8.
On Jul 14, 2014 7:15 AM, "Haralds Ulmanis"  wrote:

> Actually now I've  customized a bit storm and recompiled as I needed some
> changes in it.
> But initially I just downloaded and run.
>
>
>
>
> On 14 July 2014 14:02, Adrianos Dadis  wrote:
>
>> Hi Haralds,
>>
>> Have you build it with JDK8 and run with JDK8, or you just downloaded
>> Storm (which is build with JDK6) and run it with JDK8?
>>
>>
>>
>>
>> On Mon, Jul 14, 2014 at 3:24 PM, Haralds Ulmanis 
>> wrote:
>>
>>> Do not know about jdk7, I'm running on jdk8 and seems fine.
>>>
>>>
>>> On 14 July 2014 13:11, Veder Lope  wrote:
>>>
 Storm is a great project!!! We are trying to build a project, where
 Storm holds a crucial role in our architecture.

 As I see in pom.xml (in maven-compiler-plugin), source and target are
 set to Java 1.6.
 1) Is Storm compatible with JDK7?
 2) I know we can download Storm (build for JDK6) and run it using JDK7,
 but there are a few incompatibilities* between JDK7 and JDK6. Will these
 incompatibilities affect Storm or not?
 3) Do you plan to move to JDK7?
 4) What is the restriction that holds are back to JDK6? (we are now
 stuck to JDK6 compile and runtime because of Storm)
 5) Can we just build Storm with JDK7 (alter both source and target in
 pom.xml) and then use JDK7 for runtime or not? Have you seen any errors
 with this road?


 *incompatibilities: Check this:
 http://www.oracle.com/technetwork/java/javase/compatibility-417013.html#incompatibilities

 Regards,
 Adrianos Dadis.


>>>
>>
>


Re: Distribute Spout output among all bolts

2014-07-16 Thread Michael Rose
Maybe we can help with your topology design if you let us know what you're
doing that requires you to shuffle half of the whole stream output to each
of the two different types of bolts.

If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two
different types) as above, there's no point to doing this. Setting the
parallelism will make sure that data is partitioned across machines (by
default, setting parallelism sets tasks = executors = parallelism).

Unfortunately, I don't know of any way to do this other than shuffling the
output to a new bolt, e.g. bolt "b0" a 'RouterBolt', then having bolt b0
round-robin the received tuples between two streams, then have b1 and b2
shuffle over those streams instead.



Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor 
wrote:

> ​
> Hi Tomas,
>
>  As I said in my previous mail the grouping is for a bolt *task* not for
> the actual number of spawned bolts; for example let's say you have two
> bolts that have a parallelism hint of 3 and these two bolts are wired to
> the same spout. If you set the bolts as such:
>
> tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint
> */).shuffleGrouping("spout1");
> tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint
> */).shuffleGrouping("spout1");
>
> Then each of the tasks will receive half of the spout tuples but each
> actual spawned bolt will receive all of the tuples emitted from the spout.
> This is more evident if you set up a counter in the bolt counting how many
> tuples if has received and testing this with no parallelism hint as such:
>
> tb.setBolt("b1", new ExampleBolt(),).shuffleGrouping("spout1");
> tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1");
>
> Now you will see that both bolts will receive all tuples emitted by
> spout1.
>
> Hope this helps.
>
> ​
> ​Andrew.​
>
>
> On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna 
> wrote:
>
>> Andrew,
>>
>> when you connect your bolt to your spout you specify the grouping. If you
>> use shuffle grouping then any free bolt gets the tuple - in my experience
>> even in lightly loaded topologies the distribution amongst bolts is pretty
>> even. If you use all grouping then all bolts receive a copy of the tuple.
>> Use shuffle grouping and each of your bolts will get about 1/3 of the
>> workload.
>>
>> Tomas
>>
>>
>> On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor 
>> wrote:
>>
>>> H
>>> ​i,
>>>
>>>  I am trying to distribute the spout output to it's subscribed bolts
>>> evenly; let's say that I have a spout that emits tuples and three bolts
>>> that are subscribed to it. I want each of the three bolts to receive 1/3
>>> rth of the output (or emit a tuple to each one of these bolts in turns).
>>> Unfortunately as far as I understand all bolts will receive all of the
>>> emitted tuples of that particular spout regardless of the grouping defined
>>> (as grouping from my understanding is for bolt *tasks* not actual bolts).
>>>
>>>  I've searched a bit and I can't seem to find a way to accomplish
>>> that...​ is there a way to do that or I am searching in vain?
>>>
>>> Thanks.
>>>
>>
>>
>>
>> --
>> Tomas Mazukna
>> 678-557-3834
>>
>
>


Re: Distribute Spout output among all bolts

2014-07-16 Thread Michael Rose
It doesn't say so, but if you have 4 workers, the 4 executors will be
shared evenly over the 4 workers. Likewise, 16 will partition 4 each. The
only case where a worker will not get a specific executor is when there are
less executors than workers (e.g. 8 workers, 4 executors), 4 of the workers
will receive an executor but the others will not.

It sounds like for your case, shuffle+parallelism is more than sufficient.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor 
wrote:

> Hey Stephen, Michael,
>
>  Yea I feared as much... as searching the docs and API did not surface any
> reliable and elegant way of doing that unless you had a "RouterBolt". If
> setting the parallelism of a component is enough for load balancing the
> processes across different machines that are part of the Storm cluster then
> this would suffice in my use case. Although here
> <https://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html>
> the documentation says executors are threads and it does not explicitly say
> anywhere that threads are spawned across different nodes of the cluster...
> I want to avoid the possibility of these threads only spawning locally and
> not in a distributed fashion among the cluster nodes..
>
> Andrew.
>
>
> On Thu, Jul 17, 2014 at 2:46 AM, Michael Rose 
> wrote:
>
>> Maybe we can help with your topology design if you let us know what
>> you're doing that requires you to shuffle half of the whole stream output
>> to each of the two different types of bolts.
>>
>> If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two
>> different types) as above, there's no point to doing this. Setting the
>> parallelism will make sure that data is partitioned across machines (by
>> default, setting parallelism sets tasks = executors = parallelism).
>>
>> Unfortunately, I don't know of any way to do this other than shuffling
>> the output to a new bolt, e.g. bolt "b0" a 'RouterBolt', then having bolt
>> b0 round-robin the received tuples between two streams, then have b1 and b2
>> shuffle over those streams instead.
>>
>>
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor 
>> wrote:
>>
>>> ​
>>> Hi Tomas,
>>>
>>>  As I said in my previous mail the grouping is for a bolt *task* not for
>>> the actual number of spawned bolts; for example let's say you have two
>>> bolts that have a parallelism hint of 3 and these two bolts are wired to
>>> the same spout. If you set the bolts as such:
>>>
>>> tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint
>>> */).shuffleGrouping("spout1");
>>> tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint
>>> */).shuffleGrouping("spout1");
>>>
>>> Then each of the tasks will receive half of the spout tuples but each
>>> actual spawned bolt will receive all of the tuples emitted from the spout.
>>> This is more evident if you set up a counter in the bolt counting how many
>>> tuples if has received and testing this with no parallelism hint as such:
>>>
>>> tb.setBolt("b1", new ExampleBolt(),).shuffleGrouping("spout1");
>>> tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1");
>>>
>>> Now you will see that both bolts will receive all tuples emitted by
>>> spout1.
>>>
>>> Hope this helps.
>>>
>>> ​
>>> ​Andrew.​
>>>
>>>
>>> On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna 
>>> wrote:
>>>
>>>> Andrew,
>>>>
>>>> when you connect your bolt to your spout you specify the grouping. If
>>>> you use shuffle grouping then any free bolt gets the tuple - in my
>>>> experience even in lightly loaded topologies the distribution amongst bolts
>>>> is pretty even. If you use all grouping then all bolts receive a copy of
>>>> the tuple.
>>>> Use shuffle grouping and each of your bolts will get about 1/3 of the
>>>> workload.
>>>>
>>>> Tomas
>>>>
>>>>
>>>> On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor <
>>>> andreas.gramme...@gmail.com> wrote:
>>>>
>>>>> H
>>>>> ​i,
>>>>>
>>>>>  I am trying to distribute the spout output to it's subscribed bolts
>>>>> evenly; let's say that I have a spout that emits tuples and three bolts
>>>>> that are subscribed to it. I want each of the three bolts to receive 1/3
>>>>> rth of the output (or emit a tuple to each one of these bolts in turns).
>>>>> Unfortunately as far as I understand all bolts will receive all of the
>>>>> emitted tuples of that particular spout regardless of the grouping defined
>>>>> (as grouping from my understanding is for bolt *tasks* not actual bolts).
>>>>>
>>>>>  I've searched a bit and I can't seem to find a way to accomplish
>>>>> that...​ is there a way to do that or I am searching in vain?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Tomas Mazukna
>>>> 678-557-3834
>>>>
>>>
>>>
>>
>


Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
Run your producer code in another thread to fill a LBQ, poll that with
nextTuple instead.

You should never be blocking yourself inside a spout.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel  wrote:

>  Hello again,
>
>
>  Attached is a simplified reproduction (without the ShellSpout, but the
> concepts are the same).
>
>
>  It seems that ack() and nextTuple() are always called on the same
> thread. That means that there is an inherent tradeoff.
>
> Either nextTuple sleeps a few ms  (and then the ShellSpout would serialize
> alot of nextTuple messages)
>
> or nextTuple can sleep but then the ack is delayed.
>
>
>  Is there a way around this limitation?
>
>
>  Itai
>  --
> *From:* Itai Frenkel 
> *Sent:* Thursday, July 17, 2014 9:42 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
>
>   Hello,
>
>  I have noticed that an ack takes 5 seconds to pass from the bolt to the
> spout (see debug log below). It is a simple topology with 1 spout, 1 bolt
> and 1 acker all running on the same worker. The spout and the bolt are
> ShellSpout and ShellBolt respectively.
>
>  It looks like the message is delayed in the LMAX disruptor​ queue.
>  How can I reduce this delay to ~1ms ?
>
>  Regards,
>  Itai
>
>
>  2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to
> tuple 2759481868963667531
> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack
> [-357211617823660063 -3928495599512172728]
> 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to
> tuple 2759481868963667531
> 2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message
> source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063
> -3928495599512172728]
> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker
> __ack_ack [-357211617823660063]
> 2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message
> source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
> 2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
>
>
>
>


Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
I have no experience with multilang spouts, however my impression from the
docs is that you should be handling your own multiplexing if you're writing
a shellspout. Otherwise if you block for 5 seconds emitting a tuple, you
cannot process an ack until that's done. I'd experiment with that, if you
change the sleep.spout.wait time to be 500ms and you don't block in your
spout (instead returning "sync") it should back off just as it does with a
normal spout (see
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
"sync" is a no-op).

The post you linked to was mine, and for a long time that was true
(especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
automatic backoffs when no tuples are emitted. The only time I've
intentionally blocked in a spout after 0.8.0 is to control throughout (e.g.
only allow 10/s during development). I've never built a multilang spout
before.

Spouts, like bolts, run in a single-threaded context so blocking at all
prevents acks/fails/emits from being done until the thread is unblocked.
That is why it's best to have another thread dealing with IO and
asynchronously feeding a concurrent data structure the spout can utilize.
For example, in our internal Amazon SQS client our IO thread continuously
fetches up to 10 messages per get and shoves them into a
LinkedBlockingQueue (until full, then it blocks the IO thread only until
the spout emits clear up room).

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel  wrote:

>  So can you please explain this sentence from the multilang documentation?
>
>
>  "Also like ISpout, if you have no tuples to emit for a next, you should
> sleep for a small amount of time before syncing. ShellSpout will not
> automatically sleep for you"
>
> https://storm.incubator.apache.org/documentation/Multilang-protocol.html
>
>
>  I read it as: "Unless you sleep a small amount of time before syncing,
> the ShellSpout would serialize one "nextTuple" message per 1ms (see
> configuration below) which would require much more CPU cycles"
>
> topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
> topology.sleep.spout.wait.strategy.time.ms: 1
>
> You can also refer to the answer here, which refers to regular Spouts
> doing sleep as well:
>
> https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0
>
>
>  Regards,
>
> Itai
>
>
>
>  --
> *From:* Michael Rose 
> *Sent:* Friday, July 18, 2014 10:18 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  Run your producer code in another thread to fill a LBQ, poll that with
> nextTuple instead.
>
>  You should never be blocking yourself inside a spout.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> mich...@fullcontact.com
>
>
> On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel  wrote:
>
>>  Hello again,
>>
>>
>>  Attached is a simplified reproduction (without the ShellSpout, but the
>> concepts are the same).
>>
>>
>>  It seems that ack() and nextTuple() are always called on the same
>> thread. That means that there is an inherent tradeoff.
>>
>> Either nextTuple sleeps a few ms  (and then the ShellSpout would
>> serialize alot of nextTuple messages)
>>
>> or nextTuple can sleep but then the ack is delayed.
>>
>>
>>  Is there a way around this limitation?
>>
>>
>>  Itai
>>  --
>> *From:* Itai Frenkel 
>> *Sent:* Thursday, July 17, 2014 9:42 PM
>> *To:* user@storm.incubator.apache.org
>> *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>>Hello,
>>
>>  I have noticed that an ack takes 5 seconds to pass from the bolt to the
>> spout (see debug log below). It is a simple topology with 1 spout, 1 bolt
>> and 1 acker all running on the same worker. The spout and the bolt are
>> ShellSpout and ShellBolt respectively.
>>
>>  It looks like the message is delayed in the LMAX disruptor​ queue.
>>  How can I reduce this delay to ~1ms ?
>>
>>  Regards,
>>  Itai
>>
>>
>>  2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to
>> tuple 2759481868963667531
>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack
>> [-35721

Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
1)

Lets say we have it set at a limit of 100 items. The LBQ currently has 97
items in it. The SQS client runs again, pulls 10 messages, and successfully
inputs 3. The other 7 are blocked waiting for the queue to clear out. No
new HTTP requests are made to SQS while this LBQ is full (essentially this
is just called in a loop). nextTuple() eventually comes around, 6 blocked
for insertion. etc. etc. until all 6 blocked messages are inserted into the
LBQ. At this point, we call out to the SQS client again to fetch another 10
(and hopefully the LBQ has not completely drained by the time the SQS
client returns another 1-10 messages).

This model isn't picky about which SQS client is grabbing messages. SQS
doesn't guarantee order (or single delivery) anyways. In one of our
topologies, we have 8 spout instances consuming the same SQS queue for
throughput purposes. Maybe I've misunderstood your question though. We
normally don't have multiple topologies consuming the same queue, but
depending on the data there's no reason we couldn't.

Also in this model, we don't use a blocking poll method, if the LBQ is
empty, we skip emission and let Storm handle backoff if it wants to (see
below).

2) By backoff I mean, your spout hasn't emitted in a while, it's going to
slow down on calling nextTuple() to not busy-wait your entire CPU. If
you're at maxSpoutPending limit, nextTuple is also not called until an ack
or fail has been received (which is one of the reasons its so important to
not block in the spout as much as possible).

Storm will use the
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
when you don't emit. By default, it waits only 1ms before calling again
(not exponential, just fixed), which is enough to prevent 10-100k/s polling
behavior but not significantly increase latency -- if you're ever not
emitting, you can probably afford to sleep for 1ms. We actually have it set
to 10ms, given that 99.9% of the time we'll have a message ready for
processing.

An alternative to this is the
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
if you do see an impact on your throughput--but I've never needed this.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel  wrote:

>  Thanks for the thorough and quick answer.
>
>
>  Two follow up questions. The context is performing low latency stream
> manipulation (hopefully process a message the moment it arrives, maybe a
> few milliseconds later).
>
>
>  1. What happens if the LBQ contains 10 items, while the Storm topology
> does not call nextTuple because of backoffs ? Wouldn't it be better of for
> another Amazon SQS client to handle these items? Or are you assuming a
> single Storm topology is the sole handler of these items ?
>
>
>  2. If by backoff, you mean storm topology cannot handle any more
> messages, or maxSpoutPending is reached, then ignore this question. If by
> backoff you mean exponential backoff then I am worried about a message
> arriving to the queue and nextTuple is not called for a long time (more
> than a few milliseconds).
>
>
>  Regards,
>
> Itai
>  --
> *From:* Michael Rose 
> *Sent:* Friday, July 18, 2014 11:27 PM
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  I have no experience with multilang spouts, however my impression from
> the docs is that you should be handling your own multiplexing if you're
> writing a shellspout. Otherwise if you block for 5 seconds emitting a
> tuple, you cannot process an ack until that's done. I'd experiment with
> that, if you change the sleep.spout.wait time to be 500ms and you don't
> block in your spout (instead returning "sync") it should back off just as
> it does with a normal spout (see
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
> "sync" is a no-op).
>
> The post you linked to was mine, and for a long time that was true
> (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
> automatic backoffs when no tuples are emitted. The only time I've
> intentionally blocked in a spout after 0.8.0 is to control throughout (e.g.
> only allow 10/s during development). I've never built a multilang spout
> before.
>
>  Spouts, like bolts, run in a single-threaded context so blocking at all
> prevents acks/fails/emits from being done until the thread is u

Re: Acking is delayed by 5 seconds (in disruptor queue ?)

2014-07-18 Thread Michael Rose
Worth clarifying for anyone else in this thread that a LBQ separating
production from consumption is not a default thing in Storm, it's something
we cooked up to prefetch elements from slow/batching resources.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel  wrote:

>  Got it! thanks.
>  ------
> *From:* Michael Rose 
> *Sent:* Saturday, July 19, 2014 12:10 AM
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  1)
>
>  Lets say we have it set at a limit of 100 items. The LBQ currently has
> 97 items in it. The SQS client runs again, pulls 10 messages, and
> successfully inputs 3. The other 7 are blocked waiting for the queue to
> clear out. No new HTTP requests are made to SQS while this LBQ is full
> (essentially this is just called in a loop). nextTuple() eventually comes
> around, 6 blocked for insertion. etc. etc. until all 6 blocked messages are
> inserted into the LBQ. At this point, we call out to the SQS client again
> to fetch another 10 (and hopefully the LBQ has not completely drained by
> the time the SQS client returns another 1-10 messages).
>
>  This model isn't picky about which SQS client is grabbing messages. SQS
> doesn't guarantee order (or single delivery) anyways. In one of our
> topologies, we have 8 spout instances consuming the same SQS queue for
> throughput purposes. Maybe I've misunderstood your question though. We
> normally don't have multiple topologies consuming the same queue, but
> depending on the data there's no reason we couldn't.
>
>  Also in this model, we don't use a blocking poll method, if the LBQ is
> empty, we skip emission and let Storm handle backoff if it wants to (see
> below).
>
>  2) By backoff I mean, your spout hasn't emitted in a while, it's going
> to slow down on calling nextTuple() to not busy-wait your entire CPU. If
> you're at maxSpoutPending limit, nextTuple is also not called until an ack
> or fail has been received (which is one of the reasons its so important to
> not block in the spout as much as possible).
>
>  Storm will use the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> when you don't emit. By default, it waits only 1ms before calling again
> (not exponential, just fixed), which is enough to prevent 10-100k/s polling
> behavior but not significantly increase latency -- if you're ever not
> emitting, you can probably afford to sleep for 1ms. We actually have it set
> to 10ms, given that 99.9% of the time we'll have a message ready for
> processing.
>
>  An alternative to this is the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
> if you do see an impact on your throughput--but I've never needed this.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> mich...@fullcontact.com
>
>
> On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel  wrote:
>
>>  Thanks for the thorough and quick answer.
>>
>>
>>  Two follow up questions. The context is performing low latency stream
>> manipulation (hopefully process a message the moment it arrives, maybe a
>> few milliseconds later).
>>
>>
>>  1. What happens if the LBQ contains 10 items, while the Storm topology
>> does not call nextTuple because of backoffs ? Wouldn't it be better of for
>> another Amazon SQS client to handle these items? Or are you assuming a
>> single Storm topology is the sole handler of these items ?
>>
>>
>>  2. If by backoff, you mean storm topology cannot handle any more
>> messages, or maxSpoutPending is reached, then ignore this question. If by
>> backoff you mean exponential backoff then I am worried about a message
>> arriving to the queue and nextTuple is not called for a long time (more
>> than a few milliseconds).
>>
>>
>>  Regards,
>>
>> Itai
>>  --
>> *From:* Michael Rose 
>> *Sent:* Friday, July 18, 2014 11:27 PM
>>
>> *To:* user@storm.incubator.apache.org
>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>>   I have no experience with multilang spouts, however my impression from
>> the docs is that you should be handling your own multiplexing if you're
>> writing a shellspout. Otherw

Re: Fetching data from a REST api into storm.

2014-07-24 Thread Michael Rose
Anything you can do in normal Java code, you can do inside a Storm bolt.
Check out any Java HTTP client (OkHttp, Apache HTTP Client, JerseyClient,
URLConnection etc.) and use it to fetch the data you need.

If your REST call takes a long time, it might be worth doing so
asynchronously and emitting the results downstream when the thread returns.
Just make sure that you're synchronizing your OutputCollector when emitting
from a multithreaded context.

Hope that helps.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Jul 24, 2014 at 10:47 AM, Yogini Gulkotwar <
yogini.gulkot...@flutura.com> wrote:

> Hello friends,
>
> I am trying to fetch data from one of my tools which has a REST api
> feature. So basically, I need to know if it is possible to make calls to
> the web api of my tool and fetch data.
>
> It would be great if someone could share some example code where something
> similar has been achieved.
>
>
>
> Thanks & Regards,
> Yogini Gulkotwar│Data Scientist
>
>


Re: Storm useable for video processing??

2014-07-31 Thread Michael Rose
There's no reason you couldn't. If you look in the archives there was
someone else who'd managed to do some video processing with Storm.

If you make things work, consider sharing a blog post -- that'd be really
great stuff! :)

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Jul 31, 2014 at 6:15 PM, Andrew Xor 
wrote:

> Hey,
>
>  Well it depends on your expectations really and your application's needs.
> If you can indeed transform the video feed (frames/blocks w/e) into
> break-able tuples then you could process it using storm. Although I am
> afraid that you will have to set a realistic view on your expectations
> because usually video processing and feature extraction is quite taxing on
> hardware.  This (imho) will result in a (very) noticeable latency increase
> in your processing tuple throughput... but other than that you could use
> storm; after all video is just data albeit a lot.
>
> ​Hope this helped.​
>
>
> On Fri, Aug 1, 2014 at 2:10 AM, Patrick Wiener 
> wrote:
>
>> Hello everybody,
>>
>> I’ll ask straightforward:
>> *Has anybody heard or read something about Storm being used for real-time
>> video/image processing? and could provide some information (sources, code,
>> …)*
>>
>> Reason: I’m working on a project concerning live-stream analysis of video
>> data. It should be investigated wether Storm could be a promising approach.
>>
>> Thank you very much in advance!
>> Patrick
>>
>
>


Re: Implementing a barrier mechanism in storm

2014-07-31 Thread Michael Rose
It's another case of a streaming join. I've done this before, there aren't
too many gotchas, other than you need a datastructure which purges stale
unresolved joins beyond the tuple timeout time (I used a Guava cache for
this).

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Thu, Jul 31, 2014 at 10:44 PM, Varun Vijayaraghavan 
wrote:

> That's interesting. Note that I have not used such a pattern before - and
> have done something similar. I have not used trident - so this probably
> will not answer your last question completely :)
>
> If you set up the topology such that links between bolt {1, 2, 3} and
> final bolt is stream grouped by "msgId" - you could keep the partially
> processed results in memory (or in a persisted state somewhere) - till you
> see the processed result for all the bolts.
>
> I would also expire msgIds which have not seen further results for beyond
> a certain threshold time.
>
> What do you think?
>
>
>
> On Fri, Aug 1, 2014 at 12:35 AM, Spico Florin 
> wrote:
>
>> Hello!
>>   I have a case study where the same message (identified by an id) is
>> spread over a couple of processing bolts and a final bolt should act as a
>> barrier. This final bolt should do its job on the message ID only when all
>> the upfront bolts have finished their process on the message.
>>
>> As sketch is bellow
>>
>>
>> Spout -> (msgid1, payload)->process bolt 1 (msgId1, payloadb1)|
>>   ->process bolt 2(msgId1,
>> payloadb2)  |->final bolt(msg1,finalp)
>>   ->process bolt 3(msgId1,
>> payloadb3)  |
>>
>> So the final bolt should not start a work on a message id till the
>> message was not processed by all the 3 processing bolts.
>>   My question are:
>> 1. Can be these case viable for storm?
>> 2. If the answer for the first question is yes, how can I achieve this
>> request?
>> 3. Is possible to achieve this request without using trident?
>> I look forward for your answers.
>> Thanks,
>>   Florin
>>
>
>
>
> --
> - varun :)
>


Re: Implementing a barrier mechanism in storm

2014-08-01 Thread Michael Rose
In the prepare method you receive a copy of the topology context which can
tell you all of the stream-components you're subscribed to.

You could make it a static field or just fields group on messageId
On Aug 1, 2014 12:26 AM, "Spico Florin"  wrote:

> Hello!
>   Thanks for your reply. I was thinking to use such the cache mechanism
> (the guava cache as a static field in the final bolt). The question that I
> have now, is how do you keep track of the bolts upfront? Suppose that you
> have 3 bolts then you are counting the number of bolts that were processing
> one message Map?
> Can you send the name of the bolts that have been processed the message to
> the final bolt, and in the final bolt to check if the the list of all
> processing bolts is the same?
>
> What is the best approach here?
>
> Thanks .
> Regards,
>   Florin
>
> On Fri, Aug 1, 2014 at 7:51 AM, Michael Rose 
> wrote:
>
>> It's another case of a streaming join. I've done this before, there
>> aren't too many gotchas, other than you need a datastructure which purges
>> stale unresolved joins beyond the tuple timeout time (I used a Guava cache
>> for this).
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> mich...@fullcontact.com
>>
>>
>> On Thu, Jul 31, 2014 at 10:44 PM, Varun Vijayaraghavan <
>> varun@gmail.com> wrote:
>>
>>> That's interesting. Note that I have not used such a pattern before -
>>> and have done something similar. I have not used trident - so this probably
>>> will not answer your last question completely :)
>>>
>>> If you set up the topology such that links between bolt {1, 2, 3} and
>>> final bolt is stream grouped by "msgId" - you could keep the partially
>>> processed results in memory (or in a persisted state somewhere) - till you
>>> see the processed result for all the bolts.
>>>
>>> I would also expire msgIds which have not seen further results for
>>> beyond a certain threshold time.
>>>
>>> What do you think?
>>>
>>>
>>>
>>> On Fri, Aug 1, 2014 at 12:35 AM, Spico Florin 
>>> wrote:
>>>
>>>> Hello!
>>>>   I have a case study where the same message (identified by an id) is
>>>> spread over a couple of processing bolts and a final bolt should act as a
>>>> barrier. This final bolt should do its job on the message ID only when all
>>>> the upfront bolts have finished their process on the message.
>>>>
>>>> As sketch is bellow
>>>>
>>>>
>>>> Spout -> (msgid1, payload)->process bolt 1 (msgId1, payloadb1)|
>>>>   ->process bolt 2(msgId1,
>>>> payloadb2)  |->final bolt(msg1,finalp)
>>>>   ->process bolt 3(msgId1,
>>>> payloadb3)  |
>>>>
>>>> So the final bolt should not start a work on a message id till the
>>>> message was not processed by all the 3 processing bolts.
>>>>   My question are:
>>>> 1. Can be these case viable for storm?
>>>> 2. If the answer for the first question is yes, how can I achieve this
>>>> request?
>>>> 3. Is possible to achieve this request without using trident?
>>>> I look forward for your answers.
>>>> Thanks,
>>>>   Florin
>>>>
>>>
>>>
>>>
>>> --
>>> - varun :)
>>>
>>
>>
>


Re: Question on failing ack

2014-08-25 Thread Michael Rose
Hi Kushan,

Depending on the Kafka spout you're using, it could be doing different
things when it failed. However, if it's running reliably, the Cassandra
insertion failures would have forced a replay from the spout until they had
completed.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com


On Mon, Aug 25, 2014 at 4:42 PM, Kushan Maskey <
kushan.mas...@mmillerassociates.com> wrote:

> I have set up topology to load a very large volume of data. Recently I
> just loaded about 60K records and found out that there are some failed acks
> on few spouts but non on the bolts. Storm completed running and seem to
> look stable. Initially i started with a lesser amount of data like about
> 500 records  successfully and then increased up to 60K where i saw the
> failed acks.
>
> Questions:
> 1. Does that mean that the spout was not able to read some messages from
> Kafka? Since there are no failed ack on the bolts as per UI, what ever the
> message received has been successfully processed by the bolts.
> 2. how do i interpret the numbers of failed acks like this acked:315500
>  and failed: 2980.
> Does this mean that 2980 records failed to be processed? Is this is the
> case then, how do I avoid this from happening because I will be loosing
> 2980 records.
> 3. I also see that few of the records failed to be inserted into Cassandra
> database. What is the best way to reprocess the data again as it is quite
> difficult to do it through the batch process that I am currently running.
>
> LMK, thanks.
>
> --
> Kushan Maskey
> 817.403.7500
>


Re: Storm performing very slow

2014-09-22 Thread Michael Rose
Storm is not your bottleneck. Check your Storm code to 1) ensure you're
parallelizing your writes and 2) you're batching writes to your external
resources if possible. Some quick napkin math shows you only doing 110
writes/s, which seems awfully low.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
mich...@fullcontact.com

On Mon, Sep 22, 2014 at 8:05 PM, Kushan Maskey <
kushan.mas...@mmillerassociates.com> wrote:

> I am trying to load 20 M records into Cassandra database through
> Kafka-Storm. I am able to post all the data in 5 mins into Kafka. But
> reading it from storm and inserting into Cassandra, Couch and Solr is kind
> of very slow. It has been running for past 5 hours and so far only 2
> Million records.
>
> How do I make the storm perform faster? Coz in this pace it will take
> couple of days to load all the data.
>
> --
> Kushan Maskey
>
>