Ilias Karalis created SPARK-27895:
-------------------------------------

             Summary: Spark streaming - RDD filter is always refreshing 
providing updated filtered items
                 Key: SPARK-27895
                 URL: https://issues.apache.org/jira/browse/SPARK-27895
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.4.3, 2.4.2, 2.4.0
         Environment: IntelliJ, running locally on a Windows 10 laptop.

 
            Reporter: Ilias Karalis


Spark streaming: 2.4.x

Scala: 2.11.11

 

Inside foreachRDD of a DStream, if filter is applied to the RDD, the filter keeps refreshing: it continuously produces new results until the next batch is processed. The same happens again for every new batch.

With the same code, if we call rdd.collect() and then run the filter on the resulting collection, we get results only once, and they remain stable until a new batch comes in.
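The difference between the two code paths can be modelled without Spark. This is a minimal sketch. It assumes the filtered RDD behaves like a recipe that is re-run on every use, because an RDD that is not persisted is recomputed for each action that needs it. `RecomputeModel`, `randomKeep`, `lazyFiltered` and `eagerFiltered` are hypothetical names. The fixed 0.5 keep probability is an arbitrary stand-in for chooseX.

```scala
import scala.util.Random

// Pure-Scala model (no Spark) of the two code paths in this report.
// Assumption: a filtered, unpersisted RDD acts like a recipe that is re-run
// every time an action uses it.
object RecomputeModel {
  // Hypothetical stand-in for chooseX: keeps an element with probability 0.5,
  // drawing from one shared generator (the element itself is ignored).
  def randomKeep(rng: Random): Long => Boolean =
    _ => rng.nextFloat() < 0.5f

  // Lazy path: each call re-applies the random predicate, like recomputing
  // the RDD lineage, so every "action" may see a different subset.
  def lazyFiltered(data: Seq[Long], rng: Random): () => Seq[Long] =
    () => data.filter(randomKeep(rng))

  // Eager path: the predicate runs once over an in-memory copy, like
  // rdd.collect() followed by a local filter; the result is then reused.
  def eagerFiltered(data: Seq[Long], rng: Random): Seq[Long] =
    data.toVector.filter(randomKeep(rng))
}
```

Calling the function returned by `lazyFiltered` twice will in general return two different subsets. The vector from `eagerFiltered` is computed once and then reused, which mirrors the stable results seen after rdd.collect(). In Spark, persist() or cache() followed by an action can also avoid recomputation. Cached blocks can still be evicted and recomputed, however.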

The filter function is based on random probability (reservoir sampling).
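For reference, classic reservoir sampling (Algorithm R) keeps a sample of exactly k items. The first k items fill the reservoir. After that, the n-th item replaces a uniformly chosen slot with probability k/n. The sketch below is a generic illustration using a hypothetical `Reservoir.sample` helper; it is neither the reporter's code nor a Spark API. Unlike this algorithm, the chooseX function below accepts the n-th edge with probability sampleLength/n but never evicts an earlier pick. As a result, the number of edges it selects is not fixed at sampleLength.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Classic reservoir sampling (Algorithm R), shown only for comparison.
// Hypothetical helper, not part of Spark.
object Reservoir {
  def sample[A](items: Iterator[A], k: Int, rng: Random): Vector[A] = {
    require(k >= 0, "k must be non-negative")
    val reservoir = ArrayBuffer.empty[A]
    var n = 0L // number of items seen so far
    items.foreach { item =>
      n += 1
      if (reservoir.size < k) {
        reservoir += item // fill the first k slots directly
      } else {
        // Pick j uniformly in [0, n); replacing slot j when j < k means the
        // n-th item enters the reservoir with probability k / n.
        val j = (rng.nextDouble() * n).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    reservoir.toVector
  }
}
```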

 

val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))

// edgeTotalCounter, edgeLocalRDDCounter and sampleLength are defined
// elsewhere in the reporter's code (not shown here).
def chooseX(x: (Long, Long)): Boolean = {
  val r = scala.util.Random
  val p = r.nextFloat()
  edgeTotalCounter += 1

  // Keep the edge with probability sampleLength / edgeTotalCounter.
  if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
    edgeLocalRDDCounter += 1
    println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
    true
  } else {
    false
  }
}
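If the sample should stay the same however many times the RDD is recomputed, one option is to make the keep/drop decision a pure function of the edge. The sketch below is a hypothetical variant; it is neither the reporter's code nor a Spark API. It assumes each edge has a stable position `index` (in Spark this might come from `RDD.zipWithIndex`) and that `batchSeed` is fixed for the batch. It keeps the acceptance rule of chooseX, with `index + 1` in place of edgeTotalCounter. Instead of a shared generator and counter, it draws the random number from an RNG seeded by a hash of its arguments.

```scala
import scala.util.Random
import scala.util.hashing.MurmurHash3

// Hypothetical deterministic variant of chooseX (not from the report).
// Assumptions: `index` is a stable position of the edge within the batch and
// `batchSeed` is fixed per batch, so repeated evaluation gives the same answer.
object DeterministicChoice {
  def chooseXDeterministic(x: (Long, Long), index: Long, sampleLength: Int, batchSeed: Long): Boolean = {
    // Hash all inputs into one seed so that neighbouring indices get
    // unrelated seeds, then take a single draw from a fresh generator.
    val seed = MurmurHash3.productHash((batchSeed, x._1, x._2, index))
    val p = new Random(seed).nextFloat()
    // Same acceptance rule as chooseX, with index + 1 as the edge count.
    p < sampleLength.toFloat / (index + 1).toFloat
  }
}
```

The result depends only on the arguments, so evaluating the predicate again for the same edge, index and seed returns the same answer. Edges at positions below sampleLength are always kept, because their acceptance probability is at least 1. Like chooseX, this variant does not cap the sample at sampleLength edges.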

 

edgeLocalRDDCounter counts the edges selected from inputRdd.

What is strange is that the counter first increases from 1 to y. The filter then unexpectedly runs again, and the counter increases further, from y+1 to z. After that, each time the filter unexpectedly runs again, it produces results for which the counter starts from y+1. Each run produces different results and selects a different number of edges.

toSampleRDD always changes according to the newly produced results.

When a new batch comes in, the same behavior starts again for that batch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
