Re: Using Apache Spark Streaming - how to handle changing data format within stream

Solution provided by Cody K (see his reply below).
Re: Using Apache Spark Streaming - how to handle changing data format within stream
I may be misunderstanding, but you need to take each Kafka message and turn it into multiple items in the transformed RDD? So something like this (pseudocode made concrete; Parser and someParserBasedOn stand in for whatever parsing logic the header selects):

    import scala.collection.mutable.ArrayBuffer

    stream.flatMap { message =>
      val items = ArrayBuffer.empty[(String, String)]  // the (K, V) pairs to emit
      var parser: Parser = null                        // set by the most recent header line
      message.split("\n").foreach { line =>
        if (line.startsWith("#fields"))    // it's a header: switch parsers
          parser = someParserBasedOn(line)
        else
          items += parser.parse(line)      // data line: parse with the active header
      }
      items.iterator
    }

This keeps all parsing on the executors, so nothing is collected back to the driver. Note it assumes each message begins with a header line; otherwise parser is still null when the first data line arrives.

On Mon, Nov 7, 2016 at 4:22 PM, coolgar <karllbunn...@gmail.com> wrote:
> [question snipped; it is reproduced in full in the original post below]
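For concreteness, here is one hypothetical way to fill in the Parser and someParserBasedOn placeholders for the "#fields ..." format described in the question. This is a sketch under assumptions, not code from the thread: it treats fields as whitespace-delimited and, purely for illustration, uses each data line's first field as the key.

    // Hypothetical parser for blocks introduced by a "#fields name1 name2 ..." header.
    class Parser(fieldNames: Array[String]) {
      // One (K, V) pair per data line: the key is the line's first field,
      // the value a "name=value" rendering of all fields. The right key and
      // value depend on what reduceByKey() is meant to aggregate.
      def parse(line: String): (String, String) = {
        val values = line.split("\\s+")
        (values.head,
         fieldNames.zip(values).map { case (n, v) => n + "=" + v }.mkString(" "))
      }
    }

    // Build a parser from a header line such as "#fields field1 field2 field3".
    def someParserBasedOn(header: String): Parser =
      new Parser(header.split("\\s+").drop(1))

Because the parser is constructed inside the flatMap closure, it is created on the executors and never has to be shipped from the driver.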
Using Apache Spark Streaming - how to handle changing data format within stream
I'm using Apache Spark Streaming with the Kafka direct consumer. The data stream I'm receiving is log data that includes a header with each block of messages. Each DStream can therefore have many blocks of messages, each with its own header.

The header is used to know how to interpret the fields that follow it in the block of messages. My challenge is that I'm building up (K,V) pairs that are processed by reduceByKey(), and I use this header to know how to parse the fields that follow the header into the (K,V) pairs.

So each message received from Kafka may appear as follows (# denotes the header field, \n denotes a new line):

#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5 field6 field7\ndata4 data5 data6 data7\n...

Is there a way, without collecting all the data back to the driver, to "grab" the header and use it to process the messages that follow it until a new #fields header comes along, rinse, repeat?
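To make the format concrete, here is a small self-contained check (plain Scala, no Spark) that runs the hypothetical Parser sketched after Cody's reply over this question's sample message, showing the kind of (K, V) pairs the flatMap would emit:

    object SampleCheck {
      def main(args: Array[String]): Unit = {
        // The sample message from the question, with literal newlines.
        val message =
          "#fields field1 field2 field3\ndata1 data2 data3\n" +
          "#fields field4 field5 field6 field7\ndata4 data5 data6 data7"
        var parser: Parser = null
        message.split("\n").foreach { line =>
          if (line.startsWith("#fields")) parser = someParserBasedOn(line)
          else println(parser.parse(line))
        }
        // Prints:
        //   (data1,field1=data1 field2=data2 field3=data3)
        //   (data4,field4=data4 field5=data5 field6=data6 field7=data7)
      }
    }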