I'm trying to shut down a long-running bolt (based on the sample `ExclamationBolt`), but it seems the `cluster.killTopology()` call only interrupts one task while the other tasks keep executing. To simplify things I am running only one bolt in the topology. This is the bolt code I am running (locally):
```java
@Override
public void execute(Tuple tuple) {
    logger.info("EXECUTE");
    try {
        Thread.sleep(2000);
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    } catch (InterruptedException e) {
        logger.info("Thread Interrupted");
    }
}
```

In the logs I see something like this:

```
7927  [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
...
16572 [Thread-8-exclaim1]  INFO ThreadedExclamationBolt - Thread Interrupted
16572 [Thread-8-exclaim1]  INFO ThreadedExclamationBolt - EXECUTE
...
17950 [Thread-10-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
17958 [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
```

The only workaround I have found is to set an instance variable when I catch the single `InterruptedException`, i.e.:

```java
public class ThreadedExclamationBolt extends BaseRichBolt {
    ...
    private AtomicBoolean sleepInterrupted = new AtomicBoolean(false);

    @Override
    public void execute(Tuple tuple) {
        try {
            if (sleepInterrupted.get()) return;
            Thread.sleep(2000);
            _collector.emit(tuple, new Values(tuple.getString(0)));
            _collector.ack(tuple);
        } catch (InterruptedException e) {
            sleepInterrupted.set(true);
        }
    }
}
```

But this doesn't seem right to me. What am I missing here?
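As a side note, part of what I observed may come down to plain Java interrupt semantics rather than Storm itself: my understanding is that when `Thread.sleep()` throws `InterruptedException`, the thread's interrupt status is cleared, so a later `Thread.sleep()` on that same thread will not throw again unless the thread is re-interrupted (the usual convention being to call `Thread.currentThread().interrupt()` in the catch block). A minimal standalone sketch, with no Storm involved, illustrating this:

```java
// Demonstrates that catching InterruptedException clears the interrupt flag,
// and that Thread.currentThread().interrupt() restores it.
public class InterruptDemo {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // The flag was cleared when the exception was thrown:
                System.out.println("interrupted after catch: "
                        + Thread.currentThread().isInterrupted());
                // Conventional re-interrupt so callers can still see it:
                Thread.currentThread().interrupt();
                System.out.println("interrupted after restore: "
                        + Thread.currentThread().isInterrupted());
            }
        });
        worker.start();
        Thread.sleep(100);   // give the worker time to enter sleep()
        worker.interrupt();  // wake it with an interrupt
        worker.join();
    }
}
```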