Hi,

I have this code that filters the prices within the sliding window, keeping
those over 99.8 and counting them. The code works OK, as shown below.

Now I need to work out min(price), max(price) and avg(price) in the sliding
window. What I need is a counter and a method of getting these values.

Is there any way of doing this using filtering?
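Filtering on its own only selects prices. Min, max and avg need an aggregation step. One possible "counter" is a running summary of (count, sum, min, max): two summaries can be merged associatively, and the average is derived from count and sum at the end. This is a plain-Scala sketch with no Spark dependency. The names `PriceStats`, `of` and `merge` are hypothetical.

```scala
// Hypothetical running summary of the prices seen in one window.
// Count and sum are kept instead of the average itself, so two partial
// summaries can be merged exactly; avg is derived only when needed.
final case class PriceStats(count: Long, sum: Double, min: Double, max: Double) {

  // Associative and commutative merge, usable as a reduce function.
  def merge(that: PriceStats): PriceStats =
    PriceStats(
      count + that.count,
      sum + that.sum,
      math.min(min, that.min),
      math.max(max, that.max)
    )

  // Average of the summarised prices; only meaningful when count > 0.
  def avg: Double = sum / count
}

object PriceStats {
  // Summary of a single price.
  def of(price: Double): PriceStats = PriceStats(1L, price, price, price)
}
```

For example, `Seq(99.9, 100.5, 101.2).map(PriceStats.of).reduce(_ merge _)` gives a count of 3, a min of 99.9 and a max of 101.2. In the streaming job below, assuming the prices have already been converted to `Double` and `PriceStats` is defined at top level (case classes are serializable), the same merge could be passed to the existing `DStream.reduceByWindow` method, for example `price.filter(_ > 99.8).map(PriceStats.of).reduceByWindow((a: PriceStats, b: PriceStats) => a merge b, Seconds(windowLength), Seconds(slidingInterval))`. The lambda's parameter types are written out because `reduceByWindow` is overloaded.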


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object CEP_AVG {
  def main(args: Array[String]) {
// Create a local StreamingContext with a batch interval of 2 seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_AVG").
             setMaster("local[12]").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "CEP_AVG")
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topics)
dstream.cache()
val lines = dstream.map(_._2)
// Price is the third comma-separated field. Convert it to Double so the
// filter compares numbers: comparing Strings is lexicographic, so
// "100.5" > "99.8" would be false.
val price = lines.map(_.split(',')(2).toDouble)
val windowLength = 4
val slidingInterval = 2  // keep this the same as the batch interval for
                         // continuous streaming: you are aggregating the
                         // data collected over each batch interval

val countByValueAndWindow = price.filter(_ > 99.8).countByValueAndWindow(
  Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()
//
ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}
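To illustrate what the window settings mean for these statistics, the plain-Scala sketch below replays micro-batches of prices without Spark. With 2-second batches, `windowLength = 4` and `slidingInterval = 2` make each window span the last two batches and advance by one batch. The object `WindowSim`, the function `windowStats` and the sample data are hypothetical. The sketch only yields full windows, whereas Spark also emits partially filled windows at the start of the stream.

```scala
object WindowSim {
  // For each window, returns Some((min, max, avg)) of the prices above
  // `threshold`, or None when no price in that window passes the filter.
  // `batches` holds one Seq of prices per micro-batch, oldest first.
  // A window spans `windowBatches` consecutive batches and advances by
  // `slideBatches` batches (window and slide as multiples of the batch).
  def windowStats(
      batches: Seq[Seq[Double]],
      windowBatches: Int,
      slideBatches: Int,
      threshold: Double
  ): List[Option[(Double, Double, Double)]] =
    batches
      .sliding(windowBatches, slideBatches)
      .map { window =>
        val prices = window.flatten.filter(_ > threshold)
        if (prices.isEmpty) None
        else Some((prices.min, prices.max, prices.sum / prices.size))
      }
      .toList
}
```

With the job's settings this corresponds to `windowBatches = windowLength / 2` and `slideBatches = slidingInterval / 2`, i.e. 2 and 1. In Spark Streaming a comparable pattern, assuming `price` is a `DStream[Double]`, is `price.window(Seconds(windowLength), Seconds(slidingInterval))` followed by `foreachRDD`, where a non-empty `RDD[Double]` provides `stats()`, a `StatCounter` exposing `min`, `max` and `mean`.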

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com
