Re: Spark Streaming - stream between 2 applications
Instead of sending the results of one Spark app directly to the other, you could write them to a Kafka topic that is consumed by your other Spark application.

On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa wrote:
> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
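A minimal sketch of the producer side, assuming a broker at localhost:9092, a topic named "words", and the kafka-clients producer API (the broker address and topic name are illustrative, not from this thread):

«
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

union.flatMap(_.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the producer on the executor, once per partition, so the
    // non-serializable KafkaProducer never has to be shipped from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach(word => producer.send(new ProducerRecord[String, String]("words", word)))
    producer.close()
  }
}
»

The second application would then read the topic with a Kafka input stream (e.g. KafkaUtils.createDirectStream) instead of rawSocketStream, and Kafka gives you buffering and replay between the two apps for free.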
Re: Spark Streaming - stream between 2 applications
You're confused about which parts of your code run on the driver versus on the executors, which is why you're getting serialization errors. Read
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Fri, Nov 20, 2015 at 1:07 PM, Saiph Kappa wrote:
> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
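Roughly the pattern that section describes: the body of the foreachRDD closure runs on the driver, so any connection should instead be created inside foreachPartition, where it lives on an executor and never needs to be serialized. A sketch, where createNewConnection is a placeholder for whatever client you actually use:

«
dstream.foreachRDD { rdd =>
  // This part of the closure runs on the driver, once per batch.
  rdd.foreachPartition { partitionOfRecords =>
    // This inner closure runs on an executor, so the connection is
    // created there and is never captured by a serialized task.
    val connection = createNewConnection() // placeholder, not a real API
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
»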
Re: Spark Streaming - stream between 2 applications
I think my problem persists whether I use Kafka or sockets. Or am I wrong? How would you use Kafka here?

On Fri, Nov 20, 2015 at 7:12 PM, Christian wrote:
> Have you considered using Kafka?
Re: Spark Streaming - stream between 2 applications
Have you considered using Kafka?

On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa wrote:
> My question is: what is the best and fastest way to send the resulting
> RDDs as input to be consumed by another Spark Streaming application?
Spark Streaming - stream between 2 applications
Hi,

I have a basic Spark Streaming application like this:

«
...
val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
val rawStreams = (1 to numStreams).map(_ =>
  ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
val union = ssc.union(rawStreams)

union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
  // TODO
})
...
»

My question is: what is the best and fastest way to send the resulting RDDs as input to be consumed by another Spark Streaming application?

I tried to add this code in place of the "TODO" comment:

«
val serverSocket = new ServerSocket(9998)
while (true) {
  val socket = serverSocket.accept()
  @transient val out = new PrintWriter(socket.getOutputStream)
  try {
    rdd.foreach(out.write)
  } catch {
    case e: IOException =>
      socket.close()
  }
}
»

I also tried to create a thread in the driver application code to launch the socket server and then share state (the PrintWriter object) between the driver program and the tasks. But I got an exception saying the task is not serializable - PrintWriter is not serializable (despite the @transient annotation). I know this is not a very elegant solution, but what other directions should I explore?

Thanks.