Ilias Karalis created SPARK-27895:
-------------------------------------

Summary: Spark streaming - RDD filter is always refreshing providing updated filtered items
Key: SPARK-27895
URL: https://issues.apache.org/jira/browse/SPARK-27895
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.4.3, 2.4.2, 2.4.0
Environment: IntelliJ, running locally on a Windows 10 laptop.
Reporter: Ilias Karalis

Spark Streaming: 2.4.x
Scala: 2.11.11

Inside foreachRDD of a DStream, when filter is applied to the RDD, the filter keeps re-running ("refreshing") and continuously produces new results until the next batch is processed. The same then happens for the new batch.

With the same code, if we call rdd.collect() and then apply the filter to the collected array, we get results only once, and they remain stable until the next batch arrives.

The filter predicate is based on a random probability (reservoir sampling):

```scala
import org.apache.spark.rdd.RDD

// inputRdd, sampleLength and the vars edgeTotalCounter / edgeLocalRDDCounter
// are defined elsewhere in the reporter's code (not shown).
val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))

def chooseX(x: (Long, Long)): Boolean = {
  val r = scala.util.Random
  val p = r.nextFloat()
  edgeTotalCounter += 1
  // Select the edge with probability sampleLength / edgeTotalCounter.
  if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
    edgeLocalRDDCounter += 1
    println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
    true
  } else {
    false
  }
}
```

edgeLocalRDDCounter counts the edges selected from inputRDD. The strange part is that the counter first increases from 1 to y; then the filter unexpectedly runs again and the counter continues from y+1 to z. After that, each time the filter unexpectedly re-runs, it produces results for which the counter starts again from y+1. Each run produces different results and selects a different number of edges, and toSampleRDD changes accordingly. When a new batch arrives, the same behavior starts again for that batch.
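The report contrasts filtering the RDD directly with filtering the array returned by rdd.collect(). Spark's RDD programming guide notes that, by default, a transformed RDD may be recomputed each time an action is run on it unless it is persisted (for example with cache()). The following is a plain-Scala analogy of that lazy-versus-materialized distinction only; it is not Spark code and not a confirmed diagnosis of this issue. The names LazyFilterDemo, calls, lazyTwice, strictTwice and the 0.5 selection probability are hypothetical stand-ins for the reporter's predicate and counters.

```scala
import scala.util.Random

// Plain-Scala analogy only (no Spark): a lazy view re-runs its filter
// predicate on every traversal, while a strict collection is filtered once.
object LazyFilterDemo {
  // Hypothetical counter of predicate invocations (stands in for the
  // reporter's edgeTotalCounter).
  var calls: Int = 0
  private val rng = new Random()

  // Hypothetical random predicate: keeps each edge with probability 0.5.
  def chooseX(x: (Long, Long)): Boolean = {
    calls += 1
    rng.nextFloat() < 0.5f
  }

  val edges: Seq[(Long, Long)] = (1L to 1000L).map(i => (i, i + 1))

  // Lazy: the filtered view is traversed twice, so chooseX runs 2 * 1000 times
  // and the two traversals can select different edges.
  def lazyTwice(): (List[(Long, Long)], List[(Long, Long)]) = {
    calls = 0
    val sample = edges.view.filter(chooseX)
    (sample.toList, sample.toList)
  }

  // Strict: filter once into a List (like filtering after rdd.collect()),
  // then read the same result twice; chooseX runs only 1000 times.
  def strictTwice(): (List[(Long, Long)], List[(Long, Long)]) = {
    calls = 0
    val sample = edges.toList.filter(chooseX)
    (sample, sample)
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = lazyTwice()
    println(s"lazy:   ${a.size} then ${b.size} edges, predicate calls = $calls")
    val (c, d) = strictTwice()
    println(s"strict: ${c.size} then ${d.size} edges, predicate calls = $calls")
  }
}
```

Running main reports 2000 predicate calls for the lazy case and 1000 for the strict case. The two lazy traversals will usually select different edges, while the strict result is read twice unchanged, mirroring the difference between the RDD filter and the collect-then-filter variant described above.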
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)