I think my problem persists whether I use Kafka or sockets. Or am I wrong?
How would you use Kafka here?
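
Would it be something like the sketch below, with each partition publishing
to a topic that the second application then consumes? This is an untested
sketch; the topic name "words" and the broker address are placeholders:

«
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

union.flatMap(_.split(' ')).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // build the producer on the executor, so nothing non-serializable
    // has to cross over from the driver
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { word =>
      producer.send(new ProducerRecord[String, String]("words", word))
    }
    producer.close()
  }
}

// and in the second application, something like:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val words = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, Map("metadata.broker.list" -> "localhost:9092"),
  Set("words")).map(_._2)
»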

On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:

> Have you considered using Kafka?
>
> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a basic spark streaming application like this:
>>
>> «
>> ...
>>
>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>> val rawStreams = (1 to numStreams).map(_ =>
>>   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>> val union = ssc.union(rawStreams)
>>
>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>
>>   // TODO
>>
>> })
>> ...
>> »
>>
>>
>> My question is: what is the best and fastest way to send the resulting RDDs
>> as input to another Spark Streaming application?
>>
>> I tried to add this code in place of the "TODO" comment:
>>
>> «
>> val serverSocket = new ServerSocket(9998)
>> while (true) {
>>   val socket = serverSocket.accept()
>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>   try {
>>     rdd.foreach(out.write)  // `out` is captured by the closure shipped to executors
>>     out.flush()
>>   } catch {
>>     case e: IOException => // client went away; accept the next one
>>   } finally {
>>     socket.close()
>>   }
>> }
>> »
>>
>>
>> I also tried creating a thread in the driver program to launch the socket
>> server and then sharing state (the PrintWriter object) between the driver
>> and the tasks. But I got a "task not serializable" exception, because
>> PrintWriter is not serializable (despite the @transient annotation). I know
>> this is not a very elegant solution, but what other directions should I
>> explore?
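>>
>> One direction I can imagine is opening the connection inside
>> rdd.foreachPartition, so that the writer is created on the executor and
>> never has to be serialized at all. A rough, untested sketch (sinkHost and
>> sinkPort are placeholders, and it assumes the other application reads text
>> lines, e.g. via socketTextStream):
>>
>> «
>> import java.io.PrintWriter
>> import java.net.Socket
>>
>> rdd.foreachPartition { partition =>
>>   // created on the executor, so nothing is captured from the driver
>>   val socket = new Socket(sinkHost, sinkPort)
>>   val out = new PrintWriter(socket.getOutputStream)
>>   try {
>>     partition.foreach(out.println)
>>     out.flush()
>>   } finally {
>>     out.close()
>>     socket.close()
>>   }
>> }
>> »
>>
>> But that still leaves the question of what should sit at the other end,
>> since socketTextStream also connects as a client rather than listening.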
>>
>> Thanks.
>>
>>
