Re: When i set TOPOLOGY_TICK_TUPLE_FREQ_SECS into my Storm config, I get Non-system tuples should never be sent to __system bolt

2014-06-20 Thread Chris Bedford
Hi.

after poking around a bit I found that I had gone about things the wrong way.  This
article helped me get my bearings:
http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/

The trick is not to set TOPOLOGY_TICK_TUPLE_FREQ_SECS globally, but instead
to set it in every bolt that you want to receive ticks, by implementing
getComponentConfiguration(), something like this:





  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }




Hope this is helpful for other people seeking to incorporate ticks  (the
clock kind)  into their topologies.
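
For anyone following along, here is a minimal sketch (my own illustration,
not code from this thread) of a bolt that requests its own ticks this way and
then tells tick tuples apart from data tuples in execute().  The class name
TickAwareBolt and the 10-second frequency are made-up example values:

import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class TickAwareBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        if (isTickTuple(tuple)) {
            // time-driven work, e.g. flush whatever was accumulated since the last tick
        } else {
            // normal per-tuple work
            _collector.ack(tuple);
        }
    }

    // Tick tuples arrive from the __system component on the __tick stream.
    private static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);  // this bolt's tick frequency
        return conf;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this sketch emits nothing
    }
}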






On Thu, Jun 19, 2014 at 1:08 AM, Chris Bedford ch...@buildlackey.com
wrote:



 Hello, Storm experts:

 I'm trying to use the system 'ticks' in Storm by setting
 TOPOLOGY_TICK_TUPLE_FREQ_SECS and then acting on the
 received tick tuples.  However, I find that when I set
 TOPOLOGY_TICK_TUPLE_FREQ_SECS  into the config
 of even a very very simple topology  (one dummy spout that just emits to
 nowhere), I get the following error:

 Non-system tuples should never be sent to __system bolt


 I see the following in the log:


 10665 [Thread-21-__system] INFO  backtype.storm.daemon.executor -
 Processing received message source: __system:-1, stream: __tick, id: {}, [1]


 followed by this trace:

 java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
 should never be sent to __system bolt.
 at
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
 ~[storm-core-0.9.0-rc2.jar:na]
  at
 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
 ~[storm-core-0.9.0-rc2.jar:na]
 at
 backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
 ~[storm-core-0.9.0-rc2.jar:na]
  at
 backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
 ~[storm-core-0.9.0-rc2.jar:na]
 at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
 ~[storm-core-0.9.0-rc2.jar:na]
  at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
 at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
 Caused by: java.lang.RuntimeException: Non-system tuples should never be
 sent to __system bolt.
 at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
 ~[storm-core-0.9.0-rc2.jar:na]
  at
 backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
 ~[storm-core-0.9.0-rc2.jar:na]
 at
 backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
 ~[storm-core-0.9.0-rc2.jar:na]
  at
 backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
 ~[storm-core-0.9.0-rc2.jar:na]
 at
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
 ~[storm-core-0.9.0-rc2.jar:na]
  ... 6 common frames omitted


 I certainly did not wire anything to send tuples to the system bolt, as
 you can see from the simple test topology I'm including below.
 I'm wondering if there is any other setup that I need to do.  I'm also
 enclosing most of the log output (sorry it is long) just in case
 that is helpful.

 Thanks in advance to whoever can shed light on this...
   - chris


The CODE  --  a TestNG test in Groovy... but it can be changed to
 Java with a few semicolons here and there ...

 package com.example
 import backtype.storm.Config
 import backtype.storm.LocalCluster
 import backtype.storm.spout.SpoutOutputCollector
 import backtype.storm.task.TopologyContext
 import backtype.storm.topology.OutputFieldsDeclarer
 import backtype.storm.topology.TopologyBuilder
 import backtype.storm.topology.base.BaseRichSpout
 import backtype.storm.tuple.Fields
 import backtype.storm.tuple.Values
 import backtype.storm.utils.Utils
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 /*
  * Author: cbedford
  * Date: 6/18/14
  * Time: 10:05 PM
  */
 public class TickTest {

 public static class DummySpout extends BaseRichSpout {
 public static Logger LOG =
 LoggerFactory.getLogger(DummySpout.class);
 boolean _isDistributed;
 SpoutOutputCollector _collector;


 private static Integer count = 0;

 public DummySpout() {
 this(true);
 }

 public DummySpout(boolean isDistributed) {
 _isDistributed = isDistributed;
 }

 public void open(Map conf, TopologyContext context,
 SpoutOutputCollector collector) {
 _collector = collector;
 }

 public void close() {}

 public void nextTuple() {
 Utils.sleep(100);
 count += 1;
 final Values values =
 new Values(
 "commandFoo",
 "sourceFoo");
 _collector.emit(values);
 }

 public void

When i set TOPOLOGY_TICK_TUPLE_FREQ_SECS into my Storm config, I get Non-system tuples should never be sent to __system bolt

2014-06-19 Thread Chris Bedford
10721 [Thread-21-__system] INFO  backtype.storm.util - Halting process:
(Worker died)

-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Chris Bedford
Yes.. if I used prepare or open on spouts or bolts it would work, but
unfortunately it would be a bit brittle.  I'd have to include a spout or
bolt just for initializing my invariant code... I'd rather do that when the
topology is activated on the worker.. so this seems like a good use of an
activated() method on the StormTopology class (where activated()
would be called after the StormTopology is deserialized by the worker node
process).

But, if there is no such method, I will make do with what is there.

thanks for your response.

chris



On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and I'd
 like to
  prepare this state when the StormTopology gets 'activated' on a worker.
 
  it would be great if the StormTopology had something like a prepare or
 open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application code
 to
  check or not check invariants.  the invariant checking takes additional
 time,
  so we don't want to do it in production.. but during testing/development
 it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder & Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Chris Bedford
This looks promising. thanks.

hope you don't mind one more question  --  if I create my own
implementation of ITaskHook and add it to the config as you illustrated in
the prev. msg... will the prepare() method of my implementation be called
exactly once, shortly after the StormTopology is deserialized by the worker
node process?

 - chris




On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com
wrote:

 You don't have to include a specific bolt for init code. It's not
 difficult to push your init code into a separate class and call it from
 your bolts, lock on that class, run init, and then allow other instances to
 skip over it.
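
 A minimal sketch of that "init once per worker JVM" pattern (the class name
 SharedInit and the initInvariantChecking() helper are illustrative
 assumptions, not code from this thread):

 import java.util.Map;

 public final class SharedInit {
     private static volatile boolean initialized = false;

     private SharedInit() {}

     public static void ensureInitialized(Map stormConf) {
         if (initialized) {
             return;                        // fast path: already done in this JVM
         }
         synchronized (SharedInit.class) {  // lock on the class, as described
             if (!initialized) {
                 initInvariantChecking(stormConf);
                 initialized = true;        // later prepare()/open() calls skip the work
             }
         }
     }

     private static void initInvariantChecking(Map stormConf) {
         // read a flag from the topology config and set up the shared state
     }
 }

 Each bolt's prepare() (or spout's open()) can then just call
 SharedInit.ensureInitialized(conf).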

 Without changing bolt/spout code, I've taken to including a task hook for
 init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
 extendible and can be included pretty easily too:

 stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
 Lists.newArrayList(MyTaskHook.class.getName()));
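
 And a minimal sketch of what such a hook might look like (MyTaskHook and its
 body are assumptions for illustration; BaseTaskHook supplies empty
 implementations of the other ITaskHook callbacks):

 import java.util.Map;
 import backtype.storm.hooks.BaseTaskHook;
 import backtype.storm.task.TopologyContext;

 public class MyTaskHook extends BaseTaskHook {
     @Override
     public void prepare(Map conf, TopologyContext context) {
         // worker-side init code (properties, Guice modules, invariant flags, ...)
         // runs when the worker sets up each task of the topology
     }
 }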

 Michael Rose (@Xorlev https://twitter.com/xorlev)
 Senior Platform Engineer, FullContact http://www.fullcontact.com/
 mich...@fullcontact.com


 On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com
 wrote:

 Yes.. if i used prepare or open on spouts or bolts it would work, but
 unfortunately it would be a bit brittle.  I'd have to include a spout or
 bolt just for initializing my invariant code... i'd rather do that when the
 topology is activated on the worker.. so this seems like a good use of an
  activated()method on the StormTopology class   (where activated()
 would be called after the StormTopology is deserialized by the worker node
 process).

 But, if there is no such method, I will make do with what is there.

 thanks for your response.

 chris



 On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
 wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and I'd
 like to
  prepare this state when the StormTopology gets 'activated' on a worker.
 
  it would be great if the StormTopology had something like a prepare or
 open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application
 code to
  check or not check invariants.  the invariant checking takes
 additional time,
  so we don't want to do it in production.. but during
 testing/development it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder & Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




 --
 Chris Bedford

 Founder & Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com






-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-01 Thread Chris Bedford
Hi there -

I would like to set up some state that spouts and bolts share, and I'd like
to prepare this state when the StormTopology gets 'activated' on a worker.

it would be great if the StormTopology had something like a prepare or open
method to indicate when it is starting.  I looked but I could find no such
API.   Maybe I should submit an enhancement request?

Thanks in advance for your responses,
  -  Chris



[ If anyone is curious, the shared state is for all my application code to
check or not check invariants.  The invariant checking takes additional
time, so we don't want to do it in production.. but during
testing/development it helps catch bugs].

-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.

2014-03-17 Thread Chris Bedford
Hi, I'm interested in getting metrics via JMX on not only container-level
factors (such as # of garbage collects, heap usage, etc.), but also metrics
that describe how spouts and bolts are performing (e.g., # of tuples
emitted, # transferred -- the same kind of stuff that the storm UI shows).

I ran across this project:
https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala


I was just wondering if this is the best bet to pursue.  Does anyone have
any concrete experience with this that they can share?
thanx!

 chris


Re: Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.

2014-03-17 Thread Chris Bedford
Thanks, Noel and Michael !


On Mon, Mar 17, 2014 at 5:55 PM, Noel Milton Vega 
nmv...@computingarchitects.com wrote:

  I used Nimbus Thrift on my end.

 The one thing to note about that approach is that, if you're planning to
 plot those metrics (say, on Graphite), the results aren't very interesting,
 because the metrics -- emitted, transferred, acks, etc. -- are aggregates
 that simply increase monotonically, which leads to not-so-insightful graphs
 (... more-or-less lines that go up from left to right forever).

 However, I took consecutive metrics samples from Nimbus, say, every 5
 seconds, and plotted the difference between the two samples divided by 5,
 to get average quantities instead.  That was insightful.  So while (except
 for latency-type metrics) Nimbus doesn't provide you with average rates for
 emits/transferred/acks (and you won't see them on the UI either), you can
 easily derive them manually and send those to Graphite (or to your charting
 platform).  If using Graphite, btw., you also have the option of letting
 Graphite calculate the averages on the back-end for you (giving you options).
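
 The arithmetic is just (current sample - previous sample) / sample interval;
 a tiny sketch of that (the names here are mine, not from this thread):

 public class CounterRate {
     private long previousSample = -1;

     /** Returns tuples/sec since the last sample, or 0.0 on the first call. */
     public double update(long currentSample, long intervalSeconds) {
         double rate = (previousSample < 0)
                 ? 0.0
                 : (currentSample - previousSample) / (double) intervalSeconds;
         previousSample = currentSample;
         return rate;
     }
 }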

 Finally, for application-specific metrics exposed via Java MBeans, consider
 Codahale's Metrics API (in-band) and JMXtrans (out-of-band).
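
 For the in-band route, a minimal sketch with the (Dropwizard/Codahale)
 metrics 3.x API -- the registry and meter names are made up for the example:

 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;

 public class AppMetrics {
     public static final MetricRegistry REGISTRY = new MetricRegistry();
     public static final Meter TUPLES_PROCESSED = REGISTRY.meter("tuples-processed");

     /** Call once per worker (e.g. from a bolt's prepare()) to expose the registry over JMX. */
     public static void startJmxReporter() {
         JmxReporter.forRegistry(REGISTRY).inDomain("storm.app").build().start();
     }
 }

 A bolt would then call AppMetrics.TUPLES_PROCESSED.mark() in execute().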

 Noel Milton Vega
 Dimension Data, LLC.
 nmv...@didata.us
 nmv...@computingarchitects.com



  On 03/17/2014 08:30 PM, Michael Rose wrote:

 Depending on what you need, you could use the Thrift interface to Nimbus
 to grab the statistics. The UI project does do some calculations on the raw
 metrics, so it's not quite the same. The algorithms it uses aren't too
 difficult to replicate.
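
 For reference, a rough sketch of pulling the raw numbers over that Thrift
 interface (class and method names are from the 0.9.x backtype.storm.generated
 API as I recall them -- worth verifying against your Storm version):

 import java.util.Map;
 import backtype.storm.generated.ClusterSummary;
 import backtype.storm.generated.ExecutorSummary;
 import backtype.storm.generated.Nimbus;
 import backtype.storm.generated.TopologyInfo;
 import backtype.storm.generated.TopologySummary;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;

 public class NimbusStatsDump {
     public static void main(String[] args) throws Exception {
         Map conf = Utils.readStormConfig();
         Nimbus.Client nimbus = NimbusClient.getConfiguredClient(conf).getClient();
         ClusterSummary cluster = nimbus.getClusterInfo();
         for (TopologySummary topo : cluster.get_topologies()) {
             TopologyInfo info = nimbus.getTopologyInfo(topo.get_id());
             for (ExecutorSummary exec : info.get_executors()) {
                 // get_stats() carries the raw emitted/transferred/acked counters,
                 // keyed by time window and stream; aggregate them as the UI does
                 System.out.println(exec.get_component_id() + " -> " + exec.get_stats());
             }
         }
     }
 }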

  We ended up building something very similar to what Ooyala did,
 customized for our specific needs; it's an excellent pattern.

  Michael Rose (@Xorlev https://twitter.com/xorlev)
 Senior Platform Engineer, FullContact http://www.fullcontact.com/
 mich...@fullcontact.com


 On Mon, Mar 17, 2014 at 6:21 PM, Chris Bedford ch...@buildlackey.com wrote:



 Hi, I'm interested in getting metrics via JMX on not only container
 level  factors (such as # of garbage collects, heap usage, etc.),   but
 also metrics that describe how spouts and bolts are performing  (e.g., # of
 tuples emitted, # transferred -- the same kind of stuff that the storm UI
 shows.)

  I ran across this project :
  :
 https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala


  I was just wondering if this is the best best to pursue.  Does anyone
 have any concrete experience with this that they can share?
 thanx!

   chris






-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


when running a negative test that forces exception in my spout i find that the JVM abruptly exits. Any way to intercept this ?

2014-03-17 Thread Chris Bedford
Hi there..
I tried writing a negative TestNG test which starts my topology in local
mode, then submits it.
I provide parameters that deliberately cause the code called by my spout to
choke and throw
a RuntimeException.

This causes my test runner (Maven's Surefire) to complain that

  The forked VM terminated without saying properly goodbye. VM crash or
System.exit called


This isn't completely mission-critical to get this test done... but I'm
curious!  Does anyone have an idea how to write a negative test that causes
an exception in a spout or bolt?

How does one trap these seemingly fatal errors? I've considered using
dependencies in TestNG, or running a separate thread that waits for a bit and
then checks the log to verify the exception was thrown..

All of it comes to naught when the JVM says bye-bye.

any ideas much appreciated ..

thanks !
 chris


Wondering where my System.out.println()'s go, also why can't i see results of logging at error() level to my class's slf4j logger

2014-03-06 Thread Chris Bedford
Hi there -

I have a problem with my println and logger output not appearing anywhere
I can find it when I deploy a topology to my one-node
cluster.  I don't have this issue when I debug the topology in local mode.
In local mode I see all the output.

As a simple example, I have modified the ExclamationTopology in the storm
starter project and tried to figure out where the output
was going.   My modifications are shown below (in the MODIFIED
ExclamationTopology section).




So, my first question is:
   where should I be looking for stdout output, and for my logger output?



I have looked in the following places:
 1)  the console where I launch the topology


 2)  the captured standard output of the nimbus and supervisor processes (that
is /tmp/*.log), given the processes are launched as below:

$STORMDIR/bin/storm nimbus > /tmp/nimbus.log 2>&1 &
$STORMDIR/bin/storm supervisor > /tmp/supervisor.log 2>&1 &

 3)  all logfiles generated under $STORMDIR/logs/*




My second question is: how is Storm's internal logger output being sent to
stdout in the first place?


I am seeing lots of logger output going to my console that looks like this:
9059 [Thread-18-count] INFO  backtype.storm.daemon.task - Emitting:
count default [snow, 20]


However, when I look in $STORMDIR/logback/cluster.xml, I don't see
any appender configured that would go
to stdout...

I would assume that logback would require a config line like this:

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">


But I see no such line in cluster.xml

This leads me to believe that there is some additional mechanism to
configure logging that I don't understand.

Any leads much much appreciated !



thanks
Chris









MODIFIED ExclamationTopology


public class ExclamationTopology {

   public static Logger LOG = LoggerFactory.getLogger(ExclamationTopology.class);


  public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
  _collector = collector;
}

@Override
public void execute(Tuple tuple) {
 System.out.println("tuple: " + tuple);
 LOG.error("tuple: logger " + tuple);
  _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
  _collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("word"));
}


  }

  public static void main(String[] args) throws Exception {
System.out.println("STARTING THE PROGRAM");
 LOG.error(" -logger -- STARTING THE PROGRAM");
TopologyBuilder builder = new TopologyBuilder();


etc. etc.   the rest is the same as the 'stock' version...






-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com