I think the problem is that while my topology is running, the thread that calls nextTuple seems to be busy. Why isn't the method being called?
Could someone point me to some documentation, or to the code that calls nextTuple, so I can understand what is blocking?

Thank you, guys.

2016-05-09 9:57 GMT+02:00 Adrien Carreira <a...@reportlinker.com>:

> Hi there,
>
> I'm using Storm to build a web crawler, using the Storm Crawler SDK.
>
> I'm also using Redis to store newly discovered links.
>
> I have a Spout to consume those URLs. After much debugging, I've built the
> Spout like this:
>
> public class OutlinkSpoutRedis extends BaseRichSpout {
>
>     private static final Logger LOG = LoggerFactory.getLogger(OutlinkSpoutRedis.class);
>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>
>     @Override
>     public void nextTuple() {
>         LOG.info(">>> Calling nextTuple");
>
>         if (beingProcessed.size() >= 200000) {
>             LOG.info("Too much beeing processed");
>             Utils.sleep(50);
>             return;
>         }
>
>         LOG.info("Pooling from queue");
>         Values ret = queue.poll();
>
>         if (ret == null) {
>             LOG.info("Pooling from queue = null");
>             Utils.sleep(50);
>             return;
>         }
>
>         LOG.info("Emitting one url");
>
>         String url = ret.get(0).toString();
>         beingProcessed.put(url, "");
>
>         this._collector.emit(ret, url);
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         LOG.info("Acking");
>         this.beingProcessed.remove(msgId);
>         this.ackQueue.offer((String) msgId);
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         LOG.error("Fail tuple {}", msgId);
>         this.beingProcessed.remove(msgId);
>         this.failQueue.offer((String) msgId);
>     }
>
>     private class ProducerThread extends Thread {
>         @Override
>         public void run() {
>             while (activated) {
>                 try {
>                     if (this.queue.size() <= 1000) {
>                         this.populateQueue();
>                     }
>
>                     Utils.sleep(100);
>                 } catch (Exception e) {
>                     LOG.error("Error reading queues from redis", e);
>                 }
>             }
>         }
>
>         private void populateQueue() {
>             // Calling Redis to populate Queue
>             queue.offer(new Values(url, metadata));
>         }
>     }
>
>     private abstract class AckFailThread extends Thread {
>         @Override
>         public void run() {
>             while (activated) {
>                 String message = queue.poll(1, TimeUnit.SECONDS);
>
>                 if (message != null) {
>                     this.handleMessage(message);
>                 }
>             }
>         }
>
>         protected abstract void handleMessage(String message);
>     }
> }
>
> I've removed the unnecessary code.
> To explain: nextTuple polls from a queue (populated by another thread), and
> ack and fail write to queues that are consumed by two other threads. So
> those three methods are not blocking.
>
> My problem is that, while the topology is running, my spout is sometimes not
> called for about thirty seconds, even though there are still messages in the
> nextTuple queue to be consumed. The spout is not acking or failing during
> that time either, so why is the Spout thread blocked?
>
> Thanks
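
In case it helps narrow this down, here is a minimal sketch of how the gap between successive spout calls could be measured rather than read off the log timestamps. It has no Storm dependency. The class CallGapTracker and its methods are hypothetical names made up for this illustration (they are not part of Storm or Storm Crawler), and the threshold value is only an assumption.

```java
/**
 * Hypothetical helper (not a Storm class): remembers when it was last
 * called and reports the gap between successive calls.
 */
final class CallGapTracker {
    private final long thresholdNanos; // gaps above this count as a stall
    private boolean hasPrevious = false;
    private long lastCallNanos = 0L;
    private long maxGapNanos = 0L;

    CallGapTracker(long thresholdNanos) {
        this.thresholdNanos = thresholdNanos;
    }

    /**
     * Records a call at the given timestamp, e.g. System.nanoTime().
     * Returns the gap since the previous call, or -1 for the first call.
     * A flag is used instead of a sentinel timestamp because
     * System.nanoTime() may legitimately return negative values.
     */
    long record(long nowNanos) {
        long gap = hasPrevious ? nowNanos - lastCallNanos : -1L;
        lastCallNanos = nowNanos;
        hasPrevious = true;
        if (gap > maxGapNanos) {
            maxGapNanos = gap;
        }
        return gap;
    }

    /** True if the given gap is longer than the configured threshold. */
    boolean isStall(long gapNanos) {
        return gapNanos > thresholdNanos;
    }

    /** Longest gap observed so far, in nanoseconds. */
    long maxGapNanos() {
        return maxGapNanos;
    }
}
```

The idea would be to call record(System.nanoTime()) on one shared tracker at the top of nextTuple, ack and fail, and log whenever isStall returns true. As far as I understand the ISpout Javadoc, Storm runs those three methods on the same thread, so a long gap across all of them would suggest that thread is busy somewhere outside the spout code, while a gap in nextTuple alone would point at something else.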