We have now reproduced, in the exclamation topology, a situation that is
occurring for us in production. In short, one of the tasks gets into a
hung state, so all tuples that are sent to it fail. We expected
Storm to detect this failing state (via task heartbeats, etc.) and then
restart the task/executor. But that does not happen: Storm just
continues to send tuples to the failing task. How can we get Storm
to tear down the executor or task and spawn a new one?
To replicate the issue in the exclamation topology, we modified
TestWordSpout to emit the word "dummy" only once, and then emit from the
standard list as usual:
public void nextTuple() {
    Utils.sleep(_sleepTime);
    if (!_isSent) {
        // Emit "dummy" exactly once; the bolt task that receives it will hang.
        _collector.emit(new Values("dummy"), "dummy" + _counter);
        _isSent = true;
    } else {
        // Afterwards, emit words from the standard TestWordSpout list.
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)] + _counter;
        // The word doubles as the message ID passed to ack()/fail().
        _collector.emit(new Values(word), word);
    }
    _counter++;
}
In the ExclamationBolt's execute method we go into an infinite loop if
the tuple received is "dummy":
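public void execute(Tuple tuple) {
    // Compare string contents with equals(); == only compares references.
    if ("dummy".equals(tuple.getString(0))) {
        // Deliberately hang this task forever to simulate the production problem.
        while (true) {}
    }
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}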
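A side note on the original check `tuple.getString(0) == "dummy"`: `==` compares object references, not contents. It likely works in this test because the tuple may carry the very literal instance the spout emitted (for example, when spout and bolt share a worker and the tuple is not serialized), but a string rebuilt at runtime, such as one deserialized after crossing workers, is a different object. The small standalone demo below shows the difference; the class and method names are hypothetical and there is no Storm dependency.

```java
public class StringCompareDemo {
    /** Mirrors the original check: reference comparison against a literal. */
    public static boolean isDummyByReference(String s) {
        return s == "dummy";
    }

    /** Content comparison; also null-safe because the literal is the receiver. */
    public static boolean isDummyByValue(String s) {
        return "dummy".equals(s);
    }

    public static void main(String[] args) {
        String literal = "dummy";
        // Simulates a string rebuilt at runtime, e.g. after deserialization.
        String rebuilt = new String("dummy".toCharArray());
        System.out.println(isDummyByReference(literal)); // true: same interned literal
        System.out.println(isDummyByReference(rebuilt)); // false: different object
        System.out.println(isDummyByValue(rebuilt));     // true: same contents
    }
}
```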
The topology has a parallelism of 3 for the ExclamationBolt, so the task
that receives the "dummy" tuple never acks any subsequent tuple it
receives, because it is stuck in an infinite loop, while the other 2
tasks continue to process tuples normally. Although Storm eventually fails
all the tuples that are sent to the "dummy" task (once they time out), it
never tears the task down and restarts a new one.
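To make the failure pattern concrete without a Storm cluster, here is a plain-Java simulation. It is a sketch under simplifying assumptions, not Storm's implementation: each bolt task is modeled as a single-threaded `ExecutorService`, round-robin dispatch stands in for shuffle grouping, and a per-tuple `Future.get` timeout stands in for the message timeout. `HungTaskSimulation` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class HungTaskSimulation {

    /** Returns {acked, failed} counts for the given words. */
    public static int[] run(List<String> words, int numTasks, long timeoutMs) {
        List<ExecutorService> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true); // a hung task must not keep the JVM alive
                return t;
            }));
        }
        List<Future<String>> pending = new ArrayList<>();
        for (int i = 0; i < words.size(); i++) {
            String word = words.get(i);
            // Round-robin dispatch stands in for Storm's shuffle grouping.
            ExecutorService task = tasks.get(i % numTasks);
            pending.add(task.submit(() -> {
                if ("dummy".equals(word)) {
                    new CountDownLatch(1).await(); // blocks forever, like while (true) {}
                }
                return word + "!!!";
            }));
        }
        int acked = 0;
        int failed = 0;
        for (Future<String> f : pending) {
            try {
                // Simplification: waits up to timeoutMs from this call, not from emit time.
                f.get(timeoutMs, TimeUnit.MILLISECONDS);
                acked++;
            } catch (TimeoutException | ExecutionException e) {
                failed++; // never completed in time: analogous to a failed tuple
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failed++;
            }
        }
        tasks.forEach(ExecutorService::shutdownNow); // interrupts the hung task
        return new int[] {acked, failed};
    }

    public static void main(String[] args) {
        List<String> words = List.of("dummy", "nathan0", "mike1", "jackson2",
                "golda3", "bertels4", "nathan5");
        int[] counts = run(words, 3, 200);
        // Task 0 receives "dummy", "jackson2" and "nathan5"; all three time out.
        System.out.println("acked=" + counts[0] + " failed=" + counts[1]); // acked=4 failed=3
    }
}
```

Because each task drains its queue on a single thread, every tuple routed behind "dummy" on the same task sits in the queue forever, while the other tasks keep completing theirs.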
Any help greatly appreciated.
Thanks,
Marc