Hi Itai,

Thanks very much for your response and suggestions. My first naive thought is: why is this so hard? Isn't this the most basic fault tolerance provided by the simplest load balancers (which is essentially what a shuffle grouping is)? My second question: I see config parameters like task.heartbeat.frequency.secs. What is the purpose of a task heartbeat if not to detect this kind of situation?
Thanks,
Marc

On Fri, Oct 10, 2014 at 09:22:38AM +0000, Itai Frenkel wrote:
> Hi Marc,
>
> (I'm a Storm newbie myself.)
>
> As I understand it, your question has two parts:
> 1. Can Storm detect a bolt that is "stuck"? AFAIK the answer is no.
> 2. Can Storm do anything about it? AFAIK the answer is no.
>
> What I would do in your situation:
> 1. I would write a delegate bolt that delegates all the execute/ack/fail
> calls to the "problematic bolt".
> I would then connect a tick tuple to that delegate bolt
> http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/ which would
> update a TTL data structure
> (such as expireAfterWrite and removalListener as described in
> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
> ).
> Each time the tick arrives, it writes to the cache, which refreshes the TTL.
> When the TTL expires you will get an event on a separate thread.
> The delegate bolt should of course not forward any of the tick tuples to
> the "problematic bolt".
>
> 2. There are four choices:
> 2.1 Try to interrupt the execute() thread. This is an anti-pattern in
> Java, since many libraries swallow InterruptedException and do not restore
> the interrupt flag with Thread.currentThread().interrupt(). So it will most
> likely not work.
> 2.2 Have the "problematic bolt" implement your own RestartableBolt interface
> with a restart method that sets everything back to normal. This is also
> not recommended, as trying to recover from an unknown buggy state is always bad.
> But if there is a specific place where you know the bolt gets stuck, then the
> restart method could just flip an AtomicBoolean, and that code would check
> whether the AtomicBoolean is set. AtomicBoolean has its cost in terms of CPU
> pipeline caching, so make sure you don't put the check too deep inside tight loops.
> 2.3 Have the "problematic bolt" run in a separate process. This would
> require implementing the multilang protocol in Java (it is already
> implemented in python/nodejs/ruby).
> The bolt would pay the serialization/deserialization CPU cost on every emit.
> Once you have implemented the multilang protocol, run the problematic
> bolt using ShellBolt, which has the PID of the child process and can kill it.
> 2.4 Throw an exception that will crash the entire worker process. The
> supervisor would then restart the process. But all of the executors on that
> worker get restarted too (and you get a few tens of seconds of downtime).
>
> Regards,
> Itai
>
>
> ________________________________________
> From: Marc Vaillant <[email protected]>
> Sent: Thursday, October 9, 2014 8:05 PM
> To: [email protected]
> Subject: Why won't storm restart failing tasks?
>
> We have now replicated, in the exclamation topology, a situation that is
> occurring for us in production. In short, one of the tasks gets into a
> hung state, so all tuples sent to it fail. We expected
> Storm to detect this failing state (task heartbeat, etc.) and then
> restart a new task/executor. But that does not happen; Storm just
> continues to send tuples to the failing task. How can we get Storm
> to tear down the executor or task and spawn a new one?
>
> To replicate the issue in the exclamation topology, we modified
> TestWordSpout to emit the word "dummy" only once, and then emit from the
> standard list as usual:
>
>     public void nextTuple() {
>         Utils.sleep(_sleepTime);
>
>         if (!_isSent) {
>             // Emit the "poison" word exactly once; the message id enables acking.
>             _collector.emit(new Values("dummy"), "dummy" + _counter);
>             _isSent = true;
>         } else {
>             final String[] words = new String[]
>                 {"nathan", "mike", "jackson", "golda", "bertels"};
>             final Random rand = new Random();
>             final String word = words[rand.nextInt(words.length)] + _counter;
>             _collector.emit(new Values(word), word);
>         }
>         _counter++;
>     }
>
> In the ExclamationBolt's execute method we go into an infinite loop if
> the tuple received is "dummy":
>
>     public void execute(Tuple tuple) {
>         // Compare string contents with equals(), not ==: once a tuple has been
>         // serialized between workers, the field is a different String object.
>         if ("dummy".equals(tuple.getString(0))) {
>             while (true) {}  // simulate a hung task
>         }
>         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>         _collector.ack(tuple);
>     }
>
> The topology has a parallelism of 3 for the ExclamationBolt, so the task
> that receives the "dummy" tuple fails every subsequent tuple it
> receives because it is stuck in an infinite loop, while the other 2
> tasks continue to process tuples normally. Storm eventually fails
> all the tuples that are sent to the "dummy" task, but it never tears the
> task down and starts a new one.
>
> Any help greatly appreciated.
>
> Thanks,
> Marc
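Itai's suggestions 1 and 2.2 above can be combined: a watchdog notices that the execute() thread has stopped handling tick tuples, and its callback flips an AtomicBoolean that the known hang site polls. One caveat on the Guava route: caches built with CacheBuilder evict lazily, doing maintenance during reads and writes rather than on a timer of their own, and the removal listener runs as part of that maintenance. If the delegate bolt's execute() thread is the only thread touching the cache, a hang also stops the eviction that would fire the listener, unless another thread calls cleanUp() periodically. This minimal, stdlib-only sketch therefore uses a ScheduledExecutorService instead. StallWatchdog and HangableWork are hypothetical names, no Storm API is used, and wiring it into a real bolt is an assumption left to the reader.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical watchdog: the bolt's execute() thread calls heartbeat() on every
// tick tuple; a separate scheduler thread calls check() and fires onStall once
// if no heartbeat arrived within the timeout.
final class StallWatchdog implements AutoCloseable {
    private final long timeoutNanos;
    private final LongSupplier clock;      // nanosecond clock, injectable for tests
    private final Runnable onStall;
    private final AtomicLong lastBeat;
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stall-watchdog");
                t.setDaemon(true);         // never keep the JVM alive on its own
                return t;
            });

    StallWatchdog(long timeout, TimeUnit unit, LongSupplier clock, Runnable onStall) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.clock = clock;
        this.onStall = onStall;
        this.lastBeat = new AtomicLong(clock.getAsLong());
    }

    // Start periodic checks on the watchdog's own thread, not the execute() thread.
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::check, period, period, unit);
    }

    // Called from the execute() thread whenever a tick tuple is handled.
    void heartbeat() {
        lastBeat.set(clock.getAsLong());
    }

    // Returns true if the execute() thread looks stalled; fires onStall at most once.
    boolean check() {
        boolean stalled = clock.getAsLong() - lastBeat.get() > timeoutNanos;
        if (stalled && fired.compareAndSet(false, true)) {
            onStall.run();
        }
        return stalled;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

// Hypothetical stand-in for the "problematic bolt" (suggestion 2.2): the known
// hang site polls an abort flag that restart() sets from another thread.
final class HangableWork {
    private final AtomicBoolean abort = new AtomicBoolean(false);

    void restart() {
        abort.set(true);
    }

    // Spins like the repro's while (true) {}, but reads the flag only every
    // 1024 iterations, in the spirit of keeping the check out of tight loops.
    long spinUntilAborted() {
        long iterations = 0;
        while ((++iterations & 1023) != 0 || !abort.get()) {
            // simulated hang
        }
        abort.set(false);  // ready for the next tuple
        return iterations;
    }
}
```

In a delegate bolt, heartbeat() would be called whenever a tick tuple arrives (the tick interval is set with topology.tick.tuple.freq.secs), and the timeout should span several tick intervals, e.g. `new StallWatchdog(30, TimeUnit.SECONDS, System::nanoTime, work::restart)` followed by `start(5, TimeUnit.SECONDS)`. Instead of restart(), the callback could also terminate the worker JVM so the supervisor restarts it (suggestion 2.4), with the downtime Itai describes.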

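Itai's suggestion 2.1 hinges on how interruption works in Java: Thread.interrupt() only sets a flag on the target thread. Blocking calls such as Thread.sleep() react by throwing InterruptedException and clearing that flag, so code that catches the exception and carries on silently erases the interrupt. A pure busy loop like the repro's `while (true) {}` never looks at the flag at all. This small stdlib-only sketch contrasts swallowing the exception with restoring the flag; both helper methods are hypothetical stand-ins for library code.

```java
final class InterruptDemo {
    private InterruptDemo() {}

    // Anti-pattern: the exception is caught and ignored, so the interrupt is lost.
    static void swallowingLibraryCall() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // swallowed: sleep() already cleared the interrupted status
        }
    }

    // Recommended pattern: restore the flag so callers can still see the interrupt.
    static void wellBehavedLibraryCall() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        swallowingLibraryCall();
        // Thread.interrupted() reports and clears the flag.
        System.out.println("after swallowing: interrupted=" + Thread.interrupted());

        Thread.currentThread().interrupt();
        wellBehavedLibraryCall();
        System.out.println("after restoring:  interrupted=" + Thread.interrupted());
    }
}
```

Running main prints `interrupted=false` for the swallowing call and `interrupted=true` for the restoring one. Even the well-behaved pattern only helps if the stuck code eventually reaches an interruptible call or checks Thread.currentThread().isInterrupted(); the repro's empty loop does neither, which is why interrupting is unlikely to unstick it.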