Topology hangs when bolt's Async loop dies because KryoTupleSerializer.serialize throws NullPointerException

2014-03-10 Thread 鞠大升
hi, all

*Background:*
-
we are using Storm 0.9.0.1. Our topology has a KafkaSpout (reads logs from
Kafka), a ParserBolt (parses the logs), and a SaverBolt (writes back to
Kafka). The KafkaSpout has 16 threads, the ParserBolt has 32 threads, and the
SaverBolt has 16 threads. The ParserBolt is written in Python using multilang.

*Problems:*
-
Sometimes KryoTupleSerializer.serialize throws a NullPointerException, which
kills the ParserBolt. The supervisor then restarts the bolt, but the new bolt
never receives any tuples, and the topology hangs until we restart the whole
topology.

*Analysis:*
-
We found a Troubleshooting entry
(https://github.com/nathanmarz/storm/wiki/Troubleshooting#wiki-nullpointerexception-from-deep-inside-storm)
that says: This is caused by having multiple threads issue methods on the
OutputCollector. All emits, acks, and fails must happen on the same thread.
One subtle way this can happen is if you make an IBasicBolt that emits on a
separate thread. IBasicBolts automatically ack after execute is called, so
this would cause multiple threads to use the OutputCollector, leading to this
exception. When using a basic bolt, all emits must happen in the same thread
that runs execute.
And we found that in ShellBolt.java the _readerThread is a new thread, and
handleEmit calls emit to emit new tuples.

But another wiki page
(https://github.com/nathanmarz/storm/wiki/Concepts#wiki-bolts) says: It's
perfectly fine to launch new threads in bolts that do processing
asynchronously. OutputCollector
(http://nathanmarz.github.com/storm/doc/backtype/storm/task/OutputCollector.html)
is thread-safe and can be called at any time.

*So we have two questions:*
-
1) Is OutputCollector thread-safe or not? If it is not thread-safe, so that
all emits, acks, and fails must happen on the same thread, does ShellBolt
have a bug? (See the sketch after question 2 for the workaround pattern we
have in mind.)
2) When the bolt is restarted, why does the topology hang? By the way, we are
using Netty as the transport.
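
For what it's worth, the workaround pattern we are considering, assuming the
troubleshooting page is right and the collector is not thread-safe, looks
roughly like the sketch below (AsyncSafeBolt and its field names are only
illustrative, not our real bolt): the background thread only enqueues
results, and emit/ack happen exclusively on the executor thread inside
execute().

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AsyncSafeBolt extends BaseRichBolt {
    private OutputCollector collector;
    // Results produced by the background thread, drained on the executor thread.
    private final ConcurrentLinkedQueue<Values> pending = new ConcurrentLinkedQueue<Values>();
    private ExecutorService worker;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.worker = Executors.newSingleThreadExecutor();
    }

    @Override
    public void execute(final Tuple input) {
        final String raw = input.getString(0);
        worker.submit(new Runnable() {
            public void run() {
                // Do the heavy work off the executor thread; only enqueue the result.
                pending.add(new Values(raw.toUpperCase()));
            }
        });
        // Emit whatever the background thread has finished so far, on this thread.
        Values out;
        while ((out = pending.poll()) != null) {
            collector.emit(out);
        }
        // Acked immediately for simplicity; anchoring is not the point of this sketch.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }
}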

Can anyone help?

The worker log:
-

2014-03-10 12:48:31 b.s.util [ERROR] Async loop died!

java.lang.RuntimeException: java.lang.NullPointerException

at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.disruptor$consume_loop_STAR_$fn__849.invoke(disruptor.clj:74)
~[storm-core-0.9.0.1.jar:na]

at backtype.storm.util$async_loop$fn__469.invoke(util.clj:406)
~[storm-core-0.9.0.1.jar:na]

at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]

at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]

Caused by: java.lang.NullPointerException: null

at
backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.daemon.worker$mk_transfer_fn$fn__4335$fn__4339.invoke(worker.clj:108)
~[storm-core-0.9.0.1.jar:na]

at backtype.storm.util$fast_list_map.invoke(util.clj:804)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.daemon.worker$mk_transfer_fn$fn__4335.invoke(worker.clj:108)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4060.invoke(executor.clj:240)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.disruptor$clojure_handler$reify__836.onEvent(disruptor.clj:43)
~[storm-core-0.9.0.1.jar:na]

at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0.1.jar:na]

... 6 common frames omitted



-- 
dashengju
+86 13810875910
dashen...@gmail.com


Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917

2014-03-10 Thread James Hardy
Hi

I'm hoping someone can help.  I've been learning Storm for the past few
hours and everything has been great; however, I'm now getting
a java.lang.ClassNotFoundException:
backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
exception when trying to submit my topology to a local cluster.

I have a simple spout which emits the string "go" and the ExclamationBolt
taken from the examples.  I wanted to create a pipeline of bolts, so I
created a topology:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test", new Spout());
builder.setBolt("ex1", new ExclamationBolt()).shuffleGrouping("test");
builder.setBolt("ex2", new ExclamationBolt()).shuffleGrouping("ex1");

This works fine, no problem at all. However, when I add another bolt I get
the topology submission exception:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test", new Spout());
builder.setBolt("ex1", new ExclamationBolt()).shuffleGrouping("test");
builder.setBolt("ex2", new ExclamationBolt()).shuffleGrouping("ex1");
builder.setBolt("ex3", new ExclamationBolt()).shuffleGrouping("ex2");

If anyone has any ideas they would be really appreciated.  I feel like I've
hit a brick wall :/

Thanks

James


How to define grouping in a Topology with an 'a priori' unknown number of streams to subscribe?

2014-03-10 Thread Susana González
Hi,


I need help to go from a simple Storm topology like this:



TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", spout, spoutParallelism);
builder.setBolt("myBoltA", boltA, boltAParallelism).shuffleGrouping("mySpout");
builder.setBolt("myBoltB", boltB, boltBParallelism).shuffleGrouping("myBoltA");



to a new topology where there are several myBoltA* processes whose emitted
tuples all need to be processed by the same boltB process.

The problem is that the number of myBoltA* processes is read from a
configuration file when the topology is started, so I don't know it a priori
and cannot define the grouping in the code just with:



builder.setBolt("myBoltB", boltB, boltBParallelism)
   .shuffleGrouping("myBoltA1")
   .shuffleGrouping("myBoltA2")
   .etc...
   .shuffleGrouping("myBoltAn");



I've searched for whether it's possible to do this using a
CustomStreamGrouping or Trident... but I haven't found how to implement it
yet.



Any idea?



Thanks in advance!

 Susana


Re: How to define grouping in a Topology with an 'a priori' unknown number of streams to subscribe?

2014-03-10 Thread Nathan Leung
You can do something like

BoltDeclarer bd = builder.setBolt("myBoltB", boltB, boltBParallelism);
for (int i = 1; i <= numInstances; ++i) {
    bd.shuffleGrouping("myBoltA" + i);
}
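
A slightly fuller sketch of the same idea, assuming the count is read from a
properties file and that the myBoltA instances are declared in a loop as well
(class name, parallelism values, and the "boltA.instances" property are
placeholders, not from the original post):

import java.util.Properties;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

public class DynamicGroupingTopology {
    // Wires a config-driven number of "myBoltA<i>" bolts into one "myBoltB".
    public static StormTopology build(Properties props, IRichSpout spout,
                                      IRichBolt boltA, IRichBolt boltB) {
        int numInstances = Integer.parseInt(props.getProperty("boltA.instances", "3"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", spout, 2);

        // Declare each myBoltA<i>, all reading from the spout.
        for (int i = 1; i <= numInstances; ++i) {
            builder.setBolt("myBoltA" + i, boltA, 2).shuffleGrouping("mySpout");
        }

        // Subscribe myBoltB to every myBoltA<i> stream.
        BoltDeclarer bd = builder.setBolt("myBoltB", boltB, 2);
        for (int i = 1; i <= numInstances; ++i) {
            bd.shuffleGrouping("myBoltA" + i);
        }
        return builder.createTopology();
    }
}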


On Mon, Mar 10, 2014 at 11:52 AM, Susana González susan...@gmail.com wrote:

 Hi,


 I need help to go from a simple Storm topology like this:



 TopologyBuilder builder = *new* TopologyBuilder();

 builder.setSpout(mySpout, spout, spoutParallelism);

 builder.setBolt(myBoltA, boltA, boltAParallelism).shuffleGrouping(
 mySpout);

 builder.setBolt(myBoltB, boltB, boltBParallelism).shuffleGrouping(
 myBoltA);



 to a new topology  where there are several processes myBoltA* whose
 emitted tuples need to be processed by a same process boltB.


 The problem is that the number of processes myBoltA* I have is read from a
 configuration file when the topology is started, so I don't know them a
 priori to define the grouping in the code just with:



 builder.setBolt(myBoltB, boltB, boltBParallelism)

.shuffleGrouping(myBoltA1)

.shuffleGrouping(myBoltA2)

.etc...

.shuffleGrouping(myBoltAn);



 I've searched if it's possible to do it using a CustomStreamGrouping or
 using Trident... but I haven't found how to implement it yet.



 Any idea?



 Thanks in advance!

  Susana





Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917

2014-03-10 Thread James Hardy
It was indeed a classpath/maven issue.  Thank you Nathan!


On Mon, Mar 10, 2014 at 2:36 PM, Nathan Leung ncle...@gmail.com wrote:

 These appear to be classes generated from Clojure.  Are you building against
 the same version of storm that you are using to run the application?


 On Mon, Mar 10, 2014 at 8:07 AM, James Hardy jcdhardy...@gmail.com wrote:

 Hi

 I'm hoping someone can help.  I've been learning Storm for the past few
 hours and everything has been great however I'm getting
 a java.lang.ClassNotFoundException:
 backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
 exception when trying to submit my topology to a local cluster.

 I have a simple spout which emits the string go and the ExclamationBolt
 taken from the examples.  I wanted to create a pipeline of bolts and so
 created a topology:

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(test, new Spout());
 builder.setBolt(ex1, new ExclamationBolt()).shuffleGrouping(test);
 builder.setBolt(ex2, new ExclamationBolt()).shuffleGrouping(ex1);

 This works fine no problem at all however when I add another bolt I get
 the Topology submission exception:

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(test, new Spout());
 builder.setBolt(ex1, new ExclamationBolt()).shuffleGrouping(test);
 builder.setBolt(ex2, new ExclamationBolt()).shuffleGrouping(ex1);
 builder.setBolt(ex3, new ExclamationBolt()).shuffleGrouping(ex2);

 If anyone has any ideas they would be really appreciated.  I feel like
 I've hit a brick wall :/

 Thanks

 James





Setting bolt heap size? Config.TOPOLOGY_WORKER_CHILDOPTS or Config.NIMBUS_CHILDOPTS?

2014-03-10 Thread shahab
Hi

I just wonder: what are the required memory settings in order to run a
bolt holding a very large object (around 2 GB)? Is it
Config.TOPOLOGY_WORKER_CHILDOPTS that needs to be set, or
Config.NIMBUS_CHILDOPTS? (A sketch of what I mean is below.)
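
For concreteness, this is the kind of per-topology setting I have in mind
(just a sketch; it assumes TOPOLOGY_WORKER_CHILDOPTS is the knob that sizes
the worker JVM that actually runs the bolt, which is exactly what I'm asking):

import backtype.storm.Config;

public class WorkerHeapConfig {
    // Request a 4 GB heap for every worker JVM of this topology, so a bolt
    // can hold a ~2 GB object. nimbus.childopts would only size the Nimbus
    // daemon itself, not the workers.
    public static Config withLargeWorkerHeap() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
        return conf;
    }
}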

best,
/Shahab


Re: java.lang.OutOfMemoryError: Java heap space in Nimbus

2014-03-10 Thread Derek Dagit

Yes, set 'nimbus.childopts: -Xmx?' in your storm.yaml, and restart nimbus. If 
unset, I believe the default is -Xmx1024m, for a max of 1024 MB heap.

You can set it to -Xmx2048m, for example, to have a max heap size of 2048 MB.

Set this on the node that runs nimbus, not in your topology conf.

--
Derek

On 3/10/14, 14:19, shahab wrote:

Hi,

I am facing an OutOfMemoryError: Java heap space exception in Nimbus while
running in cluster mode. I just wonder what JVM or Storm options I can set to
overcome this problem?

I am running a Storm topology in cluster mode where all servers (zookeeper,
nimbus, supervisor and worker) are on one machine. The topology that I use is
as follows:


conf.setMaxSpoutPending(2000); // maximum number of pending messages at spout
conf.setNumWorkers(4);
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 12);
conf.setMaxTaskParallelism(2);


but I get the following Exception in Nimbus log file:
java.lang.OutOfMemoryError: Java heap space
 at 
org.apache.commons.io.output.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:271)
 at org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:219)
 at 
org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1136)
 at 
backtype.storm.daemon.nimbus$read_storm_topology.invoke(nimbus.clj:305)
 at 
backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:407)
 at 
backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
 at 
backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
 at 
backtype.storm.daemon.nimbus$mk_assignments$iter__3416__3420$fn__3421.invoke(nimbus.clj:636)
 at clojure.lang.LazySeq.sval(LazySeq.java:42)
 at clojure.lang.LazySeq.seq(LazySeq.java:60)
 at clojure.lang.RT.seq(RT.java:473)
 at clojure.core$seq.invoke(core.clj:133)
 at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
 at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
 at 
clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
 at clojure.core$reduce.invoke(core.clj:6030)
 at clojure.core$into.invoke(core.clj:6077)
 at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:635)
 at clojure.lang.RestFn.invoke(RestFn.java:410)
 at 
backtype.storm.daemon.nimbus$fn__3592$exec_fn__1228__auto3593$fn__3598$fn__3599.invoke(nimbus.clj:872)
 at 
backtype.storm.daemon.nimbus$fn__3592$exec_fn__1228__auto3593$fn__3598.invoke(nimbus.clj:871)
 at 
backtype.storm.timer$schedule_recurring$this__1776.invoke(timer.clj:69)
 at backtype.storm.timer$mk_timer$fn__1759$fn__1760.invoke(timer.clj:33)
 at backtype.storm.timer$mk_timer$fn__1759.invoke(timer.clj:26)
 at clojure.lang.AFn.run(AFn.java:24)
 at java.lang.Thread.run(Thread.java:744)
2014-03-10 20:10:02 NIOServerCnxn [ERROR] Thread 
Thread[pool-4-thread-40,5,main] died
java.lang.OutOfMemoryError: Java heap space
 at 
org.apache.commons.io.output.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:271)
 at org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:219)
 at 
org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1136)
 at 
backtype.storm.daemon.nimbus$read_storm_topology.invoke(nimbus.clj:305)
 at 
backtype.storm.daemon.nimbus$fn__3592$exec_fn__1228__auto__$reify__3605.getTopologyInfo(nimbus.clj:1066)
 at 
backtype.storm.generated.Nimbus$Processor$getTopologyInfo.getResult(Nimbus.java:1481)
 at 
backtype.storm.generated.Nimbus$Processor$getTopologyInfo.getResult(Nimbus.java:1469)
 at org.apache.thrift7.ProcessFunction.process(ProcessFunction.java:32)
 at org.apache.thrift7.TBaseProcessor.process(TBaseProcessor.java:34)
 at 
org.apache.thrift7.server.TNonblockingServer$FrameBuffer.invoke(TNonblockingServer.java:632)
 at 
org.apache.thrift7.server.THsHaServer$Invocation.run(THsHaServer.java:201)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
2014-03-10 20:10:02 util [INFO] Halting process: (Error when processing an 
event)


best ,
/Shahab



Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-10 Thread Chris James
Reposting since I posted this before at a poor time and got no response.

I'm trying out a Storm project built from scratch in Java, but with a
Python bolt.  I have everything running with all Java spouts/bolts just
fine, but when I try to incorporate a Python bolt I am running into issues.

I have my project separated into a /storm/ for topologies, /storm/bolts/
for bolts, /storm/spouts for spouts, and /storm/multilang/ for the
multilang wrappers. Right now the only thing in /storm/multilang/ is
storm.py, copied and pasted from the storm-starter project.  In my bolts
folder, I have a dummy bolt set up that just prints the tuple.  I've
virtually mimicked the storm-starter WordCountTopology example for using a
python bolt, so I think the code is OK and the configuration is the issue.

So my question is simple.  What configuration steps do I have to set up so
that my topology knows where to look to find storm.py when I run
super("python", "dummypythonbolt.py")?  I noticed an error in the stack
trace claiming that it could not run "python" (python is definitely on my
path and I use it every day), and that it is looking in a resources folder
that does not exist.  Here is the line in question:

Caused by: java.io.IOException: Cannot run program "python" (in directory
"C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources"):
CreateProcess error=267, The directory name is invalid

A more extensive stack trace is here: http://pastebin.com/6yx97m0M

So once again: what is the configuration step that I am missing to allow my
topology to see storm.py and be able to run multilang spouts/bolts in my
topology?
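
For reference, my bolt declaration mirrors the storm-starter pattern; the
sketch below uses my own placeholder names (DummyPythonBolt,
dummypythonbolt.py). As I understand it, the script has to be packaged in a
"resources" directory at the root of the classpath (e.g. by putting it under
/storm/multilang/resources/ and marking /storm/multilang/ as a resource
folder), because Storm copies that directory next to the worker and starts
the subprocess inside it, which would explain the missing-directory error.

import java.util.Map;

import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

public class DummyPythonBolt extends ShellBolt implements IRichBolt {
    public DummyPythonBolt() {
        // Runs resources/dummypythonbolt.py via the multilang protocol.
        super("python", "dummypythonbolt.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}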

Thanks!


Re: Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-10 Thread P. Taylor Goetz
I don't have access to a windows machine at the moment, but does this help?

http://support.microsoft.com/kb/832434

 On Mar 10, 2014, at 4:51 PM, Chris James chris.james.cont...@gmail.com 
 wrote:
 
 Reposting since I posted this before at a poor time and got no response.
 
 I'm trying out a storm project built from scratch in Java, but with a Python 
 bolt.  I have everything running with all Java spouts/bolts just fine, but 
 when I try to incorporate a python bolt I am running into issues.
 
 I have my project separated into a /storm/ for topologies, /storm/bolts/ for 
 bolts, /storm/spouts for spouts, and /storm/multilang/ for the multilang 
 wrappers. Right now the only thing in /storm/multilang/ is storm.py, copied 
 and pasted from the storm-starter project.  In my bolts folder, I have a 
 dummy bolt set up that just prints the tuple.  I've virtually mimicked the 
 storm-starter WordCountTopology example for using a python bolt, so I think 
 the code is OK and the configuration is the issue.
 
 So my question is simple.  What configuration steps do I have to set up so 
 that my topology knows where to look to find storm.py when I run 
 super(python, dummypythonbolt.py)?  I noticed an error in the stack trace 
 claiming that it could not run python (python is definitely on my path and I 
 use it everyday), and that is looking in a resources folder that does not 
 exist.  Here is the line in question:
 
 Caused by: java.io.IOException: Cannot run program python (in directory 
 C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources):
  CreateProcess error=267, The directory name is invalid
 
 A more extensive stack trace is here: http://pastebin.com/6yx97m0M
 
 So once again: what is the configuration step that I am missing to allow my 
 topology to see storm.py and be able to run multilang spouts/bolts in my 
 topology?
 
 Thanks!


Re: Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-10 Thread Derek Dagit

Two quick thoughts:

- Can you cd to 
'C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources'
 from the shell as yourself?

- What are the permissions on that directory?  Is the supervisor daemon running 
as another user?

--
Derek

On 3/10/14, 17:05, P. Taylor Goetz wrote:

I don't have access to a windows machine at the moment, but does this help?

http://support.microsoft.com/kb/832434

On Mar 10, 2014, at 4:51 PM, Chris James chris.james.cont...@gmail.com wrote:


Reposting since I posted this before at a poor time and got no response.

I'm trying out a storm project built from scratch in Java, but with a Python 
bolt.  I have everything running with all Java spouts/bolts just fine, but when 
I try to incorporate a python bolt I am running into issues.

I have my project separated into a /storm/ for topologies, /storm/bolts/ for 
bolts, /storm/spouts for spouts, and /storm/multilang/ for the multilang 
wrappers. Right now the only thing in /storm/multilang/ is storm.py, copied and 
pasted from the storm-starter project.  In my bolts folder, I have a dummy bolt 
set up that just prints the tuple.  I've virtually mimicked the storm-starter 
WordCountTopology example for using a python bolt, so I think the code is OK 
and the configuration is the issue.

So my question is simple.  What configuration steps do I have to set up so that my topology knows 
where to look to find storm.py when I run super(python, 
dummypythonbolt.py)?  I noticed an error in the stack trace claiming that it could not 
run python (python is definitely on my path and I use it everyday), and that is looking in a 
resources folder that does not exist.  Here is the line in question:

Caused by: java.io.IOException: Cannot run program python (in directory 
C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources):
 CreateProcess error=267, The directory name is invalid

A more extensive stack trace is here: http://pastebin.com/6yx97m0M

So once again: what is the configuration step that I am missing to allow my 
topology to see storm.py and be able to run multilang spouts/bolts in my 
topology?

Thanks!


Re: Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-10 Thread Chris James
Derek: No I cannot cd into that directory, but I can cd into the directory
one up from it (dummy-topology-1-1394418571).  That directory contains
stormcode.ser and stormconf.ser files.  The topology is running locally for
testing, so I'm not launching any separate supervisor daemon.  It just
seems like it never even attempted to create the resources directory (but
it successfully created all the ancestor directories), as the folder isn't
really locked down at all.

P. Taylor: I get what you're implying, but Eclipse is being run as an
administrator already, and I am debugging the topology locally straight out
of eclipse.  It seems bizarre that there would be permissions issues on a
folder that the project itself created.


On Mon, Mar 10, 2014 at 6:16 PM, Derek Dagit der...@yahoo-inc.com wrote:

 Two quick thoughts:

 - Can you cd to 'C:\Users\chris\AppData\Local\
 Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\
 stormdist\dummy-topology-1-1394418571\resources' from the shell as
 yourself?

 - What are the permissions on that directory?  Is the supervisor daemon
 running as another user?

 --
 Derek


 On 3/10/14, 17:05, P. Taylor Goetz wrote:

 I don't have access to a windows machine at the moment, but does this
 help?

 http://support.microsoft.com/kb/832434

 On Mar 10, 2014, at 4:51 PM, Chris James chris.james.cont...@gmail.com wrote:

  Reposting since I posted this before at a poor time and got no response.

 I'm trying out a storm project built from scratch in Java, but with a
 Python bolt.  I have everything running with all Java spouts/bolts just
 fine, but when I try to incorporate a python bolt I am running into issues.

 I have my project separated into a /storm/ for topologies, /storm/bolts/
 for bolts, /storm/spouts for spouts, and /storm/multilang/ for the
 multilang wrappers. Right now the only thing in /storm/multilang/ is
 storm.py, copied and pasted from the storm-starter project.  In my bolts
 folder, I have a dummy bolt set up that just prints the tuple.  I've
 virtually mimicked the storm-starter WordCountTopology example for using a
 python bolt, so I think the code is OK and the configuration is the issue.

 So my question is simple.  What configuration steps do I have to set up
 so that my topology knows where to look to find storm.py when I run
 super(python, dummypythonbolt.py)?  I noticed an error in the stack
 trace claiming that it could not run python (python is definitely on my
 path and I use it everyday), and that is looking in a resources folder that
 does not exist.  Here is the line in question:

 Caused by: java.io.IOException: Cannot run program python (in
 directory C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-
 83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources):
 CreateProcess error=267, The directory name is invalid

 A more extensive stack trace is here: http://pastebin.com/6yx97m0M

 So once again: what is the configuration step that I am missing to allow
 my topology to see storm.py and be able to run multilang spouts/bolts in my
 topology?

 Thanks!




Re: Storm-0.9.0-wip21 failed for topology with Hadoop and Hbase dependencies

2014-03-10 Thread Zheng Xue
Sorry for the late reply. This exception is not from Storm; it was caused by
an HBase version issue.

Thanks a lot for your help.


2014-03-09 16:35 GMT+08:00 Niels Basjes ni...@basjes.nl:

 Try putting those static initializations into the prepare.
 On Mar 8, 2014 12:20 PM, Zheng Xue xuezhs...@gmail.com wrote:

 Thanks for your reply.

 I excluded the log4j dependency in the pom file. The topology can now run on
 Storm-0.9.0-wip21.

 But another error appears in the last bolt, which is responsible for
 writing statistics into HBase. Here is the exception information:


 java.lang.RuntimeException: java.lang.NullPointerException
  at 
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
  at 
 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
  at 
 backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
  at 
 backtype.storm.daemon.executor$fn__3483$fn__3495$fn__3542.invoke(executor.clj:715)
  at backtype.storm.util$async_loop$fn__441.invoke(util.clj:396)
  at clojure.lang.AFn.run(AFn.java:24)
  at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.NullPointerException
  at 
 com.mycompany.app.MobileNetLogThresholdBolt.execute(MobileNetLogThresholdBolt.java:81)
  at 
 backtype.storm.daemon.executor$fn__3483$tuple_action_fn__3485.invoke(executor.clj:610)
  at 
 backtype.storm.daemon.executor$mk_task_receiver$fn__3406.invoke(executor.clj:381)
  at 
 backtype.storm.disruptor$clojure_handler$reify__2948.onEvent(disruptor.clj:43)
  at 
 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)

 Line 81 in MobileNetLogThresholdBolt.java is table.put(put) (see the
 Java code of this bolt below). According to
 https://github.com/nathanmarz/storm/wiki/Troubleshooting, multiple
 threads using the OutputCollector leads to this exception. But how can I
 fix it? Thanks.

 public class MobileNetLogThresholdBolt implements IRichBolt {
     private OutputCollector outputCollector;
     public static Configuration configuration;
     public static String tablename = "t_mobilenet_threshold";
     public static HTable table;
     static {
         configuration = HBaseConfiguration.create();
         configuration.set("hbase.zookeeper.property.clientPort", "2181");
         configuration.set("hbase.zookeeper.quorum", "xx.xx.xx.xx");
         configuration.set("hbase.master", "xx.xx.xx.xx:6");
     }

     private Log log = LogFactory.getLog(MobileNetLogThresholdBolt.class);

     @Override
     public void prepare(Map map, TopologyContext topologyContext,
             OutputCollector outputCollector) {
         this.outputCollector = outputCollector;
         try {
             table = new HTable(configuration, tablename);
         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }

     @Override
     public void execute(Tuple tuple) {
         log.info("deal data " + tuple.getString(0) + "=" + tuple.getInteger(1));
         if (tuple.getInteger(1) > 2) {
             Put put = new Put(Bytes.toBytes(tuple.getString(0)));
             put.add(Bytes.toBytes("STAT_INFO"), Bytes.toBytes("COUNT"),
                     Bytes.toBytes(String.valueOf(tuple.getInteger(1))));
             try {
                 table.put(put);
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
         this.outputCollector.emit(tuple, tuple.getValues());
         this.outputCollector.ack(tuple);
     }

     @Override
     public void cleanup() {
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }

     @Override
     public Map<String, Object> getComponentConfiguration() {
         return null;
     }
 }
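
A sketch of the suggestion quoted earlier in this thread ("putting those
static initializations into the prepare"): build the HBase configuration and
table in prepare() and fail fast if the table cannot be opened, so execute()
never runs with a null table. This replaces the prepare() method in the class
above; the connection values are the same placeholders.

    @Override
    public void prepare(Map map, TopologyContext topologyContext,
            OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        // Build the configuration here, once per task, instead of in a
        // static block that runs at class-load time in every worker JVM.
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "xx.xx.xx.xx");
        try {
            table = new HTable(configuration, tablename);
        } catch (IOException e) {
            // Surface the failure instead of swallowing it and leaving table null.
            throw new RuntimeException("Could not open HBase table " + tablename, e);
        }
    }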


 2014-03-08 0:53 GMT+08:00 bijoy deb bijoy.comput...@gmail.com:

 Hi Zheng,

 Did you look at the logs for what exactly the error message is?
 In case you see any error that says multiple default.yaml in path
 ...,try the below:
   --In the with-dependencies jar,can you check if you have a
 default.yaml or storm.yaml file.If yes,please delete it and try submitting
 the resulting jar.

 Thanks
 Bijoy


 On Fri, Mar 7, 2014 at 10:51 AM, Zheng Xue xuezhs...@gmail.com wrote:

 Hi, all:

 I was trying to build a Storm topology with Hadoop and Hbase
 dependencies. And I want to run this topology with Storm-on-Yarn. The
 version of Storm in it is 0.9.0-wip21. I created the Jar file with Maven,
 and the pom.xml file is attached.

 I submitted the topology (with-dependencies), and there were no
 exceptions. But it didn't run at all. I checked the supervisor logs (see
 bottom), which show that the workers failed to start.

 To trace the reason for this issue, I added a test topology
 (WordCountTopology) to the jar file. The problem remains when trying to
 submit the WordCountTopology with dependencies, but it works well when
 trying to submit the topology without dependencies.

 To make it clear where this issue comes from, I