Hi Marc,

(I'm a storm newbie myself).

As I understand it your question has two parts:
1. Can Storm detect a Bolt that is "stuck". AFAIK the answer is no.
2. Can Storm do anything about it. AFAIK the answer is no.

What I would do in your situation:
1. I would write a delegate bolt that would delegate all the execute/ack/fail 
to "problematic bolt". 
    I would then connect to that delegate bolt a tick tuple 
http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/  which would 
update a TTL data structure
   (such as expireAfterWrite and removeListener as described in 
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
  )
   Each time the tick arrives it writes to the Cache which updates the TTL. 
When the TTL expires you will get an event on a separate thread.
   The delegate bolt would ofcourse cannot forward any of the tick tuples to 
the "problematic bolt"

2. There are four choices:
    2.1. Try and interrupt the execute() thread. This is an anti-pattern in 
java since many libraries swallow InterruptedException and do not call 
Thread.interrupted(). So it most likely not work. 
    2.2  Have the "problematic bolt" implement RestartableBolt interface which 
has a restart method that sets everything back to normal. This is also not 
recommended as trying to fix an unknown bug state in code is always bad. But if 
there is a specific place you know the bolt gets stuck, then the restart method 
could just flip an AtomicBoolean and that code would check if that 
AtomicBoolean is set. AtomicBoolean has its cost in terms of CPU pipeline 
caching so make sure you don't put it too deep inside the for loops. 
   2.3 Have the "problematic bolt" run in a separate process. This would 
require implementing the multilang protocol in Java (it is already implemented 
in python/nodejs/ruby). The bolt would get hit by serialization/deserialization 
CPU cycles per emit. 
    Once you implemented the multilang protocol, then run the problematic bolt 
using ShellBolt which has the PID of the child process and can kill it.
   2.4 Throw an exception that will crash the entire worker process. The 
supervisor would then restart the process. But all of the executors on the 
workers get restarted too (and you get a few tens of seconds of downtime).

Regards,
Itai


________________________________________
From: Marc Vaillant <[email protected]>
Sent: Thursday, October 9, 2014 8:05 PM
To: [email protected]
Subject: Why won't storm restart failing tasks?

We have now replicated in the exclamation topology, a situation that is
occurring for us in production.  In short, one of the tasks gets into a
hung state so that all tuples that are sent to it fail.  We expected
storm to detect this failing state (task heartbeat, etc) and then
restart a new task/executor.  But that does not happen, storm just
continues to send tuples to to the failing task.  How can we get storm
to tear down the executor or task and spawn a new one?

To replicate the issue in the exclamation topology, we modified
TestWordSpout to emit the word "dummy" only once, and then emit from the
standard list as usual:

    public void nextTuple() {
        Utils.sleep(_sleepTime);

                                if(!_isSent)
                                {
                                        _collector.emit(new Values("dummy"), 
new String("dummy" + _counter));
                                        _isSent = true;
                                }
                                else
                                {
                                        final String[] words = new String[] 
{"nathan", "mike", "jackson", "golda", "bertels"};
                                        final Random rand = new Random();
                                        final String word = 
words[rand.nextInt(words.length)] + _counter;
                                        _collector.emit(new Values(word), word);
                                }
                                _counter++;
                }

In the ExclamationBolt's execute method we go into an infinite loop if
the tuple received is "dummy":


    public void execute(Tuple tuple){
                        if(tuple.getString(0) == "dummy")
                        {
                                while(true) {}
                        }
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

The topology has a parallelism of 3 for the ExclaimationBolt so the task
that receives the "dummy" tuple fails every subsequent tuple it
receives because it is in stuck in an  infinite loop, but the other 2
tasks continue to process tuples normally.  While storm eventually fails
all the tuples that are sent to the "dummy" task, it never tears the
task down to restart a new one.

Any help greatly appreciated.

Thanks,
Marc

Reply via email to