Instead of sending the results of one Spark app directly to the other, you
could write the results to a Kafka topic that your other Spark application
consumes.
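
A minimal sketch of the producing side, assuming the Kafka Java client is on
the classpath and a broker at localhost:9092 with a topic named "words" (all
of these names are illustrative):

«
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

union.flatMap(_.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the producer inside foreachPartition so it is created on the
    // executor and never has to be serialized from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach(word =>
      producer.send(new ProducerRecord[String, String]("words", word)))
    producer.close()
  }
}
»

The second application can then read the topic with the spark-streaming-kafka
connector (KafkaUtils.createDirectStream) instead of a raw socket, which also
buys you buffering and replay if either application falls behind or restarts.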

On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa <saiph.ka...@gmail.com> wrote:

> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
>
> On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:
>
>> Have you considered using Kafka?
>>
>> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a basic Spark Streaming application like this:
>>>
>>> «
>>> ...
>>>
>>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>> val rawStreams = (1 to numStreams).map(_ =>
>>>   ssc.rawSocketStream[String](host, port,
>>>     StorageLevel.MEMORY_ONLY_SER)).toArray
>>> val union = ssc.union(rawStreams)
>>>
>>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>>
>>>   // TODO
>>>
>>> })
>>> ...
>>> »
>>>
>>>
>>> My question is: what is the best and fastest way to send the resulting RDDs
>>> as input to be consumed by another Spark Streaming application?
>>>
>>> I tried to add this code in place of the "TODO" comment:
>>>
>>> «
>>> val serverSocket = new ServerSocket(9998)
>>> while (true) {
>>>   val socket = serverSocket.accept()
>>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>>   try {
>>>     rdd.foreach(out.write)
>>>   } catch {
>>>     case e: IOException =>
>>>       socket.close()
>>>   }
>>> }
>>> »
>>>
>>>
>>> I also tried creating a thread in the driver application code to launch the
>>> socket server and then share state (the PrintWriter object) between the
>>> driver program and the tasks. But I got an exception saying the task is not
>>> serializable, because PrintWriter is not serializable (despite the
>>> @transient annotation). I know this is not a very elegant solution, but what
>>> other directions should I explore?
>>>
>>> Thanks.
>>>
>>>
>
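
Regarding the "Task not serializable" error in the quoted message: @transient
only affects fields of an object being serialized, not local vals, and the
PrintWriter is still captured by the closure that Spark ships to the executors.
If you want to keep a socket-based approach, the usual pattern is to open the
connection on the executor side, once per partition. A rough sketch, with the
direction inverted so the second application runs the server and the tasks
connect to it ("consumer-host" and 9998 are placeholders):

«
import java.io.PrintWriter
import java.net.Socket

union.flatMap(_.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open the socket on the executor, so nothing non-serializable has to
    // be captured from the driver. "consumer-host" and 9998 are placeholders.
    val socket = new Socket("consumer-host", 9998)
    val out = new PrintWriter(socket.getOutputStream, true)
    try {
      partition.foreach(word => out.println(word))
    } finally {
      out.close()
      socket.close()
    }
  }
}
»

That said, hand-rolled sockets give you no buffering or delivery guarantees
between the two applications, which is why Kafka is the more robust choice
here.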
