Could you print the "time" on the driver (that is, inside foreachRDD but before
RDD.foreachPartition) and see whether it already looks wrong there?

TD
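
A minimal sketch of the check suggested above, assuming the job has the shape shown in the quoted post. `DriverTimeCheck` and `describeBatchTime` are hypothetical names introduced only for illustration. The helper builds its own formatter on every call, so the value printed on the driver does not depend on any `SimpleDateFormat` instance shared elsewhere in the job (the class is documented as not synchronized). Whether that matters for the problem reported here is not established.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical helper, not part of Spark or of the original job.
object DriverTimeCheck {
  // Formats a batch time given as epoch milliseconds.
  // A new SimpleDateFormat is created per call, so nothing is shared across threads.
  def describeBatchTime(millis: Long, zone: TimeZone = TimeZone.getDefault): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    fmt.setTimeZone(zone)
    s"batch time: $millis ms = ${fmt.format(new Date(millis))}"
  }
}

// Intended placement in the streaming job (assumed shape, taken from the quoted post).
// The println runs on the driver, once per batch, before any partition work starts:
//
// dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
//   println(DriverTimeCheck.describeBatchTime(time.milliseconds))
//   rdd.foreachPartition(partition => { /* handlePartition(...) as before */ })
// })
```

If the driver output is always well formed while the records written to Kafka are not, the corruption is introduced later, somewhere in the per-partition code.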

On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün <emrehan.tu...@gmail.com>
wrote:

>
> On Fri, Jun 26, 2015 at 12:30 PM, Sea <261810...@qq.com> wrote:
>
>> Hi all,
>>
>> I have run into a problem in Spark Streaming when I use the time argument
>> of foreachRDD: the formatted time comes out wrong in some records.
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>> dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
>>   try {
>>     if (!rdd.partitions.isEmpty) {
>>       rdd.foreachPartition(partition => {
>>         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
>>       })
>>     }
>>   } catch {
>>     case e: Exception => e.printStackTrace()
>>   }
>> })
>>
>>
>> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>>
>> var date = dateFormat.format(new Date(time.milliseconds))
>>
>> Then I write 'date' to Kafka, but some of the timestamps come out wrong:
>>
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>>
>> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
>> {"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>>
>>
>
