Storm Production usage (case studies)
Hello! I would like to know whether, besides the companies mentioned in the documentation (http://storm.apache.org/documentation/Powered-By.html), there are any other companies that have deployed Storm in production, and what their case studies were (in the way they are described in the documentation). I'm evaluating Storm, and this information will be very helpful for knowing whether our use cases might fit. I look forward to your responses. Thanks, Best regards, Florin
null pointer at DisruptorQueue
Hi, I'm relatively new to Storm (a couple of months using it), so excuse any stupid question I might post :-) I have a somewhat complex topology. At some point while developing it I made some change that produces the following exception:

325692 [Thread-345-disruptor-executor[32 32]-send-queue] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_72]
Caused by: java.lang.NullPointerException: null
	at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
	... 6 common frames omitted
325693 [Thread-345-disruptor-executor[32 32]-send-queue] ERROR backtype.storm.daemon.executor

It is clearly something I have added, because this was not happening before yesterday. I was initially using 0.9.2-incubating, and upgrading to 0.9.3 gives the same behavior. Is this a known issue? I can describe more of what I'm doing if that would help find the culprit, or even try to create a quick-start project. This happens running the topology on a local cluster; I have not tested it in a production setting yet.

-- Regards - Ernesto Reinaldo Barreiro
Re: null pointer at DisruptorQueue
Hi. This seems to be related to the fact that there are other threads (some custom threads) interacting with my custom bolts. Removing them fixes the problem. Side note: my bolts accumulate data, and at some point, when new data arrives, they dump the accumulated data. If no new data arrives, at some point the data is never dumped; that's why I had these dump-forcing threads.

-- Regards - Ernesto Reinaldo Barreiro
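The dump-forcing threads described above are one workaround; Storm's built-in tick tuples (Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS) are the usual way to flush accumulated state even when no new data arrives. A minimal, self-contained sketch of the pattern follows; TickFlushSketch and TickAwareBuffer are hypothetical stand-ins (storm-core is not assumed on the classpath here), while the "__system"/"__tick" IDs mirror Storm's Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID:

```java
// Sketch of the tick-tuple alternative to custom flush threads.
// In a real bolt, getComponentConfiguration() would return a map with
// Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS set, so Storm delivers a tick
// tuple on the system stream every N seconds; execute() then flushes
// on ticks instead of relying on an external thread.
public class TickFlushSketch {
    // These values mirror backtype.storm.Constants in 0.9.x.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    static boolean isTick(String sourceComponent, String sourceStream) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(sourceStream);
    }

    // Stand-in for the bolt's accumulated state.
    static class TickAwareBuffer {
        private final StringBuilder buffer = new StringBuilder();
        int flushes = 0;

        // Called for every incoming tuple; ticks trigger the flush that
        // the custom threads were forcing before.
        void onTuple(String sourceComponent, String sourceStream, String data) {
            if (isTick(sourceComponent, sourceStream)) {
                flushes++;
                buffer.setLength(0); // dump accumulated data
            } else {
                buffer.append(data);
            }
        }

        int buffered() { return buffer.length(); }
    }

    public static void main(String[] args) {
        TickAwareBuffer bolt = new TickAwareBuffer();
        bolt.onTuple("spout", "default", "a");
        bolt.onTuple("spout", "default", "b");
        bolt.onTuple("__system", "__tick", null); // periodic tick from Storm
        System.out.println(bolt.flushes + " " + bolt.buffered());
        // prints "1 0": one flush happened, buffer is empty
    }
}
```

Because the tick arrives on the bolt's own executor thread, this also sidesteps the multi-threading problem discussed later in this thread.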
Re: null pointer at DisruptorQueue
Michael,

On Fri, Dec 12, 2014 at 5:10 PM, Michael Rose mich...@fullcontact.com wrote:
Hi Ernesto, Having multi-threaded bolts is fine as long as you synchronize on the OutputCollector before emitting/acking. That'll solve your issue.
Michael
Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com

Thanks for your answer! Even if the thread is not a Storm thread?

-- Regards - Ernesto Reinaldo Barreiro
Re: null pointer at DisruptorQueue
That's correct. Normally you use the OutputCollector from a single thread (the executor thread for the bolt). If your bolt is multithreaded, just synchronize on the collector no matter what thread you're emitting/acking/failing from.

Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com
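The advice above can be sketched with a minimal, self-contained example. FakeCollector is a hypothetical stand-in for Storm's OutputCollector (storm-core is not assumed on the classpath here); its backing ArrayList is not thread-safe, which is exactly why every thread, Storm executor or custom, must take the same lock before emitting:

```java
// Four threads emit through one shared collector; each synchronizes on
// the collector object first, so no emits are lost or corrupted.
import java.util.ArrayList;
import java.util.List;

public class SynchronizedEmitSketch {
    static class FakeCollector {
        final List<String> emitted = new ArrayList<String>();
        void emit(String tuple) { emitted.add(tuple); }
    }

    public static void main(String[] args) throws InterruptedException {
        final FakeCollector collector = new FakeCollector();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        // Storm executor thread or custom flush thread alike:
                        // take the collector's monitor before emitting/acking.
                        synchronized (collector) {
                            collector.emit("tuple");
                        }
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        // All 4 x 1000 emits arrive intact.
        System.out.println(collector.emitted.size());
    }
}
```

Without the synchronized block, concurrent ArrayList.add calls can lose elements or throw; with it, the count is deterministically 4000.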
Is it possible to 'force' a topology ID to a specific value?
I'm currently looking at ways to leverage the Storm HTTP APIs to monitor the health of our Storm cluster, and of a running topology in particular. I can get all the statistics I need when I know a particular topology ID, parsing the JSON returned from a URL like http://servername:8080/api/v1/topology/production-topology-1418162192 with our existing monitoring apparatus. The problem is that I can't set up a fixed URL for our monitoring systems, because Storm builds its own ID from the topology name: name + nonce. As it stands, I'd need to write code to list all running topologies and parse out the one I'm interested in, requiring an application layer I don't want to write at the moment. Is it possible to override Storm's appending of the nonce, forcing it to use the specific ID string I want? Thanks in advance, sheldonwh...@comcast.net
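One lightweight alternative to forcing the ID is to resolve it from the topology name at monitoring time, via the UI's /api/v1/topology/summary endpoint. A sketch of the name-to-ID lookup follows; findIdByName is a hypothetical helper, the sample JSON is illustrative, and the regex assumes "id" precedes "name" in each topology object (a real implementation should fetch the URL with HttpURLConnection and use a proper JSON parser):

```java
// Resolve a topology's generated ID ("name-nonce") from its stable name
// by scanning the /api/v1/topology/summary response.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopologyIdLookup {
    // Pull the "id" from the object whose "name" matches; returns null if absent.
    static String findIdByName(String summaryJson, String name) {
        Pattern p = Pattern.compile(
            "\\{[^{}]*\"id\"\\s*:\\s*\"([^\"]+)\"[^{}]*\"name\"\\s*:\\s*\""
                + Pattern.quote(name) + "\"[^{}]*\\}");
        Matcher m = p.matcher(summaryJson);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Illustrative response shape, not captured from a live cluster.
        String json = "{\"topologies\":[{\"id\":\"production-topology-1418162192\","
            + "\"name\":\"production-topology\",\"status\":\"ACTIVE\"}]}";
        System.out.println(findIdByName(json, "production-topology"));
        // prints "production-topology-1418162192"
    }
}
```

The monitoring system can then hit the fixed summary URL first and build the per-topology URL from the resolved ID, keeping the extra "application layer" down to one lookup.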
Re: write data to file then bulk copy to database
Here I found that if I use a RandomTupleSpout, which implements IBatchSpout, to replace OpaqueTridentKafkaSpout:

static class RandomTupleSpout implements IBatchSpout {
    private transient Random random;
    private static final int BATCH = 1000;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context) {
        random = new Random();
    }

    @Override
    public void emitBatch(final long batchId, final TridentCollector collector) {
        // emit BATCH tuples of (number, string) per batch
        for (int i = 0; i < BATCH; i++) {
            collector.emit(new Values(i, "test string inserted into this table"));
        }
    }

    @Override
    public void ack(final long batchId) {}

    @Override
    public void close() {}

    @Override
    @SuppressWarnings("rawtypes")
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("batchid", "word");
    }
}

I can detect batch tuples in the TridentState, but with OpaqueTridentKafkaSpout it seems I only receive 1 row of tuples.

thanks
Alec

On Fri, Dec 12, 2014 at 10:47 AM, Sa Li sa.in.v...@gmail.com wrote: Hi, Taylor, Thank you very much. I actually read your book, Storm Blueprints: Patterns for Distributed Real-time Computation (awesome book). I am not a Java programmer, but I try my best to program Storm code in Java. I actually implemented a Trident state based on https://github.com/geoforce/storm-postgresql. I walked through the Trident state link you mentioned, and I think I understand the state bit.

Here is the updater function:

static class EventUpdater implements ReducerAggregator<List<String>> {
    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated = null;
        if (curr == null) {
            String event = (String) tuple.getValue(1);
            updated = Lists.newArrayList(event);
        } else {
            updated = curr;
        }
        System.out.println(updated);
        return updated;
    }
}

In the state, I am doing this:

PostgresqlState(final PostgresqlStateConfig config) {
    this.config = config;
    try {
        Class.forName("org.postgresql.Driver");
        connection = DriverManager.getConnection(config.getUrl(), "sali", "sali");
    } catch (ClassNotFoundException e) {
        logger.error("Failed to establish DB connection", e);
        System.exit(1);
    } catch (SQLException e) {
        logger.error("Failed to establish DB connection", e);
        System.exit(2);
    }
}

public static Factory newFactory(final PostgresqlStateConfig config) {
    return new Factory(config);
}

// I don't do anything in multiGet, since I don't have to compare the keys in the DB or retrieve anything from it
@Override
public List<T> multiGet(final List<List<Object>> keys) {
    final List<T> result = new ArrayList<T>();
    for (final List<Object> key : keys) {
        result.add((T) null);
    }
    return result;
}

Next, I wrote two multiPut implementations to store the Kafka messages, one using the COPY command, the other using multi-row INSERT.

// COPY command
@Override
public void multiPut(final List<List<Object>> keys, final List<T> values) {
    System.out.println("multiPut start .");
    System.out.println(keys); // I want to print out the batch keys
    final Iterator<List<List<Object>>> partitionedKeys =
        Lists.partition(keys, config.getBatchSize()).iterator();
    final Iterator<List<T>> partitionedValues =
        Lists.partition(values, config.getBatchSize()).iterator();
    while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
        final List<List<Object>> pkeys = partitionedKeys.next();
        final List<T> pvalues = partitionedValues.next();
        final StringBuilder copyQueryBuilder = new StringBuilder()
            .append("COPY ")
            .append(config.getTable())
            .append(" (")
            .append(buildColumns())
            .append(") FROM STDIN WITH DELIMITER '|'");
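For reference, the pipe-delimited payload that a COPY ... FROM STDIN WITH DELIMITER '|' statement expects can be assembled from each partition's keys and values along these lines. buildCopyPayload is a hypothetical helper (not from the original code, which is truncated above); in the real state the resulting string would be handed to the PostgreSQL JDBC driver's CopyManager.copyIn(sql, reader):

```java
// Format batched Trident keys/values as COPY input: one line per tuple,
// key columns then the value, separated by '|'.
import java.util.Arrays;
import java.util.List;

public class CopyPayloadSketch {
    static String buildCopyPayload(List<List<Object>> keys, List<String> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            for (Object k : keys.get(i)) {
                sb.append(k).append('|');
            }
            sb.append(values.get(i)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
            Arrays.<Object>asList(1, "a"),
            Arrays.<Object>asList(2, "b"));
        List<String> values = Arrays.asList("x", "y");
        System.out.print(buildCopyPayload(keys, values));
        // prints:
        // 1|a|x
        // 2|b|y
    }
}
```

Note this sketch does no escaping; real data containing '|' or newlines would need the quoting rules of PostgreSQL's COPY text format.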