I don't see a store() call in your receive(). Search for store() in here http://spark.apache.org/ docs/latest/streaming-custom-receivers.html
On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lcas...@gmail.com> wrote: > Hi, > I am using spark 1.6. I wrote a custom receiver to read from WebSocket. > But when I start my spark job, it connects to the WebSocket but doesn't > get any message. Same code, if I write as separate scala class, it works > and prints messages from WebSocket. Is anything missing in my Spark Code? > There are no errors in spark console. > > Here is my receiver - > > import org.apache.spark.Logging > import org.apache.spark.storage.StorageLevel > import org.apache.spark.streaming.receiver.Receiver > import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket} > > /** > * Custom receiver for WebSocket > */ > class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) > with Runnable with Logging { > > private var webSocket: WebSocket = _ > > @transient > private var thread: Thread = _ > > override def onStart(): Unit = { > thread = new Thread(this) > thread.start() > } > > override def onStop(): Unit = { > setWebSocket(null) > thread.interrupt() > } > > override def run(): Unit = { > println("Received ----") > receive() > } > > private def receive(): Unit = { > > > val connection = WebSocket().open("ws://localhost:3001") > println("WebSocket Connected ..." ) > println("Connected ------- " + connection) > setWebSocket(connection) > > connection.listener(new TextListener { > > override def onMessage(message: String) { > System.out.println("Message in Spark client is --> " + > message) > } > }) > > > } > > private def setWebSocket(newWebSocket: WebSocket) = synchronized { > if (webSocket != null) { > webSocket.shutDown > } > webSocket = newWebSocket > } > > } > > > ===== > > Here is code for Spark job > > > object WebSocketTestApp { > > def main(args: Array[String]) { > val conf = new SparkConf() > .setAppName("Test Web Socket") > .setMaster("local[20]") > .set("test", "") > val ssc = new StreamingContext(conf, Seconds(5)) > > > val stream: ReceiverInputDStream[String] = ssc.receiverStream(new > WebSocketReceiver()) > stream.print() > > ssc.start() > ssc.awaitTermination() > } > > > ============== > } > > > Thanks, > > LCassa > >