OutputCollector is not thread safe.  Most of it is, but it does call down into 
some code, like the shuffle, that is not thread safe.  The simplest way to be 
sure that what you are doing is safe is to synchronize access to the 
OutputCollector within your own code.  If you could send out the version of 
Storm that you are using, we can probably track down exactly what is happening 
here.
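
As a rough illustration of the synchronization idea, here is a minimal sketch. It does not use the real Storm classes: TupleSink, UnsafeSink, and SynchronizedSink are hypothetical stand-ins invented for this example so that it compiles on its own, and UnsafeSink only plays the role of the non-thread-safe code underneath the collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the emit side of a collector (not a Storm type).
interface TupleSink {
    void emit(List<Object> values);
}

// Deliberately not thread safe: ArrayList.add may lose or corrupt entries
// when called from several threads without coordination.
class UnsafeSink implements TupleSink {
    final List<List<Object>> emitted = new ArrayList<>();

    public void emit(List<Object> values) {
        emitted.add(values);
    }
}

// Funnels every call through one lock, so only one thread at a time
// is inside the wrapped sink.
class SynchronizedSink implements TupleSink {
    private final TupleSink delegate;

    SynchronizedSink(TupleSink delegate) {
        this.delegate = delegate;
    }

    public synchronized void emit(List<Object> values) {
        delegate.emit(values);
    }
}

class SyncEmitDemo {
    // Starts `threads` threads that each emit `perThread` tuples through the
    // synchronized wrapper, then returns how many tuples were recorded.
    static int emitFromThreads(int threads, int perThread) {
        UnsafeSink raw = new UnsafeSink();
        TupleSink safe = new SynchronizedSink(raw);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    safe.emit(Arrays.<Object>asList(id, i));
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return raw.emitted.size();
    }

    public static void main(String[] args) {
        System.out.println(emitFromThreads(4, 1000));
    }
}
```

In a real bolt the equivalent would be to take the same lock around every emit, ack, and fail that your own code makes, including the ones made from execute. Calls that Storm makes on your behalf would not take your lock, so this only covers code you control.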

—Bobby

From: 鞠大升 <[email protected]>
Reply-To: [email protected]
Date: Monday, March 10, 2014 at 2:52 AM
To: [email protected], [email protected]
Subject: Topology is hang when bolt "Async loop died" because 
KryoTupleSerializer.serialize throws NullPointerException

Hi all,

Background:
---------------------------------------------------------------------------------------------------------------------
We are using Storm 0.9.0.1. Our topology has a KafkaSpout (reads logs from 
Kafka), a ParserBolt (parses the logs), and a SaverBolt (saves the results back 
to Kafka). The KafkaSpout has 16 threads, the ParserBolt has 32 threads, and 
the SaverBolt has 16 threads. The ParserBolt is written in Python using 
multilang.

Problems:
---------------------------------------------------------------------------------------------------------------------
Sometimes KryoTupleSerializer.serialize throws a NullPointerException, which 
causes the ParserBolt to die. The supervisor then restarts the bolt, but the 
new bolt never receives any tuples, and the topology hangs until we restart 
it.

Analysis:
---------------------------------------------------------------------------------------------------------------------
We found a Troubleshooting page 
(https://github.com/nathanmarz/storm/wiki/Troubleshooting#wiki-nullpointerexception-from-deep-inside-storm) 
that says: "This is caused by having multiple threads issue methods on the 
OutputCollector. All emits, acks, and fails must happen on the same thread. One 
subtle way this can happen is if you make a IBasicBolt that emits on a separate 
thread. IBasicBolt's automatically ack after execute is called, so this would 
cause multiple threads to use the OutputCollector leading to this exception. 
When using a basic bolt, all emits must happen in the same thread that runs 
execute."
We also found that in ShellBolt.java the _readerThread is a separate thread, 
and its handleEmit calls emit to emit new tuples.
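
To make the "same thread" rule quoted above concrete, one common way to satisfy it can be sketched as follows: background threads never touch the collector and instead put their results on a queue, and only the owning thread takes items off the queue and emits them. This is only an illustration under stated assumptions, not how ShellBolt works: ThreadCheckingSink is a hypothetical stand-in for a collector that must stay on one thread, and SingleThreadEmitDemo is invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SingleThreadEmitDemo {
    // Hypothetical stand-in for a collector that only its creating thread may use.
    static class ThreadCheckingSink {
        private final Thread owner = Thread.currentThread();
        final List<String> emitted = new ArrayList<>();

        void emit(String value) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("emit called from the wrong thread");
            }
            emitted.add(value);
        }
    }

    // Worker threads only enqueue results; the calling thread does every emit.
    static List<String> run(int workers, int perWorker) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        ThreadCheckingSink sink = new ThreadCheckingSink(); // owned by the caller
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                for (int i = 0; i < perWorker; i++) {
                    pending.add("w" + id + "-" + i); // hand off, do not emit here
                }
            });
            threads[w].start();
        }
        int expected = workers * perWorker;
        try {
            for (int n = 0; n < expected; n++) {
                sink.emit(pending.take()); // the owning thread is the only emitter
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return sink.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2).size());
    }
}
```

In a bolt, the draining step would presumably live in execute and use a non-blocking poll instead of take, so that execute never waits on the background threads.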

But another wiki page 
(https://github.com/nathanmarz/storm/wiki/Concepts#wiki-bolts) says: "Its 
perfectly fine to launch new threads in bolts that do processing 
asynchronously. OutputCollector 
(http://nathanmarz.github.com/storm/doc/backtype/storm/task/OutputCollector.html) 
is thread-safe and can be called at any time."

So we have two questions:
---------------------------------------------------------------------------------------------------------------------
1) Is OutputCollector thread-safe or not? If it is not thread-safe, and all 
emits, acks, and fails must happen on the same thread, does ShellBolt have a 
bug?
2) When the bolt is restarted, why does the topology hang? By the way, we are 
using Netty.

Can anyone help?

the work.log:
---------------------------------------------------------------------------------------------------------------------

2014-03-10 12:48:31 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__849.invoke(disruptor.clj:74) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$async_loop$fn__469.invoke(util.clj:406) ~[storm-core-0.9.0.1.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
Caused by: java.lang.NullPointerException: null
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335$fn__4339.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$fast_list_map.invoke(util.clj:804) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4060.invoke(executor.clj:240) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$clojure_handler$reify__836.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
        ... 6 common frames omitted



--
dashengju
+86 13810875910
[email protected]
