I'm trying to shut down a long-running bolt (based on the sample
ExclamationBolt), but it seems the cluster.killTopology() call interrupts
only one task while the other tasks continue to execute. To simplify
things, I am running only one bolt in the topology. This is the bolt code
I am running (locally):

public void execute(Tuple tuple) {
    try {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    } catch (InterruptedException e) {
        logger.info("Thread Interrupted");
    }
}

In the logs I see something like this:

7927 [Thread-12-exclaim1] INFO  ThreadedExclamationBolt - EXECUTE
16572 [Thread-8-exclaim1] INFO  ThreadedExclamationBolt - Thread Interrupted
16572 [Thread-8-exclaim1] INFO  ThreadedExclamationBolt - EXECUTE
17950 [Thread-10-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
17958 [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE

The only workaround I have found is to set an instance variable when I
catch the single InterruptedException, i.e.:

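public class ThreadedExclamationBolt extends BaseRichBolt {
    private AtomicBoolean sleepInterrupted = new AtomicBoolean(false);

    public void execute(Tuple tuple) {
        try {
            // Skip all work once an interrupt has been seen.
            if (sleepInterrupted.get()) {
                return;
            }
            _collector.emit(tuple, new Values(tuple.getString(0)));
        } catch (InterruptedException e) {
            // Remember the interrupt so later execute() calls bail out.
            sleepInterrupted.set(true);
        }
    }
}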
But this doesn't seem right to me. What am I missing here?
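
To make the behaviour easier to reproduce, here is a minimal sketch in
plain Java with no Storm dependency. It assumes the long-running work in
execute() is a blocking call such as Thread.sleep(), and that shutdown
reaches the bolt as a thread interrupt; both are assumptions on my part.
The names InterruptFlagSketch, SketchBolt and runDemo are made up for
illustration. What it shows is standard Java behaviour: throwing
InterruptedException clears the thread's interrupt status, so only the
call that was blocked sees the interrupt, and later calls carry on unless
a flag is kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptFlagSketch {

    // Hypothetical stand-in for the bolt; not a Storm class.
    public static class SketchBolt {
        private final AtomicBoolean sleepInterrupted = new AtomicBoolean(false);
        private final boolean useGuard;
        public final List<String> log = new ArrayList<>();

        public SketchBolt(boolean useGuard) {
            this.useGuard = useGuard;
        }

        public void execute(String input) {
            // Workaround: bail out once an interrupt has been seen.
            if (useGuard && sleepInterrupted.get()) {
                log.add("SKIPPED " + input);
                return;
            }
            try {
                log.add("EXECUTE " + input);
                Thread.sleep(10); // assumed stand-in for the long-running work
            } catch (InterruptedException e) {
                // The interrupt status is already cleared at this point.
                log.add("Thread Interrupted");
                sleepInterrupted.set(true);
            }
        }
    }

    // Runs three calls on the current thread, interrupting before the second.
    public static List<String> runDemo(boolean useGuard) {
        SketchBolt bolt = new SketchBolt(useGuard);
        bolt.execute("a");
        Thread.currentThread().interrupt(); // simulated shutdown signal
        bolt.execute("b"); // sleep throws immediately and clears the status
        bolt.execute("c"); // runs normally unless the guard is enabled
        return bolt.log;
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + runDemo(false));
        System.out.println("with guard:    " + runDemo(true));
    }
}
```

Without the guard, the third call logs EXECUTE again right after "Thread
Interrupted", which is the same pattern as in my logs above. With the
guard, it is skipped. Calling Thread.currentThread().interrupt() in the
catch block instead of setting a flag would re-assert the interrupt
status for whatever code runs next on that thread.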
