[ https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386540#comment-14386540 ]
SaintBacchus commented on SPARK-6605:
-------------------------------------

Hi [~srowen], my test code is this:
{code:title=test.scala|borderStyle=solid}
val words = ssc.socketTextStream(sIp, sPort).flatMap(_.split(" ")).map(x => (x, 1))
// Reduce function only: every window is recomputed from its batches.
val resultWindow3 = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(winDur), Seconds(slideDur))
// Reduce plus inverse reduce function: the window is updated incrementally.
val resultWindow4 = words.reduceByKeyAndWindow(_ + _, _ - _, Seconds(winDur), Seconds(slideDur))
{code}
*resultWindow3* is implemented by
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
self.reduceByKey(cleanedReduceFunc, partitioner)
    .window(windowDuration, slideDuration)
    .reduceByKey(cleanedReduceFunc, partitioner)
{code}
and *resultWindow4* is implemented by
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
  self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
  windowDuration, slideDuration, partitioner
)
{code}
The result of this test code is:
{quote}
===== resultWindow3 is:
===== resultWindow4 is:
(hello,0)
(world,0)
{quote}
*resultWindow3* is empty, but *resultWindow4* contains two elements whose keys were received earlier.

> Same transformation in DStream leads to different result
> --------------------------------------------------------
>
>                 Key: SPARK-6605
>                 URL: https://issues.apache.org/jira/browse/SPARK-6605
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: SaintBacchus
>             Fix For: 1.4.0
>
>
> The transformation *reduceByKeyAndWindow* has two implementations: one uses
> the *WindowedDStream* and the other uses *ReducedWindowedDStream*.
> The results are always the same, except when an empty window occurs.
> In a word count, for example, if no data arrives for a period longer than
> the window duration, the first *reduceByKeyAndWindow* yields no elements,
> but the second yields many elements whose value is zero.
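
For readers who want to see the effect without a running stream, here is a minimal plain-Scala sketch with no Spark dependency. It only imitates the two strategies as they appear in the excerpts above: recomputing the window from its batches, versus updating the previous result with the entering and leaving batches. All names in it (`WindowSim`, `merge`, `recompute`, `incremental`) are hypothetical and are not part of Spark; the one-batch window is an assumption chosen to keep the example small.

```scala
// Plain-Scala simulation, not Spark code. All names are hypothetical.
object WindowSim {
  type Counts = Map[String, Int]

  // Combine two per-key count maps; a key missing on one side counts as 0.
  def merge(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).map { k =>
      k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0))
    }.toMap

  // Recompute style: reduce all batches currently in the window from scratch.
  def recompute(window: Seq[Counts]): Counts =
    window.foldLeft(Map.empty[String, Int])((acc, b) => merge(acc, b)(_ + _))

  // Incremental style: previous result + entering batch - leaving batch.
  // A key is only dropped if an optional filter rejects it.
  def incremental(
      prev: Counts,
      entering: Counts,
      leaving: Counts,
      filter: Option[((String, Int)) => Boolean] = None): Counts = {
    val next = merge(merge(prev, entering)(_ + _), leaving)(_ - _)
    filter.fold(next)(f => next.filter(f))
  }

  def main(args: Array[String]): Unit = {
    val batch: Counts = Map("hello" -> 1, "world" -> 1)
    val empty: Counts = Map.empty
    val positive: ((String, Int)) => Boolean = kv => kv._2 > 0

    // Assumed window of one batch: data arrives once, then the input goes quiet.
    val afterData = incremental(empty, batch, empty)

    // Recompute style sees only an empty batch, so it has no keys at all.
    println(recompute(Seq(empty)))
    // Incremental style keeps both keys, each with value 0.
    println(incremental(afterData, empty, batch))
    // With a filter that keeps only positive counts, the zero entries go away.
    println(incremental(afterData, empty, batch, Some(positive)))
  }
}
```

In this sketch the incremental path ends up with `hello` and `world` at zero, which matches the output reported above, while the recompute path is empty. The `filterFunc` argument visible in the second `PairDStreamFunctions.scala` excerpt looks like the corresponding hook for dropping such entries, though whether that is the intended remedy for this issue is an assumption here.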