Solution provided by Cody K :

I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

so something like (pseudocode):

stream.flatMap { message =>
  val items = new ArrayBuffer
 var parser = null
  message.split("\n").foreach { line =>
     if  // it's a header
        parser = someParserBasedOn(line)
    else
       items += parser.parse(line)
 }
 items.iterator
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037p28054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to