Hi there,

I'm using Storm and the Storm Crawler SDK to build a web crawler.

I'm also using Redis to store newly discovered links.

I have a Spout that consumes those URLs. After a lot of debugging, I've built the Spout
like this:

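import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Storm 1.x+ package names (older releases used backtype.storm)
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutlinkSpoutRedis extends BaseRichSpout {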

    private static final Logger LOG = LoggerFactory
            .getLogger(OutlinkSpoutRedis.class);
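    private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
    private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
    private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();

    // URLs emitted but not yet acked or failed, keyed by message ID. Storm calls
    // nextTuple, ack and fail on the same executor thread, so a HashMap is enough.
    private final Map<String, String> beingProcessed = new HashMap<>();
    // Stops the background threads; volatile because other threads read it
    private volatile boolean activated = true;
    // Assigned in open(), which was removed from this excerpt
    private SpoutOutputCollector _collector;
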
    @Override
    public void nextTuple() {
        LOG.info(">>> Calling nextTuple");

        if (beingProcessed.size() >= 200000) {
            LOG.info("Too many URLs being processed");
            Utils.sleep(50);
            return;
        }

        LOG.info("Polling from queue");
        Values ret = queue.poll();

        if (ret == null) {
            LOG.info("Polling from queue returned null");
            Utils.sleep(50);
            return;
        }

        LOG.info("Emitting one url");

        String url = ret.get(0).toString();
        beingProcessed.put(url, "");

        this._collector.emit(ret, url);
    }

    @Override
    public void ack(Object msgId) {
        LOG.info("Acking");
        this.beingProcessed.remove(msgId);
        this.ackQueue.offer((String) msgId);
    }

    @Override
    public void fail(Object msgId) {
        LOG.error("Fail tuple {}", msgId);
        this.beingProcessed.remove(msgId);
        this.failQueue.offer((String) msgId);
    }

    private class ProducerThread extends Thread {
        @Override
        public void run() {
            while (activated) {
                try {
                    // `queue` is the outer spout's field; ProducerThread has none of its own
                    if (queue.size() <= 1000) {
                        this.populateQueue();
                    }

                    Utils.sleep(100);
                } catch (Exception e) {
                    LOG.error("Error reading queues from redis", e);
                }
            }
        }

        private void populateQueue() {
            // Calling Redis to populate the queue (removed); url and metadata
            // are the values read from Redis for each new link
            queue.offer(new Values(url, metadata));
        }
    }

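    private abstract class AckFailThread extends Thread {
        // The queue this thread drains: ackQueue or failQueue
        private final LinkedBlockingQueue<String> source;

        AckFailThread(LinkedBlockingQueue<String> source) {
            this.source = source;
        }

        @Override
        public void run() {
            while (activated) {
                try {
                    // Wait at most one second so the loop notices deactivation
                    String message = source.poll(1, TimeUnit.SECONDS);

                    if (message != null) {
                        this.handleMessage(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        protected abstract void handleMessage(String message);
    }
}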


I've removed the unnecessary code.
To summarize: nextTuple polls from a queue that is populated by another
thread, and ack and fail push message IDs onto two queues that are consumed
by two other threads. So none of those three methods should block.
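The hand-off described above can be reduced to a small Storm-free sketch. This is an illustration under assumptions, not the spout itself: the class and method names (QueueHandoffSketch, startProducer, startAckConsumer) are hypothetical, plain strings stand in for Storm Values, and the Redis work is replaced by appending the URL to a list. It shows why the three spout methods never wait: poll() with no timeout returns null immediately on an empty queue, and offer() on an unbounded LinkedBlockingQueue never blocks. The only waiting happens in the consumer thread's timed poll.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, Storm-free model of the spout's queue hand-off.
public class QueueHandoffSketch {
    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> acked = new LinkedBlockingQueue<>();
    // Touched only by the "spout" thread (main here), like beingProcessed
    private final Set<String> inFlight = new HashSet<>();
    // Filled by the ack consumer thread, read by main
    public final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean active = true;

    // Stand-in for nextTuple(): returns the emitted URL, or null if nothing was ready
    public String nextTuple() {
        String url = pending.poll(); // non-blocking: null at once when empty
        if (url != null) {
            inFlight.add(url);
        }
        return url;
    }

    // Stand-in for ack(): bookkeeping plus a hand-off to the consumer thread
    public void ack(String url) {
        inFlight.remove(url);
        acked.offer(url); // unbounded queue, so offer never waits
    }

    public int inFlightCount() {
        return inFlight.size();
    }

    public Thread startProducer(List<String> urls) {
        Thread t = new Thread(() -> urls.forEach(pending::offer));
        t.start();
        return t;
    }

    public Thread startAckConsumer() {
        Thread t = new Thread(() -> {
            try {
                while (active || !acked.isEmpty()) {
                    // Timed poll: waits up to 100 ms so the loop can notice stop()
                    String url = acked.poll(100, TimeUnit.MILLISECONDS);
                    if (url != null) {
                        handled.add(url); // where real code would update Redis
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public void stop() {
        active = false;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandoffSketch s = new QueueHandoffSketch();
        Thread producer = s.startProducer(Arrays.asList("http://a/", "http://b/", "http://c/"));
        Thread consumer = s.startAckConsumer();
        producer.join(); // all three URLs are now in `pending`

        List<String> emitted = new ArrayList<>();
        String url;
        while ((url = s.nextTuple()) != null) {
            emitted.add(url);
        }
        System.out.println("emitted=" + emitted + " inFlight=" + s.inFlightCount());
        emitted.forEach(s::ack);
        System.out.println("nextTuple on empty queue -> " + s.nextTuple());

        s.stop();
        consumer.join();
        System.out.println("handled=" + s.handled);
    }
}
```

Running main prints:

    emitted=[http://a/, http://b/, http://c/] inFlight=3
    nextTuple on empty queue -> null
    handled=[http://a/, http://b/, http://c/]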

My problem: while the topology is running, my spout sometimes isn't called for
about thirty seconds, even though there are still messages in the queue waiting
to be consumed by nextTuple. The spout isn't acking or failing anything during
that time either. So why is the Spout thread blocked?

Thanks
