Hi all,
I've hit a problem in Spark Streaming: when I use the batch time that foreachRDD passes to its function, the formatted timestamps come out looking very strange.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

messages.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
  try {
    if (!rdd.partitions.isEmpty) {
      rdd.foreachPartition(partition => {
        handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
      })
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }
})
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
val date = dateFormat.format(new Date(time.milliseconds))
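The formatting itself happens inside handlePartition, so it runs on the executors once per partition of every batch. A stripped-down stand-in showing just that part (handlePartitionTimeOnly is not my real function; the real one also aggregates the grouped records and writes the JSON to Kafka):

  import java.text.SimpleDateFormat
  import java.util.Date
  import org.apache.spark.streaming.Time

  // Simplified stand-in for handlePartition: only the part that touches
  // the batch time. The real function also builds the JSON records and
  // sends them to the output topic.
  def handlePartitionTimeOnly(partition: Iterator[(String, Iterable[Any])],
                              time: Time): Unit = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    val date = dateFormat.format(new Date(time.milliseconds))
    partition.foreach { case (key, values) =>
      println(s"$key -> $date (${values.size} values)")
    }
  }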
Then I insert the 'date' into Kafka as part of a JSON record, but this is what I found:
{"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
{"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
{"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
{"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
{"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
{"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
{"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
{"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
{"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}