Are you sharing the SimpleDateFormat instance across threads? This looks a lot
more like the non-thread-safe behaviour of SimpleDateFormat (which has claimed
many unsuspecting victims over the years) than anything 'ugly' in Spark
Streaming. Try writing the raw timestamps in millis to Kafka and compare.
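If you do need formatted strings, one common fix is to give each thread its own formatter. A minimal sketch (the `SafeFormat` object name is just for illustration; on Java 8+ the thread-safe java.time.format.DateTimeFormatter is an alternative):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// SimpleDateFormat keeps mutable internal state, so a shared instance
// must not be used from multiple threads. A ThreadLocal gives every
// thread its own instance, which avoids the corrupted-output problem.
object SafeFormat {
  private val fmt = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat =
      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  }

  def format(millis: Long): String = fmt.get.format(new Date(millis))
}
```

You would then call something like `SafeFormat.format(time.milliseconds)` inside the per-partition handler instead of sharing one `dateFormat` from the driver.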

-kr, Gerard.

On Fri, Jun 26, 2015 at 11:06 AM, Sea <261810...@qq.com> wrote:

> Hi, all
>
> I have found a problem in Spark Streaming when I use the batch time inside
> foreachRDD...
> The resulting timestamps look very strange.
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topicsSet)
>
> messages.map(x => createGroup(x._2, dimensions)).groupByKey()
>   .foreachRDD((rdd, time) => {
>     try {
>       if (!rdd.partitions.isEmpty) {
>         rdd.foreachPartition(partition => {
>           handlePartition(partition, timeType, time, dimensions, outputTopic,
>             brokerList)
>         })
>       }
>     } catch {
>       case e: Exception => e.printStackTrace()
>     }
>   })
>
>
> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>
> val date = dateFormat.format(new Date(time.milliseconds))
>
>
> Then I write the 'date' into Kafka, but I see results like these:
>
>
> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>
> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>
> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>
> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
>
> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>
> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>
> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>
> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>
>
