Hi all,

I have the code below:

import java.text.SimpleDateFormat
import java.util.Date

// assuming the log4j Logger/Level classes that ship with Spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
// Logger.getLogger("org.apache.spark.streaming.dstream").setLevel(Level.DEBUG)

val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
// val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batches
ssc.checkpoint("E:\\spark\\tmp\\cp")

val lines = ssc.socketTextStream("127.0.0.1", 9999)

// Print every element of each 1-second batch RDD
lines.foreachRDD(r => {
  println("RDD" + r.id + "begin" + " " + new SimpleDateFormat("yyyy-mm-dd HH:MM:SS").format(new Date()))
  r.foreach(ele => println(":::" + ele))
  println("RDD" + r.id + "end")
})

// Window length 4 seconds, slide duration 1 second
lines.countByValueAndWindow(Seconds(4), Seconds(1)).foreachRDD(s => { // here is key code
  println("countByValueAndWindow RDD ID IS : " + s.id + "begin")
  println("time is " + new SimpleDateFormat("yyyy-mm-dd HH:MM:SS").format(new Date()))
  s.foreach(e => println("data is " + e._1 + " :" + e._2))
  println("countByValueAndWindow RDD ID IS : " + s.id + "end")
})

ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

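A side note on the timestamps: in java.text.SimpleDateFormat, "mm" means minute, "MM" means month, "ss" means second and "SS" means millisecond, so the pattern used in the code above does not print a conventional date and time. Below is a minimal sketch with the conventional pattern. The names LogTimestamp and format are illustrative only and are not part of the original code. This only affects the labels in the log, not the windowing behaviour.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical helper, not part of the original program.
object LogTimestamp {
  // 'MM' = month, 'mm' = minute, 'ss' = second ('SS' would be milliseconds).
  private val Pattern = "yyyy-MM-dd HH:mm:ss"

  // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
  def format(date: Date, zone: TimeZone = TimeZone.getDefault): String = {
    val fmt = new SimpleDateFormat(Pattern)
    fmt.setTimeZone(zone)
    fmt.format(date)
  }

  def main(args: Array[String]): Unit =
    println("time is " + format(new Date()))
}
```

Inside foreachRDD, the println lines could then call LogTimestamp.format(new Date()) instead of building the formatter inline.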
I ran the code and used "nc" to send messages manually, typing about one character per second. I know the timestamps in the log are not exactly the window boundaries, but I think they are very close. The output, with my comments (marked with <==), is:

-----------------------------------------------------------
RDD1begin 2017-41-27 22:06:16
RDD1end
countByValueAndWindow RDD ID IS : 7 begin
time is 2017-41-27 22:06:16
countByValueAndWindow RDD ID IS : 7 end
RDD8begin 2017-41-27 22:06:17
RDD8end
countByValueAndWindow RDD ID IS : 13 begin
time is 2017-41-27 22:06:17
countByValueAndWindow RDD ID IS : 13 end
RDD14begin 2017-41-27 22:06:18
:::1
RDD14end
countByValueAndWindow RDD ID IS : 19 begin
time is 2017-41-27 22:06:18 <== data from 22:06:15 -- 22:06:18 is in RDD 14.
data is 1 :1
countByValueAndWindow RDD ID IS : 19 end
RDD20begin 2017-41-27 22:06:19
:::2
RDD20end
countByValueAndWindow RDD ID IS : 25 begin
time is 2017-41-27 22:06:19 <== data from 22:06:16 -- 22:06:19 is in RDD 14, 20.
data is 1 :1
data is 2 :1
countByValueAndWindow RDD ID IS : 25 end
RDD26begin 2017-41-27 22:06:20
:::3
RDD26end
countByValueAndWindow RDD ID IS : 31 begin
time is 2017-41-27 22:06:20 <== data from 22:06:17 -- 22:06:20 is in RDD 14, 20, 26.
data is 2 :1
data is 1 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 31 end
RDD32begin 2017-41-27 22:06:21
:::4
RDD32end
countByValueAndWindow RDD ID IS : 37 begin
time is 2017-41-27 22:06:21 <== data from 22:06:18 -- 22:06:21 is in RDD 14, 20, 26, 32.
data is 2 :1
data is 1 :1
data is 4 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 37 end
RDD38begin 2017-41-27 22:06:22
:::5
:::6
RDD38end
countByValueAndWindow RDD ID IS : 43 begin
time is 2017-41-27 22:06:22 <== data from 22:06:19 -- 22:06:22 is in RDD 20, 26, 32, 38. Here 14 is out of the window.
data is 4 :1
data is 5 :1
data is 6 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 43 end
RDD44begin 2017-41-27 22:06:23
:::7
RDD44end
countByValueAndWindow RDD ID IS : 49 begin
time is 2017-41-27 22:06:23 <== data from 22:06:20 -- 22:06:23 is in RDD 26, 32, 38, 44. Here 20 is out of the window.
data is 5 :1
data is 4 :1
data is 6 :1
data is 7 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 49 end
-----------------------------------------------------------

I think the
foreachRDD function outputs the last RDD calculated by countByValueAndWindow, and the above log validates my idea.

Now I change the line marked "here is key code" to:

lines.countByValueAndWindow(Seconds(4), Seconds(6)).foreachRDD(s => { // here is key code

so the slide duration is 6 seconds. The log, with my comments, is below:

-----------------------------------------------------------
RDD1begin 2017-59-27 10:59:12
RDD1end
RDD2begin 2017-59-27 10:59:13
:::1
:::2
RDD2end
RDD3begin 2017-59-27 10:59:14
:::3
RDD3end
RDD4begin 2017-59-27 10:59:15
:::4
RDD4end
RDD5begin 2017-59-27 10:59:16
:::5
RDD5end
RDD6begin 2017-59-27 10:59:17
RDD6end
countByValueAndWindow RDD ID IS : 22 begin
time is 2017-59-27 10:59:17 <== I think this is OK, even though RDD2 is included.
data is 4 :1
data is 5 :1
data is 1 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 22 end
RDD23begin 2017-59-27 10:59:18
:::6
RDD23end
RDD24begin 2017-59-27 10:59:19
:::8
:::7
RDD24end
RDD25begin 2017-59-27 10:59:20
:::9
RDD25end
RDD26begin 2017-59-27 10:59:21
:::0
RDD26end
RDD27begin 2017-59-27 10:59:22
:::-
RDD27end
RDD28begin 2017-59-27 10:59:23
:::p
RDD28end
countByValueAndWindow RDD ID IS : 43 begin
time is 2017-59-27 10:59:23 <== the data between 10:59:20 -- 10:59:23 should come from RDD 25, 26, 27, 28, but the data is wrong.
data is 6 :1
data is 2 :1
data is 9 :1
data is - :1
data is 1 :1
data is 8 :1
data is p :1
data is 0 :1
data is 7 :1
countByValueAndWindow RDD ID IS : 43 end
RDD44begin 2017-59-27 10:59:24
:::o
RDD44end
RDD46begin 2017-59-27 10:59:25
:::i
RDD46end
RDD47begin 2017-59-27 10:59:26
:::u
RDD47end
RDD48begin 2017-59-27 10:59:27
:::y
RDD48end
RDD49begin 2017-59-27 10:59:28
:::t
RDD49end
RDD50begin 2017-59-27 10:59:29
:::r
RDD50end
countByValueAndWindow RDD ID IS : 65 begin
time is 2017-59-27 10:59:29 <== this is wrong too.
data is 6 :1
data is 2 :1
data is r :1
data is 8 :1
data is t :1
data is i :1
data is y :1
data is u :1
data is 1 :1
data is 7 :1
data is o :1
countByValueAndWindow RDD ID IS : 65 end
-----------------------------------------------------------

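To make the expectation in my comments concrete, here is a small plain-Scala model of it (no Spark involved). It is only a sketch of what I expected, not of how Spark implements countByValueAndWindow. The names ExpectedWindow, batches and expectedCounts are made up for illustration, the batch contents are copied from the second log, and I am assuming the printed second is the batch time.

```scala
// Hypothetical model of the expected window contents; not Spark code.
object ExpectedWindow {
  // (second within 10:59, elements printed for that 1-second batch in the second log)
  val batches: Seq[(Int, Seq[String])] = Seq(
    12 -> Seq.empty[String],
    13 -> Seq("1", "2"),
    14 -> Seq("3"),
    15 -> Seq("4"),
    16 -> Seq("5"),
    17 -> Seq.empty[String],
    18 -> Seq("6"),
    19 -> Seq("8", "7"),
    20 -> Seq("9"),
    21 -> Seq("0"),
    22 -> Seq("-"),
    23 -> Seq("p")
  )

  // Count each value over the batches whose time t satisfies
  // windowEnd - windowLength < t <= windowEnd (assumed window semantics).
  def expectedCounts(data: Seq[(Int, Seq[String])],
                     windowEnd: Int,
                     windowLength: Int): Map[String, Long] =
    data
      .collect { case (t, elems) if t > windowEnd - windowLength && t <= windowEnd => elems }
      .flatten
      .groupBy(identity)
      .map { case (value, occurrences) => value -> occurrences.size.toLong }

  def main(args: Array[String]): Unit =
    println(expectedCounts(batches, windowEnd = 23, windowLength = 4))
}
```

Under this model, the window ending at 10:59:23 contains only 9, 0, - and p, each with count 1, whereas the log above also shows 1, 2, 6, 7 and 8.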
Could you please tell me why the second log does not match my understanding? This issue has been bothering me for several days.

Thanks,
Fei Shao