Hello! You can also have a look at this post: http://stackoverflow.com/questions/24413088/storm-max-spout-pending. It might be helpful. Regards, Florin
On Sun, Jun 7, 2015 at 4:17 PM, Nathan Leung <ncle...@gmail.com> wrote: > You should emit with a message id, which will prevent too many messages > from being in flight simultaneously, which will alleviate your out of > memory conditions. > On Jun 7, 2015 5:05 AM, "Michail Toutoudakis" <mix...@gmail.com> wrote: > >> What is the best spout implementation for reading input data from a file? I >> have implemented a spout for reading input data from a file using a scanner, >> which seems to perform better than a buffered file reader. >> However, I still lose some values — not many this time, about 1% — but the >> problem is that after a few minutes of running I get a Java out-of-memory >> exception, and I believe it has to do with values buffering. >> My spout implementation is: >> >> package tuc.LSH.storm.spouts; >> >> import backtype.storm.spout.SpoutOutputCollector; >> import backtype.storm.task.TopologyContext; >> import backtype.storm.topology.IRichSpout; >> import backtype.storm.topology.OutputFieldsDeclarer; >> import backtype.storm.topology.base.BaseRichSpout; >> import backtype.storm.tuple.Fields; >> import backtype.storm.tuple.Values; >> import backtype.storm.utils.Utils; >> import tuc.LSH.conf.Consts; >> >> import javax.rmi.CORBA.Util; >> import java.io.*; >> import java.util.Map; >> import java.util.Scanner; >> >> /** >> * Created by mixtou on 15/5/15.
>> */ >> public class FileReaderSpout extends BaseRichSpout { >> //public class FileReaderSpout implements IRichSpout { >> >> private SpoutOutputCollector collector; >> private Scanner scanner; >> private boolean completed; >> private TopologyContext context; >> private int spout_idx; >> private int spout_id; >> private Map config; >> private int noOfFailedWords; >> private int noOfAckedWords; >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer >> outputFieldsDeclarer) { >> outputFieldsDeclarer.declareStream("data", new Fields("streamId", >> "timestamp", "value")); >> >> >> } >> >> @Override >> public void open(Map config, TopologyContext topologyContext, >> SpoutOutputCollector spoutOutputCollector) { >> this.context = topologyContext; >> this.spout_idx = context.getThisTaskIndex(); >> this.spout_id = context.getThisTaskId(); >> this.collector = spoutOutputCollector; >> this.config = config; >> this.completed = false; >> this.noOfFailedWords = 0; >> this.noOfAckedWords = 0; >> >> try { >> this.scanner = new Scanner(new >> File(config.get(file_to_read()).toString())); >> System.err.println("Scanner Reading File: " + >> config.get(file_to_read()).toString() + " Spout index: " + spout_idx); >> } catch (FileNotFoundException e) { >> e.printStackTrace(); >> } >> >> } >> >> @Override >> public void nextTuple() { >> >> if(!completed) { >> if (scanner.hasNextLine()) { >> String[] temp = scanner.nextLine().split(","); >> // System.err.println("============== " + temp[0] + " + " + >> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value >> collector.emit("data", new Values(temp[0], temp[2], >> temp[3]), temp[0]); //emmit the correct data to next bolt without guarantee >> delivery >> Utils.sleep(1); >> } else { >> System.err.println("End of File Closing Reader"); >> scanner.close(); >> completed = true; >> } >> } >> >> } >> >> private String file_to_read() { >> // this.spout_id = context.getThisTaskId(); >> if (Consts.NO_OF_SPOUTS > 1) { >> int file_no = 
spout_idx % Consts.NO_OF_SPOUTS; >> return "data" + file_no; >> } else { >> return "data"; >> } >> } >> >> @Override >> public void ack(Object msgId) { >> super.ack(msgId); >> noOfAckedWords++; >> // System.out.println("OK tuple acked from bolt: " + msgId + " no of >> acked word " + noOfAckedWords); >> } >> >> @Override >> public void fail(Object msgId) { >> super.fail(msgId); >> noOfFailedWords++; >> System.err.println("ERROR: " + context.getThisComponentId() + " " + >> msgId + " no of words failed " + noOfFailedWords); >> >> } >> >> } >> >> >>