I think my problem persists whether I use Kafka or sockets. Or am I wrong? How would you use Kafka here?
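For concreteness, here is a rough sketch (untested) of what I imagine the producer side would look like if Kafka replaced the socket server. The broker address ("localhost:9092") and topic name ("words") are placeholders; the key point is that the producer is created inside foreachPartition, so no non-serializable object has to be shipped from the driver:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

union.flatMap(line => line.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the producer on the executor, inside the task, so nothing
    // non-serializable crosses the driver/executor boundary.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { word =>
      producer.send(new ProducerRecord[String, String]("words", word))
    }
    producer.close()
  }
}
```

Is that the idea, or is there a cheaper pattern than one producer per partition per batch?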
On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:
> Have you considered using Kafka?
>
> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a basic Spark Streaming application like this:
>>
>> «
>> ...
>>
>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>> val rawStreams = (1 to numStreams).map(_ =>
>>   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>> val union = ssc.union(rawStreams)
>>
>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>   // TODO
>> })
>> ...
>> »
>>
>> My question is: what is the best and fastest way to send the resulting RDDs
>> as input to be consumed by another Spark Streaming application?
>>
>> I tried to add this code in place of the "TODO" comment:
>>
>> «
>> val serverSocket = new ServerSocket(9998)
>> while (true) {
>>   val socket = serverSocket.accept()
>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>   try {
>>     rdd.foreach(out.write)
>>   } catch {
>>     case e: IOException =>
>>       socket.close()
>>   }
>> }
>> »
>>
>> I also tried to create a thread in the driver application code to launch the
>> socket server and then share state (the PrintWriter object) between the
>> driver program and the tasks, but I got an exception saying that the task is
>> not serializable - PrintWriter is not serializable (despite the @transient
>> annotation). I know this is not a very elegant solution, but what other
>> directions should I explore?
>>
>> Thanks.
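On the consuming side, I assume the second application would read the same topic with the direct Kafka API that ships with Spark 1.x (the spark-streaming-kafka artifact). A rough sketch (untested; broker address and topic name are again placeholders):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("words"))

// createDirectStream yields (key, value) pairs; the values are the words
// written by the first application.
stream.map(_._2).foreachRDD { rdd =>
  // process the words here
}
```

If this is roughly right, does it actually avoid my serialization problem, or just move it?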