Hi there, I'm using Storm to build a web-crawler, using Storm Crawler SDK.
I'm also using Redis to store new links discovered. I've a Spout to consume those url. After many debug , I've built the Spout like this : public class OutlinkSpoutRedis extends BaseRichSpout { private static final Logger LOG = LoggerFactory .getLogger(OutlinkSpoutRedis.class); private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>(); @Override public void nextTuple() { LOG.info(">>> Calling nextTuple"); if (beingProcessed.size() >= 200000) { LOG.info("Too much beeing processed"); Utils.sleep(50); return; } LOG.info("Pooling from queue"); Values ret = queue.poll(); if (ret == null) { LOG.info("Pooling from queue = null"); Utils.sleep(50); return; } LOG.info("Emitting one url"); String url = ret.get(0).toString(); beingProcessed.put(url, ""); this._collector.emit(ret, url); } @Override public void ack(Object msgId) { LOG.info("Acking"); this.beingProcessed.remove(msgId); this.ackQueue.offer((String) msgId); } @Override public void fail(Object msgId) { LOG.error("Fail tuple {}", msgId); this.beingProcessed.remove(msgId); this.failQueue.offer((String) msgId); } private class ProducerThread extends Thread { @Override public void run() { while (activated) { try { if (this.queue.size() <= 1000) { this.populateQueue(); } Utils.sleep(100); } catch (Exception e) { LOG.error("Error reading queues from redis", e); } } } private void populateQueue() { // Calling Redis to populate Queue queue.offer(new Values(url, metadata)); } } private abstract class AckFailThread extends Thread { @Override public void run() { while (activated) { String message = queue.poll(1, TimeUnit.SECONDS); if (message != null) { this.handleMessage(message); } } } protected abstract void handleMessage(String message); }} I've remove unnecessary code. To understand : nextTuple is polling from a queue (populated on another thread) and ack,fail are emitting to a queue, consumed in two another thread. So, those three methods are not blocking. My problem is on running state, my spout is not called sometimes about thirty seconds, but there still message on nextTuple queue to be consumed. The spout is not acking or failling, So why the Spout thread is blocked ? Thank