Hi,

I have a Spark and Kafka streaming test for a CEP signal.

Pretty basic set-up:

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "StreamTest")
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
dstream.cache()
val lines = dstream.map(_._2)
val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)
val windowLength = 10
val slidingInterval = 10
val countByValueAndWindow = words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()


This is the format of the data streaming in: only three columns, ID, Date and Signal, separated by commas:

97,20160423-182633,93.19871243745806169848

So I want to pick up lines where Signal > 90.0 and discard the rest.

This is what I am getting from countByValueAndWindow.print():

Time: 1461437490000 ms
-------------------------------------------
((98,3),1)
((40.80441152620633003508,3),1)
((60.71243694664215996759,3),1)
((95,3),1)
((57.23635208501673894915,3),1)
((20160423-193322,27),1)
((46.97871858618538352181,3),1)
((68.92024376045110883977,3),1)
((96,3),1)
((91,3),1)

I am only interested in rows where the decimal number is > 90, but obviously my selection is incorrect. How can I filter on the correct value? This line of code seems to be the problem:

val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)

Any ideas appreciated.

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com
