I have written a custom receiver for converting the tuples in the Dynamic Queue/EventGen to the Dstream.But i dont know why It is only processing data for some time (3-4 sec.) only and then shows Queue as Empty .ANy suggestions please ..>>
--code // public class JavaCustomReceiver extends Receiver<String> implements ISyntheticEventGen { EventGen eventGen; BlockingQueue<List<String>> eventQueue; String csvFileName; String outSpoutCSVLogFileName; double scalingFactor; public JavaCustomReceiver(String csvFileName, String outSpoutCSVLogFileName, double scalingFactor) { super(StorageLevel.MEMORY_AND_DISK()); this.csvFileName = csvFileName; this.outSpoutCSVLogFileName = outSpoutCSVLogFileName; this.scalingFactor = scalingFactor; this.eventGen = new EventGen(this,this.scalingFactor); this.eventGen.launch(this.csvFileName, this.outSpoutCSVLogFileName); //Launch threads this.eventQueue = new LinkedBlockingQueue<List<String>>(); System.out.println("for watching queue"); } public void onStart() { // Start the thread that receives data over a connection new Thread() { @Override public void run() { receive(); } }.start(); } public void onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private void receive() { try { // connect to the server // socket = new Socket(host, port); // BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // Until stopped or connection broken continue reading while (!isStopped() ) { List<String> entry = this.eventQueue.take(); String str=""; for(String s:entry) str+=s+","; System.out.println("Received data '" + str + "'"); store(str); } // Restart in an attempt to connect again when server is active again restart("Trying to connect again"); } catch(Throwable t) { // restart if there is any other error restart("Error receiving data", t); } } @Override public StorageLevel storageLevel() { return StorageLevel.MEMORY_AND_DISK(); } @Override public void receive(List<String> event) { // TODO Auto-generated method stub //System.out.println("Called IN SPOUT### "); try { this.eventQueue.put(event); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } -- Thanks & Regards, Anshu Shukla