I am using a spout to read data from a text file and send the lines to a bolt.
However, I am losing too many values: a large number of tuples end up in the
spout's fail(). I am new to Storm and I don't know where to look to debug this
issue. Any ideas?

My Storm Spout code is:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;


    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value"));
    }

    @Override
    public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;

        try {
            System.err.println("Reading File: " + 
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
            this.fileReader = new 
FileReader(config.get(file_to_read()).toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + 
config.get(file_to_read()) + "]");
        }

    }

    @Override
    public void nextTuple() {

        if (this.completed) {
            return;
        }

        try {

            String str;
            BufferedReader reader = new BufferedReader(fileReader);

            while ((str = reader.readLine()) != null) {

                if (str.isEmpty()) {
                    continue; // skip blank lines; returning here would trip the finally block and mark the spout completed early
                }
                String[] temp = str.split(",");
//                System.err.println("============== " + temp[0] + " + " + temp[2] + " + " + temp[3]); // 0-id, 2-timestamp, 3-value

                // emit on the "data" stream, anchored with temp[0] as the message id, for guaranteed delivery
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]);

            }

        } catch (Exception e) {
            System.err.println("Error Reading Tuple: " + e);
            throw new RuntimeException("Error Reading Tuple ", e);
        } finally {
            System.err.println("Finished Reading File. Message From Spout: " + 
spout_idx);
            this.completed = true;

        }

        Utils.sleep(100);
    }
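
    // Question: is it a problem that nextTuple() reads the entire file in a
    // single call? I could keep the BufferedReader as a field and emit one
    // line per nextTuple() invocation instead, if that matters for acking.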

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of acked words " + noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
msgId + " no of words failed " + noOfFailedWords);

    }
}
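
For completeness, the topology wiring looks roughly like this (a simplified sketch, not my exact code: the component names, parallelism hints, file path, and grouping are illustrative, and the producer of the "sync" stream and the consumer of "bwFilled" are omitted):

package tuc.LSH.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import tuc.LSH.conf.Consts;
import tuc.LSH.storm.bolts.LSHBolt;
import tuc.LSH.storm.spouts.FileReaderSpout;

public class LSHTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("file-spout", new FileReaderSpout(), Consts.NO_OF_SPOUTS);
        builder.setBolt("lsh-bolt", new LSHBolt(), 4)
                .fieldsGrouping("file-spout", "data", new Fields("streamId"));

        Config conf = new Config();
        conf.put("data", "path/to/input.txt"); // the spout looks up its input file under this key ("dataN" when there are multiple spouts)
        // Tuples not acked within the message timeout are failed; the default is 30 seconds.
        // conf.setMessageTimeoutSecs(30);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("lsh-topology", conf, builder.createTopology());
    }
}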

And my bolt looks like this:

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.core.timeseries.UniversalBasicWindow;


import java.util.*;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;

    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.universalBasicWindow = new UniversalBasicWindow();
        streamId = null;
        time = null;
        value = 0f;
        System.err.println("New Bolt with id: " + task_id);

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("sync")) {
            System.out.println("Bolt task id " + task_id + " received from " + 
tuple.getSourceComponent() + " message " + tuple.getString(0));
            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            universalBasicWindow.normalize(); //fill the rest of the streams 
with last received value to make them same size
            universalBasicWindow = null;
            universalBasicWindow = new UniversalBasicWindow();

            collector.ack(tuple);
        }

        if (tuple.getSourceStreamId().equals("data")) {

            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));

            universalBasicWindow.pushStream(streamId, value);

            collector.ack(tuple);

            if (universalBasicWindow.isFull(task_id)) { // check whether any stream of the window is full

//                System.out.println("Univ. Basic Window of bolt " + task_id + " is Filled Up");

                collector.emit("bwFilled", new Values(task_id));

//                universalBasicWindow.normalize();
//                universalBasicWindow = new UniversalBasicWindow();

//                TODO:: Add basic window to sliding window and clear

            }

        }


//        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));

    }
}
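
One thing I am not sure about: the "bwFilled" emit above is unanchored, so the acker does not track it. If it should be anchored to the input tuple, I assume it would look like the sketch below (and I guess it would then have to happen before the collector.ack(tuple) call):

// Anchored variant of the emit: ties the new tuple to the input tuple's tree,
// so a downstream failure triggers a replay from the spout.
collector.emit("bwFilled", tuple, new Values(task_id));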
Any ideas?

