Instead of sending the results of one Spark application directly to the other, you could write them to a Kafka topic that the second application consumes.
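A minimal sketch of that approach. To keep the snippet self-contained I use tiny stand-in classes that mirror the shape of `org.apache.kafka.clients.producer.KafkaProducer` and `ProducerRecord` (the stub just collects records instead of talking to a broker); in a real job you would swap in the actual kafka-clients classes and a `Properties`-based constructor. The important pattern is that the producer is created *inside* the per-partition function, on the executor, so the task closure never has to serialize it:

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for org.apache.kafka.clients.producer.ProducerRecord
// so this sketch compiles without the kafka-clients jar.
final case class ProducerRecord(topic: String, value: String)

// Stand-in for KafkaProducer[String, String]; a real producer would
// send to a broker instead of appending to a buffer.
final class KafkaProducerStub {
  val sent = ArrayBuffer.empty[ProducerRecord]
  def send(r: ProducerRecord): Unit = sent += r
  def close(): Unit = ()
}

// Called per partition from foreachRDD: the producer is constructed
// inside the function body, so nothing non-serializable is captured
// by the closure Spark ships to the executors.
def writePartitionToKafka(partition: Iterator[String], topic: String): KafkaProducerStub = {
  val producer = new KafkaProducerStub // real job: new KafkaProducer[String, String](props)
  partition.foreach(word => producer.send(ProducerRecord(topic, word)))
  producer.close()
  producer
}
```

In the streaming app this would be wired up roughly as `union.flatMap(_.split(' ')).foreachRDD(rdd => rdd.foreachPartition(p => writePartitionToKafka(p, "words")))`, and the downstream application reads the topic with its own Kafka input stream.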
On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa <saiph.ka...@gmail.com> wrote:

> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
>
> On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:
>
>> Have you considered using Kafka?
>>
>> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a basic Spark Streaming application like this:
>>>
>>> «
>>> ...
>>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>> val rawStreams = (1 to numStreams).map(_ =>
>>>   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>> val union = ssc.union(rawStreams)
>>>
>>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>>   // TODO
>>> })
>>> ...
>>> »
>>>
>>> My question is: what is the best and fastest way to send the resulting RDDs
>>> as input to be consumed by another Spark Streaming application?
>>>
>>> I tried to add this code in place of the "TODO" comment:
>>>
>>> «
>>> val serverSocket = new ServerSocket(9998)
>>> while (true) {
>>>   val socket = serverSocket.accept()
>>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>>   try {
>>>     rdd.foreach(out.write)
>>>   } catch {
>>>     case e: IOException =>
>>>       socket.close()
>>>   }
>>> }
>>> »
>>>
>>> I also tried to create a thread in the driver application code to launch the
>>> socket server and then share state (the PrintWriter object) between the
>>> driver program and the tasks, but got an exception saying that the task is
>>> not serializable - PrintWriter is not serializable (despite the @transient
>>> annotation). I know this is not a very elegant solution, but what other
>>> directions should I explore?
>>>
>>> Thanks.
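To illustrate why the quoted ServerSocket attempt fails: Spark must serialize the `foreach` closure to ship it to the executors, and a closure that captures a `PrintWriter` cannot be serialized (in my experience `@transient` on a local val does not remove the captured reference, which matches the error reported above). A self-contained sketch of the difference, using plain JDK serialization as a stand-in for Spark's closure serializer:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream, PrintWriter}

// True when `obj` survives Java serialization - the same kind of check
// Spark's closure cleaner/serializer applies before shipping a task.
def isSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// Captures a driver-side PrintWriter, like the quoted code: the lambda
// holds a reference to `out`, and PrintWriter is not Serializable.
def makeCapturing(): String => Unit = {
  val out = new PrintWriter(System.out)
  s => out.write(s)
}

// Creates the writer inside the function body, so nothing
// non-serializable is captured; creating resources per partition on
// the executor side follows the same idea.
def makeSelfContained(): String => Unit =
  s => new PrintWriter(System.out).write(s)
```

The first function fails the serialization check and the second passes it, which is why the usual advice is to open sockets, writers, or producers inside `foreachPartition` rather than sharing one driver-side object with the tasks.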