Hi, I have this code that selects the prices over 99.8 within the sliding window. The code, shown below, works OK.
// Question from the original post: "Now I need to work out min(price), max(price) and avg(price)
// in the sliding window. What I need is to have a counter and method of getting these values.
// Any way of doing this using filtering?"
//
// Approach taken below:
//   * parse the price as a Double, because comparing Strings is lexicographic
//     ("100.5" > "99.8" is false);
//   * keep the existing high-price filter/count;
//   * fold each window into a small (count, sum, min, max) summary with reduceByWindow,
//     from which avg = sum / count.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.Try

/**
 * Spark Streaming job that reads comma-separated price records from Kafka topic "newtopic".
 * For every sliding window it prints:
 *  - the count of each distinct price above [[CEP_AVG.PriceThreshold]];
 *  - the count, min, max and average of all parsed prices in the window.
 */
object CEP_AVG {

  /** Prices strictly above this value are reported as "high" prices. */
  val PriceThreshold: Double = 99.8

  /**
   * Mergeable summary of the prices seen in a window.
   *
   * Only count, sum, min and max are stored, so two summaries combine associatively and
   * commutatively, which is what `reduceByWindow` needs. The average is derived at the end.
   */
  final case class PriceStats(count: Long, sum: Double, min: Double, max: Double) {

    /** Combines this summary with another one covering a disjoint set of prices. */
    def merge(that: PriceStats): PriceStats =
      PriceStats(
        count + that.count,
        sum + that.sum,
        math.min(min, that.min),
        math.max(max, that.max))

    /** Mean price of the summarised prices; NaN when the summary is empty. */
    def avg: Double = if (count == 0L) Double.NaN else sum / count
  }

  object PriceStats {

    /** Summary of exactly one price. */
    def of(price: Double): PriceStats = PriceStats(1L, price, price, price)
  }

  /**
   * Extracts the price from a comma-separated record.
   *
   * The price is taken from the third field (index 2), as in the original code.
   *
   * @param line one CSV record (the Kafka message value)
   * @return the price as a Double, or None when the field is missing or not numeric, so a
   *         single malformed record does not fail the whole batch
   */
  def parsePrice(line: String): Option[Double] =
    line.split(',').lift(2).flatMap(field => Try(field.trim.toDouble).toOption)

  def main(args: Array[String]): Unit = {
    // Local master with 12 worker threads; the batch interval is 2 seconds (see below).
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[12]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // countByValueAndWindow uses an inverse-reduce window, which requires checkpointing.
    ssc.checkpoint("checkpoint")

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "CEP_AVG"
    )
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    dstream.cache()

    // Records arrive as (key, value) pairs; only the value carries the CSV payload.
    val lines = dstream.map(_._2)
    // Numeric prices; records without a parseable third field are dropped.
    val prices = lines.flatMap(line => parsePrice(line).toList)

    // Window length and slide must both be multiples of the 2-second batch interval.
    val windowLength = 4
    val slidingInterval = 2 // keep this the same as batch window for continuous streaming.

    // You are aggregating data that you are collecting over the batch window.
    // Count of each distinct price above the threshold, compared numerically.
    val highPriceCounts = prices
      .filter(_ > PriceThreshold)
      .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    highPriceCounts.print()

    // Per-window count/min/max/avg of all prices. To restrict these to high prices,
    // apply the same filter before the map.
    val windowStats = prices
      .map(PriceStats.of)
      .reduceByWindow(_ merge _, Seconds(windowLength), Seconds(slidingInterval))
    windowStats
      .map(s => f"count=${s.count}%d min=${s.min}%.2f max=${s.max}%.2f avg=${s.avg}%.2f")
      .print()

    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}

// Dr Mich Talebzadeh
// LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
// http://talebzadehmich.wordpress.com