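I don't see a store() call in your receive(). Without it, the receiver
never hands the messages to Spark, so the stream stays empty even though
the listener prints them.

Search for store() in the custom receiver guide:
http://spark.apache.org/docs/latest/streaming-custom-receivers.html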
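
For illustration, here is a rough sketch of how the receiver might look with store() called from the listener. It reuses the wcs calls from your code (WebSocket().open, listener, shutDown) and is untested. The url constructor parameter and the thread name are my own additions, and the restart() on failure is just one possible way to handle a connection error.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{TextListener, WebSocket}

// Sketch only: `url` is a hypothetical parameter, not part of the original code.
class WebSocketReceiver(url: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  @transient private var webSocket: WebSocket = _

  override def onStart(): Unit = {
    // Connect on a separate thread so that onStart() returns promptly.
    new Thread("WebSocket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = synchronized {
    if (webSocket != null) {
      webSocket.shutDown
      webSocket = null
    }
  }

  private def receive(): Unit = {
    try {
      val connection = WebSocket().open(url)
      synchronized { webSocket = connection }

      connection.listener(new TextListener {
        // store() passes each message to Spark; println alone does not.
        override def onMessage(message: String): Unit = store(message)
      })
    } catch {
      // Ask Spark to restart the receiver if the connection cannot be set up.
      case e: Exception => restart("Error connecting to " + url, e)
    }
  }
}
```

With that in place, ssc.receiverStream(new WebSocketReceiver("ws://localhost:3001")) followed by stream.print() should start showing the messages in each batch.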

On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lcas...@gmail.com> wrote:

> Hi,
> I am using Spark 1.6. I wrote a custom receiver to read from a WebSocket.
> But when I start my Spark job, it connects to the WebSocket but doesn't
> get any messages. The same code, written as a separate Scala class, works
> and prints messages from the WebSocket. Is anything missing in my Spark
> code? There are no errors in the Spark console.
>
> Here is my receiver:
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
>   * Custom receiver for WebSocket
>   */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY)
>   with Runnable with Logging {
>
>   private var webSocket: WebSocket = _
>
>   @transient
>   private var thread: Thread = _
>
>   override def onStart(): Unit = {
>     thread = new Thread(this)
>     thread.start()
>   }
>
>   override def onStop(): Unit = {
>     setWebSocket(null)
>     thread.interrupt()
>   }
>
>   override def run(): Unit = {
>     println("Received ----")
>     receive()
>   }
>
>   private def receive(): Unit = {
>     val connection = WebSocket().open("ws://localhost:3001")
>     println("WebSocket  Connected ...")
>     println("Connected ------- " + connection)
>     setWebSocket(connection)
>
>     connection.listener(new TextListener {
>       override def onMessage(message: String) {
>         System.out.println("Message in Spark client is --> " + message)
>       }
>     })
>   }
>
>   private def setWebSocket(newWebSocket: WebSocket) = synchronized {
>     if (webSocket != null) {
>       webSocket.shutDown
>     }
>     webSocket = newWebSocket
>   }
>
> }
>
>
> =====
>
> Here is the code for the Spark job:
>
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.dstream.ReceiverInputDStream
>
> object WebSocketTestApp {
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Test Web Socket")
>       .setMaster("local[20]")
>       .set("test", "")
>     val ssc = new StreamingContext(conf, Seconds(5))
>
>     val stream: ReceiverInputDStream[String] =
>       ssc.receiverStream(new WebSocketReceiver())
>     stream.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> ==============
>
>
> Thanks,
>
> LCassa
>
>
