I'm sorry, there were some errors in my code. Here is the updated version:

var count = -1L // a global variable in the main object 

val currentBatch = some_DStream 
val countDStream = currentBatch.map(o=>{ 
      count = 0L  // reset the count variable in each batch 
      o 
    }) 
countDStream.foreachRDD(rdd=> count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

Two problems:
1. The variable count just keeps accumulating; it is never reset at the
start of each batch.
2. if (count > 0) is evaluated only once, when the program starts, so the
statement inside it never runs.
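
What I think I need is something along these lines, where the count and the
check both happen inside foreachRDD so that they run once per batch on the
driver. This is only a sketch under assumptions: the local master, the
queue-backed test stream standing in for some_DStream, the toUpperCase
transformation and the object name PerBatchCount are all placeholders of
mine. My guess is that the reset inside map does not work because that
closure runs on the executors against a copy of the variable.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PerBatchCount {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, one-second batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("PerBatchCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumption: a queue of RDDs stands in for some_DStream.
    // queueStream hands out one queued RDD per batch interval by default.
    val queue = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("a", "b", "c")),
      ssc.sparkContext.parallelize(Seq.empty[String])
    )
    val currentBatch = ssc.queueStream(queue)

    // The body of foreachRDD is executed on the driver once per batch,
    // so a local val here starts fresh for every batch and no shared
    // mutable counter is needed.
    currentBatch.foreachRDD { rdd =>
      val count = rdd.count()
      if (count > 0) {
        // Placeholder for the real per-batch work.
        val upper = rdd.map(_.toUpperCase).collect()
        println(s"batch of $count records: ${upper.mkString(",")}")
      } else {
        println("empty batch, skipping")
      }
    }

    ssc.start()
    // Let the demo run for about five seconds, then shut down.
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

The conditional work is written as RDD operations inside foreachRDD rather
than as a DStream transformation guarded by an if outside it, because the
outer if only runs while the streaming graph is being set up.
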

Can anyone give me some suggestions? Thanks.




