[ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386540#comment-14386540
 ] 

SaintBacchus commented on SPARK-6605:
-------------------------------------

Hi [~srowen], this is my test code:
{code:title=test.scala|borderStyle=solid}
        val words = ssc.socketTextStream(sIp, sPort).flatMap(_.split(" ")).map(x => (x, 1))
        val resultWindow3 = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(winDur), Seconds(slideDur))
        val resultWindow4 = words.reduceByKeyAndWindow(_ + _, _ - _, Seconds(winDur), Seconds(slideDur))
{code}
*resultWindow3* is implemented by:
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    self.reduceByKey(cleanedReduceFunc, partitioner)
        .window(windowDuration, slideDuration)
        .reduceByKey(cleanedReduceFunc, partitioner)
{code}
*resultWindow4* is implemented by:
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
    new ReducedWindowedDStream[K, V](
      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
      windowDuration, slideDuration, partitioner
    )
{code}
The result of this test code is:
{quote}
===== resultWindow3 is:
===== resultWindow4 is:
(hello,0)
(world,0)
{quote}
*resultWindow3* is empty, but *resultWindow4* contains two elements with a 
value of 0, whose keys were received in earlier batches.
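
To illustrate why the two paths can diverge, here is a minimal plain-Scala sketch that models both strategies without Spark. It is not Spark's actual implementation: the names WindowSketch, recompute, incremental and keep are hypothetical, each batch is modelled as a simple word-count map, and the window slides by one batch. The optional keep predicate is assumed to play a role similar to the filterFunc argument seen in the snippet above.

```scala
object WindowSketch {
  type Counts = Map[String, Int]

  // Merge two count maps with f, keeping every key present in either map.
  def combine(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet)
      .map(k => k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0)))
      .toMap

  // Strategy 1: rebuild each window from the batches currently inside it.
  // Keys that only appear in expired batches vanish from the result.
  def recompute(batches: Seq[Counts], window: Int): Seq[Counts] =
    batches.indices.map { i =>
      batches
        .slice(math.max(0, i - window + 1), i + 1)
        .foldLeft(Map.empty[String, Int])((acc, b) => combine(acc, b)(_ + _))
    }

  // Strategy 2: update the previous window incrementally by adding the
  // entering batch and subtracting the leaving one. Old keys stay in the
  // map with value 0 unless the keep predicate drops them.
  def incremental(
      batches: Seq[Counts],
      window: Int,
      keep: ((String, Int)) => Boolean = _ => true
  ): Seq[Counts] =
    batches.indices
      .scanLeft(Map.empty[String, Int]) { (prev, i) =>
        val added = combine(prev, batches(i))(_ + _)
        val leaving =
          if (i - window >= 0) batches(i - window) else Map.empty[String, Int]
        combine(added, leaving)(_ - _).filter(keep)
      }
      .tail // drop the initial empty state

  def main(args: Array[String]): Unit = {
    // One batch with data, followed by a silence longer than the window.
    val empty = Map.empty[String, Int]
    val batches = Seq(Map("hello" -> 1, "world" -> 1), empty, empty, empty)
    println(recompute(batches, 2).last)
    println(incremental(batches, 2).last)
    println(incremental(batches, 2, _._2 != 0).last)
  }
}
```

In this model, the last window is empty under recompute, holds hello and world with value 0 under incremental, and is empty again once a keep predicate such as `_._2 != 0` is supplied.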


> Same transformation in DStream leads to different result
> --------------------------------------------------------
>
>                 Key: SPARK-6605
>                 URL: https://issues.apache.org/jira/browse/SPARK-6605
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: SaintBacchus
>             Fix For: 1.4.0
>
>
> The transformation *reduceByKeyAndWindow* has two implementations: one uses 
> *WindowedDStream* and the other uses *ReducedWindowedDStream*.
> The two always produce the same result, except when an empty window occurs.
> In a word-count example, if no data arrives for a period longer than the 
> window duration, the first *reduceByKeyAndWindow* returns no elements, but 
> the second returns many elements with a value of zero.


