I think the problem is that while my topology is running, the thread calling
nextTuple seems to be busy... Why isn't the method being called?

Can someone point me to some documentation, or to the code that calls
nextTuple, so I can understand what is blocking?
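
To narrow this down, one option would be to measure the time between
successive calls into the spout. Below is a minimal sketch of such a helper,
in plain Java with no Storm dependency. CallGapTracker, record and
maxGapMillis are hypothetical names of mine, not part of Storm or Storm
Crawler.

```java
/** Hypothetical helper (not a Storm class): tracks time between successive calls. */
class CallGapTracker {
    private boolean seenCall = false; // System.nanoTime() may be negative, so no sentinel value
    private long lastCallNanos;
    private long maxGapNanos = 0;

    /**
     * Records a call at the given timestamp (e.g. System.nanoTime()).
     * Returns the gap since the previous call in milliseconds, or 0 for the first call.
     */
    public synchronized long record(long nowNanos) {
        long gapNanos = seenCall ? nowNanos - lastCallNanos : 0;
        seenCall = true;
        lastCallNanos = nowNanos;
        if (gapNanos > maxGapNanos) {
            maxGapNanos = gapNanos;
        }
        return gapNanos / 1_000_000;
    }

    /** Largest gap observed so far, in milliseconds. */
    public synchronized long maxGapMillis() {
        return maxGapNanos / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        CallGapTracker tracker = new CallGapTracker();
        tracker.record(System.nanoTime());
        Thread.sleep(120);
        long gapMs = tracker.record(System.nanoTime());
        System.out.println("gap ms = " + gapMs + ", max ms = " + tracker.maxGapMillis());
    }
}
```

Calling record(System.nanoTime()) as the first statement of nextTuple, ack
and fail, each with its own tracker, and logging any gap above about a second
should at least show which of the three methods stops being invoked during a
stall.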

Thank you guys

2016-05-09 9:57 GMT+02:00 Adrien Carreira <a...@reportlinker.com>:

> Hi there,
>
> I'm using Storm to build a web-crawler, using Storm Crawler SDK.
>
> I'm also using Redis to store new links discovered.
>
> I have a Spout to consume those URLs. After a lot of debugging, I've built
> the Spout like this:
>
> public class OutlinkSpoutRedis extends BaseRichSpout {
>
>     private static final Logger LOG = LoggerFactory
>             .getLogger(OutlinkSpoutRedis.class);
>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>
>
>     @Override
>     public void nextTuple() {
>         LOG.info(">>> Calling nextTuple");
>
>         if (beingProcessed.size() >= 200000) {
>             LOG.info("Too many URLs being processed");
>             Utils.sleep(50);
>             return;
>         }
>
>         LOG.info("Polling from queue");
>         Values ret = queue.poll();
>
>         if (ret == null) {
>             LOG.info("Polling from queue = null");
>             Utils.sleep(50);
>             return;
>         }
>
>         LOG.info("Emitting one url");
>
>         String url = ret.get(0).toString();
>         beingProcessed.put(url, "");
>
>         this._collector.emit(ret, url);
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         LOG.info("Acking");
>         this.beingProcessed.remove(msgId);
>         this.ackQueue.offer((String) msgId);
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         LOG.error("Fail tuple {}", msgId);
>         this.beingProcessed.remove(msgId);
>         this.failQueue.offer((String) msgId);
>     }
>
>     private class ProducerThread extends Thread {
>         @Override
>         public void run() {
>             while (activated) {
>                 try {
>                     if (queue.size() <= 1000) {
>                         this.populateQueue();
>                     }
>
>                     Utils.sleep(100);
>                 } catch (Exception e) {
>                     LOG.error("Error reading queues from redis", e);
>                 }
>             }
>         }
>
>         private void populateQueue() {
>             // Calling Redis to populate Queue
>             queue.offer(new Values(url, metadata));
>         }
>     }
>
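>     private abstract class AckFailThread extends Thread {
>         // Queue drained by this thread: ackQueue or failQueue
>         // (assumed; this part was removed from the original code).
>         private final LinkedBlockingQueue<String> source;
>
>         AckFailThread(LinkedBlockingQueue<String> source) {
>             this.source = source;
>         }
>
>         @Override
>         public void run() {
>             while (activated) {
>                 try {
>                     String message = source.poll(1, TimeUnit.SECONDS);
>                     if (message != null) {
>                         handleMessage(message);
>                     }
>                 } catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                     return;
>                 }
>             }
>         }
>
>         protected abstract void handleMessage(String message);
>     }
> }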
>
>
> I've removed the unnecessary code.
> To explain: nextTuple polls from a queue (populated by another thread),
> and ack and fail push to queues that are consumed by two other threads.
> So none of those three methods should block.
>
> My problem is that, while the topology is running, my spout is sometimes
> not called for about thirty seconds, even though there are still messages
> in the nextTuple queue to be consumed. The spout is not acking or failing
> during that time, so why is the Spout thread blocked?
>
> Thanks
>
>
>
>
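
For reference, the queue handoff described in the quoted message can be
reproduced outside Storm. This is only a sketch under assumptions:
QueueHandoffSketch and its method names are made up for illustration, and a
plain String stands in for Storm's Values. It shows that the untimed poll()
returns immediately with null on an empty queue, and that offer() on an
unbounded LinkedBlockingQueue does not wait for space.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the spout's queue handoff; not Storm code. */
class QueueHandoffSketch {
    // Unbounded queue shared between a producer thread and the consumer.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Producer side (as in populateQueue, ack, fail): never waits for space. */
    boolean produce(String url) {
        return queue.offer(url);
    }

    /** Consumer side (as in nextTuple): returns null at once if nothing is queued. */
    String nextOrNull() {
        return queue.poll();
    }

    /** Worker-thread side (as in AckFailThread): waits up to timeoutMs for an item. */
    String nextOrWait(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandoffSketch sketch = new QueueHandoffSketch();
        System.out.println(sketch.nextOrNull()); // prints "null": the queue is empty

        Thread producer = new Thread(() -> sketch.produce("http://example.com/"));
        producer.start();
        System.out.println(sketch.nextOrWait(1000));
        producer.join();
    }
}
```

If the handoff itself behaves like this, the only calls in the quoted
nextTuple that hold the calling thread are the two Utils.sleep(50) calls.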
