Re: When I set TOPOLOGY_TICK_TUPLE_FREQ_SECS in my Storm config, I get "Non-system tuples should never be sent to __system bolt"
Hi. After poking around a bit I found that I went about things wrong. This article helped me get my bearings: http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/

The trick is not to set TOPOLOGY_TICK_TUPLE_FREQ_SECS globally, but instead to set it in every bolt that you want to receive ticks, by implementing getComponentConfiguration, something like this:

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

Hope this is helpful for other people seeking to incorporate ticks (the clock kind) into their topologies.

On Thu, Jun 19, 2014 at 1:08 AM, Chris Bedford ch...@buildlackey.com wrote:

Hello, Storm experts:

I'm trying to use the system 'ticks' in Storm by setting TOPOLOGY_TICK_TUPLE_FREQ_SECS and then acting on the received tick tuples. However, I find that when I set TOPOLOGY_TICK_TUPLE_FREQ_SECS in the config of even a very, very simple topology (one dummy spout that just emits to nowhere), I get the following error:

    Non-system tuples should never be sent to __system bolt

I see the following in the log:

    10665 [Thread-21-__system] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [1]

followed by this trace:

    java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples should never be sent to __system bolt.
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc2.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
    Caused by: java.lang.RuntimeException: Non-system tuples should never be sent to __system bolt.
        at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc2.jar:na]
        ... 6 common frames omitted

I certainly did not wire anything to send tuples to the system bolt, as you can see from the simple test topology I'm including below. I'm wondering if there is any other setup that I need to do. I'm also enclosing most of the log output (sorry it is long) just in case that is helpful.

Thanks in advance to whoever can shed light on this...

- chris

The CODE -- a TestNG test in Groovy... but it can be changed to Java with a few semicolons here and there...
    package com.example

    import backtype.storm.Config
    import backtype.storm.LocalCluster
    import backtype.storm.spout.SpoutOutputCollector
    import backtype.storm.task.TopologyContext
    import backtype.storm.topology.OutputFieldsDeclarer
    import backtype.storm.topology.TopologyBuilder
    import backtype.storm.topology.base.BaseRichSpout
    import backtype.storm.tuple.Fields
    import backtype.storm.tuple.Values
    import backtype.storm.utils.Utils
    import org.slf4j.Logger
    import org.slf4j.LoggerFactory
    import org.testng.annotations.Test

    /*
     * Author: cbedford
     * Date: 6/18/14
     * Time: 10:05 PM
     */
    public class TickTest {
        public static class DummySpout extends BaseRichSpout {
            public static Logger LOG = LoggerFactory.getLogger(DummySpout.class);
            boolean _isDistributed;
            SpoutOutputCollector _collector;
            private static Integer count = 0;

            public DummySpout() {
                this(true);
            }

            public DummySpout(boolean isDistributed) {
                _isDistributed = isDistributed;
            }

            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                _collector = collector;
            }

            public void close() {}

            public void nextTuple() {
                Utils.sleep(100);
                count += 1;
                final Values values = new Values("commandFoo", "sourceFoo");
                _collector.emit(values);
            }

            public void
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc2.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
    Caused by: java.lang.RuntimeException: Non-system tuples should never be sent to __system bolt.
        at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc2.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc2.jar:na]
        ... 6 common frames omitted
    10721 [Thread-21-__system] INFO  backtype.storm.util - Halting process: ("Worker died")

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
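For other readers of this thread: besides returning the tick frequency from getComponentConfiguration(), a bolt must distinguish tick tuples from data tuples inside execute(). Below is a minimal, self-contained sketch of that check. The Tuple interface here is a stand-in for Storm's backtype.storm.tuple.Tuple (reduced to the two accessors needed), and the string constants mirror Storm's Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID:

```java
// Stand-in for backtype.storm.tuple.Tuple, reduced to the two
// accessors the tick check needs.
interface Tuple {
    String getSourceComponent();
    String getSourceStreamId();
}

class TickCheck {
    // Mirror Storm's Constants.SYSTEM_COMPONENT_ID / SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // A tick tuple is emitted by the __system component on the __tick stream;
    // everything else is an ordinary data tuple.
    static boolean isTickTuple(Tuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.getSourceComponent())
            && SYSTEM_TICK_STREAM_ID.equals(t.getSourceStreamId());
    }
}
```

In a real bolt, execute() would branch on this check: run the periodic work on ticks, and the normal per-tuple logic otherwise.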
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared?
Yes.. if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle. I'd have to include a spout or bolt just for initializing my invariant code... I'd rather do that when the topology is activated on the worker.. so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process).

But, if there is no such method, I will make do with what is there. Thanks for your response.

chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method:
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

and the spout base classes have a similar activate method:
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

Is that sufficient for your needs or were you thinking of something different?

Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there -

I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked but I could find no such API. Maybe I should submit an enhancement request?

Thanks in advance for your responses,

- Chris

[ if anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production.. but during testing/development it helps catch bugs].

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared?
This looks promising. Thanks. Hope you don't mind one more question -- if I create my own implementation of ITaskHook and add it to the config as you illustrated in your previous message, will the prepare() method of my implementation be called exactly once, shortly after the StormTopology is deserialized by the worker node process?

- chris

On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com wrote:

You don't have to include a specific bolt for init code. It's not difficult to push your init code into a separate class and call it from your bolts, lock on that class, run init, and then allow other instances to skip over it.

Without changing bolt/spout code, I've taken to including a task hook for init code (e.g. properties / Guice). Check out BaseTaskHook; it's easily extensible and can be included pretty easily too:

    stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:

Yes.. if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle. I'd have to include a spout or bolt just for initializing my invariant code... I'd rather do that when the topology is activated on the worker.. so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But, if there is no such method, I will make do with what is there. Thanks for your response.
chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method:
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

and the spout base classes have a similar activate method:
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

Is that sufficient for your needs or were you thinking of something different?

Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there -

I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked but I could find no such API. Maybe I should submit an enhancement request?

Thanks in advance for your responses,

- Chris

[ if anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production.. but during testing/development it helps catch bugs].

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
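Michael's first suggestion in this thread -- shared init code that every bolt calls, where later callers skip it -- is plain Java and easy to sketch without any Storm classes. A minimal run-once helper (the class and method names are illustrative, not from Storm):

```java
// Generic run-once initializer: every bolt's prepare() / spout's open()
// calls ensureInitialized(); only the first caller in the JVM runs the
// init body. `synchronized` also makes later callers block until the
// first initialization has finished, so no one proceeds half-initialized.
class SharedInit {
    private static boolean initialized = false;

    static synchronized void ensureInitialized(Runnable initBody) {
        if (!initialized) {
            initBody.run();
            initialized = true;
        }
    }
}
```

Each executor thread can then call `SharedInit.ensureInitialized(() -> loadInvariantConfig())` (a hypothetical init body) without worrying about which one arrives first.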
Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared?
Hi there -

I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked but I could find no such API. Maybe I should submit an enhancement request?

Thanks in advance for your responses,

- Chris

[ if anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production.. but during testing/development it helps catch bugs].

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.
Hi,

I'm interested in getting metrics via JMX on not only container-level factors (such as # of garbage collects, heap usage, etc.), but also metrics that describe how spouts and bolts are performing (e.g., # of tuples emitted, # transferred -- the same kind of stuff that the Storm UI shows).

I ran across this project: https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala

I was just wondering if this is the best bet to pursue. Does anyone have any concrete experience with this that they can share?

thanx!

chris
Re: Best way to get JMX access to storm metrics for spout tuples emitted / acked | bolt tuples emitted, latency etc.
Thanks, Noel and Michael!

On Mon, Mar 17, 2014 at 5:55 PM, Noel Milton Vega nmv...@computingarchitects.com wrote:

I used Nimbus Thrift on my end. The one thing to note about that approach is that, if you're planning to plot those metrics (say, on Graphite), the results aren't very interesting, because the metrics -- emitted, transferred, acks, etc. -- are aggregates and simply increase monotonically, which leads to not-so-insightful graphs (... more-or-less lines that go up from left to right forever).

However, I took consecutive metrics samples from Nimbus, say, every 5 seconds, and plotted the difference between the two samples divided by 5, to plot average quantities instead. That was insightful. So while (except for latency-type metrics) Nimbus doesn't provide you with average rates for emits/transferred/acks (and you won't see them on the UI either), you can easily derive them manually and send that to Graphite (or to your charting platform). If using Graphite, btw, you also have the option of letting Graphite calculate the averages on the back-end for you (giving you options).

Finally, for application-specific metrics exposed via Java MBeans, consider Codahale's Metrics API (in-band), and JMXtrans (out-of-band).

Noel Milton Vega
Dimension Data, LLC.
nmv...@didata.us
nmv...@computingarchitects.com

On 03/17/2014 08:30 PM, Michael Rose wrote:

Depending on what you need, you could use the Thrift interface to Nimbus to grab the statistics. The UI project does do some calculations on the raw metrics, so it's not quite the same. The algorithms it uses aren't too difficult to replicate.

We ended up building something very similar to what Ooyala did, customized for our specific needs; it's an excellent pattern.
Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Mar 17, 2014 at 6:21 PM, Chris Bedford ch...@buildlackey.com wrote:

Hi,

I'm interested in getting metrics via JMX on not only container-level factors (such as # of garbage collects, heap usage, etc.), but also metrics that describe how spouts and bolts are performing (e.g., # of tuples emitted, # transferred -- the same kind of stuff that the Storm UI shows).

I ran across this project: https://github.com/ooyala/metrics_storm/blob/master/src/ooyala/common/metrics_storm/MetricsStorm.scala

I was just wondering if this is the best bet to pursue. Does anyone have any concrete experience with this that they can share?

thanx!

chris

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
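Noel's delta-over-interval technique above is easy to sketch in plain Java (no Storm, Thrift, or Graphite dependencies; the class and method names are illustrative): sample the monotonic counter twice, subtract, and divide by the interval to get an average rate.

```java
class CounterRate {
    // Average events/sec between two samples of a monotonically increasing
    // counter taken intervalSecs apart. Returns 0 for a negative delta,
    // which can happen if the counter resets (e.g. a topology restart),
    // and for a non-positive interval.
    static double ratePerSecond(long previous, long current, double intervalSecs) {
        long delta = current - previous;
        if (delta < 0 || intervalSecs <= 0) {
            return 0.0;
        }
        return delta / intervalSecs;
    }
}
```

A poller would call this every 5 seconds with the last and current "emitted" totals and ship the result to the charting back-end.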
When running a negative test that forces an exception in my spout, I find that the JVM abruptly exits. Any way to intercept this?
Hi there..

I tried writing a negative TestNG test which starts my topology in local mode, then submits it. I provide parameters that deliberately cause the code called by my spout to choke and throw a RuntimeException. This causes my test runner (Maven's Surefire) to complain that

    The forked VM terminated without saying properly goodbye. VM crash or System.exit called

It isn't completely mission critical to get this test done... but I'm curious! Does anyone have an idea how to write a negative test that causes an exception in a spout or bolt? How does one trap these seemingly fatal errors?

I've considered using dependencies in TestNG, running a separate thread that waits for a bit and then checks the log to verify the exception was thrown.. All of it comes to naught when the JVM says bye-bye.

Any ideas much appreciated.. thanks!

chris
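For what it's worth, the generic JVM-level way to observe an exception that kills a thread is an uncaught-exception handler. The sketch below is plain Java, not a Storm API, and it may not help in local-mode Storm if the worker halts the JVM before the test can inspect the result; it only illustrates the mechanism:

```java
import java.util.concurrent.atomic.AtomicReference;

class CrashObserver {
    // Runs `body` on a fresh thread and returns the exception (if any)
    // that escaped its run() method, captured by an uncaught-exception
    // handler installed on that thread.
    static Throwable runAndCapture(Runnable body) throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(body);
        // The handler runs on the dying thread itself, before it terminates,
        // so after join() the captured exception is visible here.
        worker.setUncaughtExceptionHandler((t, e) -> seen.set(e));
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable t = runAndCapture(() -> {
            throw new RuntimeException("simulated spout failure");
        });
        System.out.println(t.getMessage()); // prints "simulated spout failure"
    }
}
```

Thread.setDefaultUncaughtExceptionHandler is the process-wide variant of the same idea, but a framework that calls Runtime.halt (or installs its own handler) can still take the JVM down first.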
Wondering where my System.out.println()'s go, and also why I can't see results of logging at error() level to my class's slf4j logger
Hi there -

I have a problem with my println and logger output not appearing anywhere I can find it when I deploy a topology to my one-node cluster. I don't have this issue when I debug the topology in local mode; in local mode I see all the output.

As a simple example, I have modified the ExclamationTopology in the storm-starter project and tried to figure out where the output was going. My modifications are shown below (in the MODIFIED ExclamationTopology section).

So, my first question is: where should I be looking for stdout output, and my logger output? I have looked in the following places:

1) the console where I launch the topology

2) the captured standard output of the nimbus and supervisor processes (that is, /tmp/*.log), given the processes are launched as below:

    $STORMDIR/bin/storm nimbus > /tmp/nimbus.log 2>&1
    $STORMDIR/bin/storm supervisor > /tmp/supervisor.log 2>&1

3) all logfiles generated under $STORMDIR/logs/*

My second question is: how is Storm internal logger output being sent to stdout in the first place? I am seeing lots of logger output going to my console that looks like this:

    9059 [Thread-18-count] INFO  backtype.storm.daemon.task - Emitting: count default [snow, 20]

However, when I look in $STORMDIR/logback/cluster.xml, I don't see any appender configured that would go to stdout... I would assume that logback would require a config line like this:

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

But I see no such line in cluster.xml. This leads me to believe that there is some additional mechanism to configure logging that I don't understand.

Any leads much appreciated!
thanks
Chris

MODIFIED ExclamationTopology:

    public class ExclamationTopology {
        public static Logger LOG = LoggerFactory.getLogger(ExclamationTopology.class);

        public static class ExclamationBolt extends BaseRichBolt {
            OutputCollector _collector;

            @Override
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                _collector = collector;
            }

            @Override
            public void execute(Tuple tuple) {
                System.out.println("tuple: " + tuple);
                LOG.error("tuple: logger " + tuple);
                _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
                _collector.ack(tuple);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }

        public static void main(String[] args) throws Exception {
            System.out.println("STARING THE PROGRAM");
            LOG.error(" -logger -- STARING THE PROGRAM");
            TopologyBuilder builder = new TopologyBuilder();
            // etc. etc. -- the rest is the same as the 'stock' version..

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
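For reference, a stdout appender of the kind the message above expects would look roughly like this in logback syntax. This is a generic logback configuration sketch, not the actual contents of any Storm release's cluster.xml:

```xml
<configuration>
  <!-- Send log events to the process's standard output. -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Route everything at INFO and above through the console appender. -->
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```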