Hi Marc,
(I'm a Storm newbie myself.)
As I understand it, your question has two parts:
1. Can Storm detect a bolt that is "stuck"? AFAIK, the answer is no.
2. Can Storm do anything about it? AFAIK, the answer is no.
What I would do in your situation:
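1. I would write a delegate bolt that forwards every execute() call (and
therefore every ack/fail) to the "problematic bolt".
I would then configure tick tuples for that delegate bolt
(http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/) and have
each tick update a TTL data structure
(for example a Guava cache built with expireAfterWrite and removalListener, as described in
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
).
Each time a tick arrives, the delegate writes to the cache, which resets the TTL.
Ticks are delivered through the same execute() method as regular tuples, so
while the problematic bolt is stuck in execute() the ticks stop being
processed, the cache entry is no longer refreshed, and its TTL expires.
When that happens, the removal listener is notified. Note that Guava evicts
lazily (during later reads and writes on the cache), so to get that event
while execute() is stuck, a separate thread has to call cache.cleanUp()
periodically.
The delegate bolt must, of course, not forward any of the tick tuples to
the "problematic bolt".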
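A minimal, stdlib-only sketch of the watchdog idea, assuming Java 8+. It swaps Guava's expireAfterWrite cache for a simpler technique: an AtomicLong holding the time of the last tick plus a ScheduledExecutorService that checks its age on its own thread, which sidesteps Guava's lazy eviction. The class TickWatchdog, its methods, and the onStuck callback are hypothetical names; what onStuck should actually do is one of the choices in part 2.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: runs onStuck once if tick() is not called within timeoutMillis.
class TickWatchdog implements AutoCloseable {
    private final AtomicLong lastTickNanos = new AtomicLong(System.nanoTime());
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final ScheduledExecutorService checker =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "tick-watchdog");
                t.setDaemon(true); // never keep the JVM alive on its own
                return t;
            });
    private final long timeoutNanos;
    private final Runnable onStuck;

    TickWatchdog(long timeoutMillis, Runnable onStuck) {
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        this.onStuck = onStuck;
    }

    // Start polling on the watchdog thread, independent of the executor thread.
    void start(long checkEveryMillis) {
        checker.scheduleAtFixedRate(this::check, checkEveryMillis,
                checkEveryMillis, TimeUnit.MILLISECONDS);
    }

    // Called from the delegate bolt's execute() whenever a tick tuple arrives.
    void tick() {
        lastTickNanos.set(System.nanoTime());
    }

    private void check() {
        long silentNanos = System.nanoTime() - lastTickNanos.get();
        // compareAndSet guarantees onStuck runs at most once (on the watchdog thread).
        if (silentNanos > timeoutNanos && fired.compareAndSet(false, true)) {
            onStuck.run();
        }
    }

    boolean hasFired() {
        return fired.get();
    }

    @Override
    public void close() {
        checker.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate ticks arriving for a while, then stopping (as if execute() hung).
        try (TickWatchdog dog = new TickWatchdog(200,
                () -> System.out.println("executor looks stuck"))) {
            dog.start(50);
            for (int i = 0; i < 5; i++) {
                dog.tick();
                Thread.sleep(50);
            }
            System.out.println("fired while ticking: " + dog.hasFired());
            Thread.sleep(500); // no more ticks
            System.out.println("fired after silence: " + dog.hasFired());
        }
    }
}
```

In the delegate bolt, execute() would recognise a tick by its source component (Constants.SYSTEM_COMPONENT_ID) and stream (Constants.SYSTEM_TICK_STREAM_ID), call tick(), and return without forwarding it; every other tuple goes to the wrapped bolt.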
2. There are four choices:
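2.1. Try to interrupt the execute() thread. This is an anti-pattern in
Java, since many libraries swallow InterruptedException without restoring the
interrupt status, and code that never checks Thread.interrupted() (such as the
while(true) {} loop in your example) simply ignores the interrupt. So it most
likely will not work.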
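A small plain-JDK illustration of why interrupting only helps cooperative code, assuming Java 8+. One thread spins like the stuck execute() in your example, the other checks its interrupt flag; the class and method names are made up for the demo.

```java
// Shows that Thread.interrupt() only stops code that cooperates with it.
class InterruptDemo {
    // Mirrors the stuck execute(): the loop never looks at the interrupt flag.
    static Thread startIgnoringLoop() {
        Thread t = new Thread(() -> {
            while (true) { }
        }, "ignores-interrupt");
        t.setDaemon(true); // let the JVM exit even though this thread never stops
        t.start();
        return t;
    }

    // A cooperative loop: checks the flag on every iteration and exits once it is set.
    static Thread startCooperativeLoop() {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) { }
        }, "checks-interrupt");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Interrupts the thread, waits up to waitMillis, and reports whether it is still running.
    static boolean stillAliveAfterInterrupt(Thread t, long waitMillis)
            throws InterruptedException {
        t.interrupt();
        t.join(waitMillis);
        return t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "ignoring loop alive: true"
        System.out.println("ignoring loop alive: "
                + stillAliveAfterInterrupt(startIgnoringLoop(), 500));
        // prints "cooperative loop alive: false"
        System.out.println("cooperative loop alive: "
                + stillAliveAfterInterrupt(startCooperativeLoop(), 500));
    }
}
```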
2.2. Have the "problematic bolt" implement a RestartableBolt interface (your
own; Storm does not provide one) with a restart() method that sets everything
back to normal. This is also not recommended, since trying to repair an
unknown buggy state is always risky. But if there is a specific place where
you know the bolt gets stuck, then restart() could just flip an AtomicBoolean
that the code at that spot checks. Reading an AtomicBoolean is a volatile
read, which limits CPU and compiler caching and reordering, so don't put the
check too deep inside tight loops.
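A minimal sketch of the AtomicBoolean approach, assuming Java 8+ and that the stuck spot is a loop you control. RestartableBolt is the hypothetical interface described above; ProblematicWork, process(), and CHECK_EVERY are illustrative names, and the flag is read only every CHECK_EVERY iterations to keep the volatile read off the hot path.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the known "stuck spot" inside a bolt's execute() logic.
class ProblematicWork implements RestartableBolt {
    private static final int CHECK_EVERY = 1 << 16; // iterations between flag reads
    private final AtomicBoolean restartRequested = new AtomicBoolean(false);

    @Override
    public void restart() {
        restartRequested.set(true); // typically called by a watchdog thread
    }

    // Returns true if the work completed, false if it was abandoned after restart().
    boolean process(long maxIterations) {
        for (long i = 0; i < maxIterations; i++) {
            if ((i & (CHECK_EVERY - 1)) == 0 && restartRequested.get()) {
                restartRequested.set(false); // clear the request once honoured
                return false; // give up on this tuple; the caller could fail() it
            }
            // ... the real per-iteration work would go here ...
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ProblematicWork work = new ProblematicWork();
        // prints "completed: false" once the loop notices the restart request
        Thread executor = new Thread(() ->
                System.out.println("completed: " + work.process(Long.MAX_VALUE)));
        executor.start();
        Thread.sleep(200);
        work.restart();  // what a watchdog would call
        executor.join(); // returns once the loop sees the flag
    }
}

// Hypothetical interface (not part of Storm).
interface RestartableBolt {
    void restart();
}
```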
2.3. Have the "problematic bolt" run in a separate process. This would
require implementing the multilang protocol in Java (it is already implemented
in Python, Node.js and Ruby). Every tuple would then pay extra
serialization/deserialization CPU cycles, since multilang exchanges JSON
messages over the child's stdin/stdout.
Once you have implemented the multilang protocol, run the problematic bolt
using ShellBolt, which knows the PID of the child process and can kill it.
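To give a feel for what the Java side of multilang involves, here is a partial sketch of the child process's outgoing messages, assuming Java 9+ (for ProcessHandle and List.of) and based on my reading of the Storm multilang docs: each message is one line of JSON followed by a line containing only "end", and during the handshake the child creates an empty file named after its PID in the pidDir it is given and replies with {"pid": ...}, which is how ShellBolt learns the PID. It is not a complete implementation (no JSON parsing, no reading from stdin, string values only); MultilangWriter and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Partial sketch of multilang output framing for a Java child process.
class MultilangWriter {
    // Every message: one line of JSON, then a line containing only "end".
    static String frame(String json) {
        return json + "\nend\n";
    }

    // Minimal JSON string escaping for quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    static String array(List<String> values) {
        return values.stream().map(MultilangWriter::quote)
                .collect(Collectors.joining(",", "[", "]"));
    }

    // Handshake reply: create an empty file named after our PID in pidDir, then report it.
    static String pidReply(String pidDir, long pid) throws IOException {
        Files.createFile(Paths.get(pidDir, Long.toString(pid)));
        return frame("{\"pid\":" + pid + "}");
    }

    // Bolt emit anchored to the given tuple ids.
    static String emit(List<String> anchors, List<String> tuple) {
        return frame("{\"command\":\"emit\",\"anchors\":" + array(anchors)
                + ",\"tuple\":" + array(tuple) + "}");
    }

    static String ack(String id) {
        return frame("{\"command\":\"ack\",\"id\":" + quote(id) + "}");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("pids");
        System.out.print(pidReply(dir.toString(), ProcessHandle.current().pid()));
        System.out.print(emit(List.of("123"), List.of("nathan!!!")));
        System.out.print(ack("123"));
    }
}
```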
2.4. Throw an exception that crashes the entire worker process. The
supervisor would then restart the process, but all of the other executors in
that worker get restarted too (and you get a few tens of seconds of downtime).
Regards,
Itai
________________________________________
From: Marc Vaillant
Sent: Thursday, October 9, 2014 8:05 PM
Subject: Why won't storm restart failing tasks?
We have now replicated, in the exclamation topology, a situation that is
occurring for us in production. In short, one of the tasks gets into a
hung state so that all tuples that are sent to it fail. We expected
Storm to detect this failing state (task heartbeat, etc.) and then
restart a new task/executor. But that does not happen; Storm just
continues to send tuples to the failing task. How can we get Storm
to tear down the executor or task and spawn a new one?
To replicate the issue in the exclamation topology, we modified
TestWordSpout to emit the word "dummy" only once, and then emit from the
standard list as usual:
public void nextTuple() {
    Utils.sleep(_sleepTime);
    if (!_isSent) {
        // Emit "dummy" exactly once; the message id makes the tuple tracked (ackable).
        _collector.emit(new Values("dummy"), "dummy" + _counter);
        _isSent = true;
    } else {
        // Afterwards, emit a random word from the standard list.
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)] + _counter;
        _collector.emit(new Values(word), word);
    }
    _counter++;
}
In the ExclamationBolt's execute method we go into an infinite loop if
the tuple received is "dummy":
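public void execute(Tuple tuple) {
    // Compare string contents with equals(); == only compares references.
    if ("dummy".equals(tuple.getString(0))) {
        while (true) {} // simulate a hung task: execute() never returns
    }
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}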
The topology has a parallelism of 3 for the ExclamationBolt, so the task
that receives the "dummy" tuple fails every subsequent tuple it
receives because it is stuck in an infinite loop, while the other 2
tasks continue to process tuples normally. Although Storm eventually fails
all the tuples that are sent to the "dummy" task, it never tears the
task down to start a new one.
Any help greatly appreciated.
Thanks,
Marc