I'm sorry, I have some errors in my code; updated version here:

    var count = -1L // a global variable in the main object

    val currentBatch = some_DStream
    val countDStream = currentBatch.map(o => {
      count = 0L // reset the count variable in each batch
      o
    })
    countDStream.foreachRDD(rdd => count += rdd.count())

    if (count > 0) {
      currentBatch.map(...).someOtherTransformation
    }

Two problems:

1. The variable count just goes on accumulating and is never reset at the start of each batch.
2. if (count > 0) is only evaluated once, at the beginning of the program, so the statement inside it never runs.

Can you all give me some suggestions? Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
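One direction I was considering (not sure it is right, suggestions welcome): since the closure passed to map runs on the executors, assigning to the driver-side count there never takes effect, and the if runs only once when the driver sets up the job. A sketch of moving both the per-batch count and the check into foreachRDD, whose body does run on the driver once per batch interval:

    // Sketch, assuming the goal is to apply someOtherTransformation
    // only to non-empty batches. No global count variable needed.
    currentBatch.foreachRDD { rdd =>
      val count = rdd.count() // fresh per-batch count, computed on this batch's RDD
      if (count > 0) {
        // act on this batch only, e.g. rdd.map(...) plus an action,
        // or use DStream.transform if a new DStream is needed
      }
    }

Does that look like a reasonable pattern, or is there a better way?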