Hello!
  You can also have a look at this post:
http://stackoverflow.com/questions/24413088/storm-max-spout-pending. It
might be helpful.
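
One caveat on the message id advice quoted below: as far as I know, Storm only caps in-flight tuples when topology.max.spout.pending is set (for example with Config.setMaxSpoutPending), and the cap only counts tuples emitted with a message id. The message id should also be unique per tuple. The posted spout uses temp[0], which is declared as "streamId" and may repeat across lines. Here is a minimal sketch of the bookkeeping, in plain Java with no Storm dependency. InFlightTracker and its method names are hypothetical helpers of mine, not part of the Storm API, and Storm enforces the real cap itself once the setting above is configured.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (NOT a Storm class): remembers lines that were
// emitted but not yet acked, under a unique id per line.
class InFlightTracker {
    private final int maxPending;
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextId = 0;

    InFlightTracker(int maxPending) {
        if (maxPending <= 0) {
            throw new IllegalArgumentException("maxPending must be positive");
        }
        this.maxPending = maxPending;
    }

    /** True while another line may be emitted without exceeding the cap. */
    boolean canEmit() {
        return pending.size() < maxPending;
    }

    /** Registers a line as in flight and returns its unique message id. */
    long register(String line) {
        if (!canEmit()) {
            throw new IllegalStateException("too many tuples in flight");
        }
        long id = nextId++;
        pending.put(id, line);
        return id;
    }

    /** Intended for ack(): the tuple is fully processed, so forget it. */
    void ack(long id) {
        pending.remove(id);
    }

    /** Intended for fail(): removes and returns the line so it can be re-emitted (null if the id is unknown). */
    String fail(long id) {
        return pending.remove(id);
    }

    /** Number of lines currently emitted but not yet acked or failed. */
    int inFlight() {
        return pending.size();
    }
}
```

In a spout, the id returned by register(line) would be passed as the last argument of collector.emit(...), and ack(Object msgId) / fail(Object msgId) would cast msgId back to Long before calling the tracker. A line returned by fail can be emitted again instead of being counted as lost.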
Regards,
 Florin

On Sun, Jun 7, 2015 at 4:17 PM, Nathan Leung <ncle...@gmail.com> wrote:

> You should emit with a message id, which will prevent too many messages
> from being in flight simultaneously, which will alleviate your out of
> memory conditions.
> On Jun 7, 2015 5:05 AM, "Michail Toutoudakis" <mix...@gmail.com> wrote:
>
>> What is the best spout implementation for reading input data from a file? I
>> have implemented a spout that reads the input file using a Scanner,
>> which seems to perform better than a buffered file reader.
>> However, I still lose some values (not many this time, about 1%), and
>> after a few minutes of running I get a Java out-of-memory exception,
>> which I believe is related to the buffering of values.
>> My spout implementation is:
>>
>> package tuc.LSH.storm.spouts;
>>
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.IRichSpout;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>> import backtype.storm.utils.Utils;
>> import tuc.LSH.conf.Consts;
>>
>> import java.io.*;
>> import java.util.Map;
>> import java.util.Scanner;
>>
>> /**
>>  * Created by mixtou on 15/5/15.
>>  */
>> public class FileReaderSpout extends BaseRichSpout {
>> //public class FileReaderSpout implements IRichSpout {
>>
>>     private SpoutOutputCollector collector;
>>     private Scanner scanner;
>>     private boolean completed;
>>     private TopologyContext context;
>>     private int spout_idx;
>>     private int spout_id;
>>     private Map config;
>>     private int noOfFailedWords;
>>     private int noOfAckedWords;
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>>         outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value"));
>>
>>
>>     }
>>
>>     @Override
>>     public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>         this.context = topologyContext;
>>         this.spout_idx = context.getThisTaskIndex();
>>         this.spout_id = context.getThisTaskId();
>>         this.collector = spoutOutputCollector;
>>         this.config = config;
>>         this.completed = false;
>>         this.noOfFailedWords = 0;
>>         this.noOfAckedWords = 0;
>>
>>         try {
>>             this.scanner = new Scanner(new File(config.get(file_to_read()).toString()));
>>             System.err.println("Scanner Reading File: " + config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>         } catch (FileNotFoundException e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>
>>         if(!completed) {
>>             if (scanner.hasNextLine()) {
>>                 String[] temp = scanner.nextLine().split(",");
>> //            System.err.println("============== " + temp[0] + " + " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>                 // Emit on the "data" stream with temp[0] as the message id, so Storm tracks the tuple and calls ack() or fail() for it
>>                 collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]);
>>                 Utils.sleep(1);
>>             } else {
>>                 System.err.println("End of File Closing Reader");
>>                 scanner.close();
>>                 completed = true;
>>             }
>>         }
>>
>>     }
>>
>>     private String file_to_read() {
>> //        this.spout_id = context.getThisTaskId();
>>         if (Consts.NO_OF_SPOUTS > 1) {
>>             int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>             return "data" + file_no;
>>         } else {
>>             return "data";
>>         }
>>     }
>>
>>     @Override
>>     public void ack(Object msgId) {
>>         super.ack(msgId);
>>         noOfAckedWords++;
>> //        System.out.println("OK tuple acked from bolt: " + msgId + " no of acked word " + noOfAckedWords);
>>     }
>>
>>     @Override
>>     public void fail(Object msgId) {
>>         super.fail(msgId);
>>         noOfFailedWords++;
>>         System.err.println("ERROR: " + context.getThisComponentId() + " " + msgId + " no of words failed " + noOfFailedWords);
>>
>>     }
>>
>> }
>>
>>
>>
