I am using a spout to read data from a text file and send the values to a bolt. However, I am losing too many values — the tuples fail. I am a newbie to Storm and I don't know where to look to debug this issue. Any ideas?
My Storm Spout code is: package tuc.LSH.storm.spouts; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import tuc.LSH.conf.Consts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; /** * Created by mixtou on 15/5/15. */ public class FileReaderSpout extends BaseRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed; private TopologyContext context; private int spout_idx; private int spout_id; private Map config; private int noOfFailedWords; private int noOfAckedWords; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value")); } @Override public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.context = topologyContext; this.spout_idx = context.getThisTaskIndex(); this.spout_id = context.getThisTaskId(); this.collector = spoutOutputCollector; this.config = config; this.completed = false; this.noOfFailedWords = 0; this.noOfAckedWords = 0; try { System.err.println("Reading File: " + config.get(file_to_read()).toString() + " Spout index: " + spout_idx); this.fileReader = new FileReader(config.get(file_to_read()).toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file [" + config.get(file_to_read()) + "]"); } } @Override public void nextTuple() { if (this.completed) { return; } try { String str; BufferedReader reader = new BufferedReader(fileReader); while ((str = reader.readLine()) != null) { if (str.isEmpty()) { return; } String[] temp = str.split(","); // 
System.err.println("============== "+temp[0] + " + " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct data to next bolt with guarantee delivery } } catch (Exception e) { System.err.println("Error Reading Tuple: " + e); throw new RuntimeException("Error Reading Tuple ", e); } finally { System.err.println("Finished Reading File. Message From Spout: " + spout_idx); this.completed = true; } Utils.sleep(100); } private String file_to_read() { // this.spout_id = context.getThisTaskId(); if (Consts.NO_OF_SPOUTS > 1) { int file_no = spout_idx % Consts.NO_OF_SPOUTS; return "data" + file_no; } else { return "data"; } } @Override public void ack(Object msgId) { super.ack(msgId); noOfAckedWords++; // System.out.println("OK tuple acked from bolt: " + msgId+" no of acked word "+noOfAckedWords); } @Override public void fail(Object msgId) { super.fail(msgId); noOfFailedWords++; System.err.println("ERROR: " + context.getThisComponentId() + " " + msgId + " no of words failed " + noOfFailedWords); } } And my bolt looks like this: package tuc.LSH.storm.bolts; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import sun.jvm.hotspot.runtime.*; import tuc.LSH.core.hashfunctions.HashFunctionsGen; import tuc.LSH.core.timeseries.UniversalBasicWindow; import java.lang.Thread; import java.util.*; /** * Created by mixtou on 17/5/15. 
*/ public class LSHBolt extends BaseRichBolt { private int task_id; private OutputCollector collector; private UniversalBasicWindow universalBasicWindow; private String streamId; private String time; private Float value; @Override public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) { this.task_id = topologyContext.getThisTaskIndex(); this.collector = outputCollector; this.universalBasicWindow = new UniversalBasicWindow(); streamId = null; time = null; value = 0f; System.err.println("New Bolt with id: " + task_id); } @Override public void execute(Tuple tuple) { if (tuple.getSourceStreamId().equals("sync")) { System.out.println("Bolt task id " + task_id + " received from " + tuple.getSourceComponent() + " message " + tuple.getString(0)); System.out.println("Normalizing: Basic Window of Bolt " + task_id); universalBasicWindow.normalize(); //fill the rest of the streams with last received value to make them same size universalBasicWindow = null; universalBasicWindow = new UniversalBasicWindow(); collector.ack(tuple); } if (tuple.getSourceStreamId().equals("data")) { streamId = tuple.getStringByField("streamId"); time = tuple.getStringByField("timestamp"); value = Float.parseFloat(tuple.getStringByField("value")); universalBasicWindow.pushStream(streamId, value); collector.ack(tuple); if (universalBasicWindow.isFull(task_id)) { //check if any stream of the window is full // System.out.println("Univ. Basic Window of bolt " + task_id + " is Filled Up"); collector.emit("bwFilled", new Values(task_id)); // universalBasicWindow.normalize(); // universalBasicWindow = new UniversalBasicWindow(); // TODO:: Add basic window to sliding window and clear } } // collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id")); } } Any Ideas??