Hi, I have a Spark and Kafka streaming test for a CEP signal.

Pretty basic setup:

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest"
    )
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    dstream.cache()
    val lines = dstream.map(_._2)
    val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)
    val windowLength = 10
    val slidingInterval = 10
    val countByValueAndWindow = words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()

The streamed data has only three columns, ID, Date and Signal, separated by commas, e.g.

    97,20160423-182633,93.19871243745806169848

I want to pick up only the lines where Signal > 90.0 and discard the rest. This is what I am getting from countByValueAndWindow.print():

    -------------------------------------------
    Time: 1461437490000 ms
    -------------------------------------------
    ((98,3),1)
    ((40.80441152620633003508,3),1)
    ((60.71243694664215996759,3),1)
    ((95,3),1)
    ((57.23635208501673894915,3),1)
    ((20160423-193322,27),1)
    ((46.97871858618538352181,3),1)
    ((68.92024376045110883977,3),1)
    ((96,3),1)
    ((91,3),1)

I am only interested in rows where the long number > 90, but obviously my selection is incorrect. How can I filter on the correct value? This line seems to be at fault:

    val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)

It compares the whole line as a string rather than the Signal column as a number.

Any ideas appreciated.

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
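[Editor's note] The intended selection can be sketched without Spark, since the same transformation applies per line of a DStream. This is a hypothetical illustration, not the poster's code: it assumes the third comma-separated field is Signal, splits the line first, and only then compares numerically (the original `filter(_ > "90.0")` does a lexicographic comparison on the entire line).

    // Sketch: filter comma-separated "ID,Date,Signal" records on the Signal
    // column parsed as a Double, instead of comparing the raw line as a string.
    object SignalFilterSketch {
      // Keep (ID, Signal) pairs whose Signal value exceeds the threshold.
      def filterHigh(lines: Seq[String], threshold: Double = 90.0): Seq[(String, String)] =
        lines
          .map(_.split(","))                                        // split into columns first
          .filter(cols => cols.length == 3 && cols(2).toDouble > threshold) // numeric comparison on Signal
          .map(cols => (cols(0), cols(2)))                          // keep ID and Signal

      def main(args: Array[String]): Unit = {
        val sample = Seq(
          "97,20160423-182633,93.19871243745806169848",
          "98,20160423-193322,40.80441152620633003508"
        )
        filterHigh(sample).foreach(println)
        // only the 93.19... row survives; the 40.80... row is discarded
      }
    }

On the DStream itself the equivalent would be `lines.map(_.split(",")).filter(cols => cols(2).toDouble > 90.0)` applied after `dstream.map(_._2)`, before any flatMap over words.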