Hi,

I have a basic Spark Streaming application like this:

«
...

val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
val rawStreams = (1 to numStreams).map(_ =>
  ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
val union = ssc.union(rawStreams)

union.flatMap(line => line.split(' ')).foreachRDD(rdd => {

  // TODO

})
...
»


My question is: what is the best and fastest way to send the resulting RDDs
as input to be consumed by another Spark Streaming application?

I tried to add this code in place of the "TODO" comment:

«
import java.io.{IOException, PrintWriter}
import java.net.ServerSocket

val serverSocket = new ServerSocket(9998)
while (true) {
  val socket = serverSocket.accept()
  @transient val out = new PrintWriter(socket.getOutputStream)
  try {
    rdd.foreach(out.write)
  } catch {
    case e: IOException =>
      socket.close()
  }
}
»


I also tried creating a thread in the driver code to launch the socket
server and then sharing state (the PrintWriter object) between the driver
program and the tasks, but I got a "task not serializable" exception because
PrintWriter is not serializable (despite the @transient annotation). I know
this is not a very elegant solution, but what other directions should I
explore?
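As far as I understand, the closure passed to rdd.foreach is serialized and
shipped to the executors, so any driver-side object it captures (like the
PrintWriter) has to be serializable, and @transient on a local val doesn't
seem to help. One direction I considered, but haven't validated, is to
reverse the connection: have each task open its own socket to a consumer
that is already listening, so the writer is created on the executor side and
nothing non-serializable is captured. A rough sketch (the host
"consumer-host" and port 9998 are just placeholders):

«
union.flatMap(line => line.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on the executor: socket and writer are created locally,
    // so nothing non-serializable needs to be shipped from the driver.
    val socket = new java.net.Socket("consumer-host", 9998)
    val out = new java.io.PrintWriter(socket.getOutputStream)
    try {
      partition.foreach(line => out.println(line))
      out.flush()
    } finally {
      out.close()
      socket.close()
    }
  }
}
»

I'm not sure this is the right approach either, since it opens a connection
per partition per batch, so any advice would be appreciated.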

Thanks.
