Hi Marc,

Did you override the "fail" method of BaseRichSpout?

Regards,
Onur


On Tue, Oct 14, 2014 at 6:24 PM, Marc Vaillant <[email protected]>
wrote:

> How about:
>
>
> https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_TASK_TIMEOUT_SECS
>
> "How long without heartbeating a task can go before nimbus will consider
> the task dead and reassign it to another location."
>
> and
>
>
> https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_REASSIGN
>
> "Whether or not nimbus should reassign tasks if it detects that a task
> goes down. Defaults to true, and it's not recommended to change this
> value."
>
> Marc
>
> On Sat, Oct 11, 2014 at 08:42:16PM +0000, Itai Frenkel wrote:
> > AFAIK the task.heartbeat.frequency.secs is related to zookeeper
> bookeeping. Not sure if it is being act upon.
> >
> > See also https://github.com/apache/storm/pull/286 (related to 2.3
> below).
> >
> > Itai
> >
> > ________________________________________
> > From: Marc Vaillant <[email protected]>
> > Sent: Friday, October 10, 2014 7:32 PM
> > To: [email protected]
> > Subject: Re: Why won't storm restart failing tasks?
> >
> > Hi Itai,
> >
> > Thanks very much for your response and suggestions.  My first naive
> > thought is: why is this so hard? Isn't this the most basic fault
> > tolerance provided by the simplest load balancers (which is essentially
> > what a shuffle grouping is)?  Second question is that I see config
> > parameters like task.heartbeat.frequency.secs, what is the purpose of a
> > task heartbeat if not to detect this kind of situation?
> >
> > Thanks,
> > Marc
> >
> > On Fri, Oct 10, 2014 at 09:22:38AM +0000, Itai Frenkel wrote:
> > > Hi Marc,
> > >
> > > (I'm a storm newbie myself).
> > >
> > > As I understand it your question has two parts:
> > > 1. Can Storm detect a Bolt that is "stuck". AFAIK the answer is no.
> > > 2. Can Storm do anything about it. AFAIK the answer is no.
> > >
> > > What I would do in your situation:
> > > 1. I would write a delegate bolt that would delegate all the
> execute/ack/fail to "problematic bolt".
> > >     I would then connect to that delegate bolt a tick tuple
> http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/  which
> would update a TTL data structure
> > >    (such as expireAfterWrite and removeListener as described in
> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
> )
> > >    Each time the tick arrives it writes to the Cache which updates the
> TTL. When the TTL expires you will get an event on a separate thread.
> > >    The delegate bolt would ofcourse cannot forward any of the tick
> tuples to the "problematic bolt"
> > >
> > > 2. There are four choices:
> > >     2.1. Try and interrupt the execute() thread. This is an
> anti-pattern in java since many libraries swallow InterruptedException and
> do not call Thread.interrupted(). So it most likely not work.
> > >     2.2  Have the "problematic bolt" implement RestartableBolt
> interface which has a restart method that sets everything back to normal.
> This is also not recommended as trying to fix an unknown bug state in code
> is always bad. But if there is a specific place you know the bolt gets
> stuck, then the restart method could just flip an AtomicBoolean and that
> code would check if that AtomicBoolean is set. AtomicBoolean has its cost
> in terms of CPU pipeline caching so make sure you don't put it too deep
> inside the for loops.
> > >    2.3 Have the "problematic bolt" run in a separate process. This
> would require implementing the multilang protocol in Java (it is already
> implemented in python/nodejs/ruby). The bolt would get hit by
> serialization/deserialization CPU cycles per emit.
> > >     Once you implemented the multilang protocol, then run the
> problematic bolt using ShellBolt which has the PID of the child process and
> can kill it.
> > >    2.4 Throw an exception that will crash the entire worker process.
> The supervisor would then restart the process. But all of the executors on
> the workers get restarted too (and you get a few tens of seconds of
> downtime).
> > >
> > > Regards,
> > > Itai
> > >
> > >
> > > ________________________________________
> > > From: Marc Vaillant <[email protected]>
> > > Sent: Thursday, October 9, 2014 8:05 PM
> > > To: [email protected]
> > > Subject: Why won't storm restart failing tasks?
> > >
> > > We have now replicated in the exclamation topology, a situation that is
> > > occurring for us in production.  In short, one of the tasks gets into a
> > > hung state so that all tuples that are sent to it fail.  We expected
> > > storm to detect this failing state (task heartbeat, etc) and then
> > > restart a new task/executor.  But that does not happen, storm just
> > > continues to send tuples to to the failing task.  How can we get storm
> > > to tear down the executor or task and spawn a new one?
> > >
> > > To replicate the issue in the exclamation topology, we modified
> > > TestWordSpout to emit the word "dummy" only once, and then emit from
> the
> > > standard list as usual:
> > >
> > >     public void nextTuple() {
> > >         Utils.sleep(_sleepTime);
> > >
> > >                                 if(!_isSent)
> > >                                 {
> > >                                         _collector.emit(new
> Values("dummy"), new String("dummy" + _counter));
> > >                                         _isSent = true;
> > >                                 }
> > >                                 else
> > >                                 {
> > >                                         final String[] words = new
> String[] {"nathan", "mike", "jackson", "golda", "bertels"};
> > >                                         final Random rand = new
> Random();
> > >                                         final String word =
> words[rand.nextInt(words.length)] + _counter;
> > >                                         _collector.emit(new
> Values(word), word);
> > >                                 }
> > >                                 _counter++;
> > >                 }
> > >
> > > In the ExclamationBolt's execute method we go into an infinite loop if
> > > the tuple received is "dummy":
> > >
> > >
> > >     public void execute(Tuple tuple){
> > >                         if(tuple.getString(0) == "dummy")
> > >                         {
> > >                                 while(true) {}
> > >                         }
> > >       _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> > >       _collector.ack(tuple);
> > >     }
> > >
> > > The topology has a parallelism of 3 for the ExclaimationBolt so the
> task
> > > that receives the "dummy" tuple fails every subsequent tuple it
> > > receives because it is in stuck in an  infinite loop, but the other 2
> > > tasks continue to process tuples normally.  While storm eventually
> fails
> > > all the tuples that are sent to the "dummy" task, it never tears the
> > > task down to restart a new one.
> > >
> > > Any help greatly appreciated.
> > >
> > > Thanks,
> > > Marc
>



-- 
Onur Ünlü

Reply via email to