This seems to be a solution. Just to recall, this is streaming prices coming in, consisting of three fields as below:
ID, timestamp, Signal
5145, 20160424-000321, 99.54898291795853400767

I am only interested in the third of the comma-separated fields, which I call Signal, and only when it is > 98.0. So I modified the code as follows:

val lines = dstream.map(_._2)
// Interested in the Signal field, third in the list
val words = lines.map(_.split(',').view(2))
val windowLength = 10
val slidingInterval = 10
val countByValueAndWindow = words.filter(_ > "98.0").countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()

And the output:

-------------------------------------------
Time: 1461453480000 ms
-------------------------------------------
(99.96499387285531164956,2)
(98.09179374443103739485,3)
(98.43271813782244629781,1)
(99.02524930541705028542,3)
(99.40824915468534696789,2)
(98.71676655968832937025,1)
(98.34124261976762310917,1)
(98.10708435174291734263,1)
(99.90995946350894201227,1)
(99.76973488989534244967,1)

Anyway, I was not aware that one can pick up a particular field in the RDD by using view(n), where n = 0, 1, 2, etc. is the position of the field.

Let me know if this approach is sound.

Thanks

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 23 April 2016 at 19:37, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi,
>
> I have a Spark and Kafka streaming test for a CEP signal.
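One caveat worth flagging: filter(_ > "98.0") compares strings lexicographically, not numerically, which happens to work for fixed-width values like these but would misorder, say, "100.5" against "98.0". A minimal sketch of the safer numeric approach, in plain Scala without Spark (the object and method names are mine, for illustration only), assuming records shaped like the sample above:

```scala
// Minimal sketch (plain Scala, no Spark): pick the third comma-separated
// field and filter it numerically rather than lexicographically.
object SignalFilter {
  def signalsAbove(records: Seq[String], threshold: Double): Seq[Double] =
    records
      .map(_.split(',')(2).trim.toDouble) // third field: the Signal
      .filter(_ > threshold)              // numeric, not string, comparison

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "5145, 20160424-000321, 99.54898291795853400767",
      "5146, 20160424-000322, 42.10000000000000000000"
    )
    // Only the 99.54... record survives the > 98.0 cut.
    println(signalsAbove(sample, 98.0))
  }
}
```

In the DStream code the same idea would presumably read lines.map(_.split(',')(2).trim.toDouble).filter(_ > 98.0), keeping the rest of the windowing unchanged.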
>
> Pretty basic set up:
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>   "schema.registry.url" -> "http://rhes564:8081",
>   "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
> val topics = Set("newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)
> val windowLength = 10
> val slidingInterval = 10
> val countByValueAndWindow = words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> countByValueAndWindow.print()
>
> This is the format of the data streaming in, only three columns (ID, Date and Signal) separated by commas:
>
> 97,20160423-182633,93.19871243745806169848
>
> So I want to pick up lines where Signal > "90.0" and discard the rest.
>
> This is what I am getting from countByValueAndWindow.print():
>
> -------------------------------------------
> Time: 1461437490000 ms
> -------------------------------------------
> ((98,3),1)
> ((40.80441152620633003508,3),1)
> ((60.71243694664215996759,3),1)
> ((95,3),1)
> ((57.23635208501673894915,3),1)
> ((20160423-193322,27),1)
> ((46.97871858618538352181,3),1)
> ((68.92024376045110883977,3),1)
> ((96,3),1)
> ((91,3),1)
>
> I am only interested in rows where the long number > "90.0", but obviously my selection is incorrect. How can I filter the correct value?
> This line of code seems to be incorrect:
>
> val words = lines.filter(_ > "90.0").flatMap(line => line.split(",")).map(word => (word, 3)).reduceByKey((x: Int, y: Int) => x + y)
>
> Any ideas appreciated.
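For reference, the reason the quoted filter matches the wrong rows is that filter(_ > "90.0") compares the entire CSV line as a string, so the result is driven by the leading ID characters rather than by the Signal field. A small illustration in plain Scala (the object name is mine; the sample line is taken from the quoted output above):

```scala
// Illustration: string comparison on the whole CSV line is decided by the
// leading ID characters, not by the Signal field.
object LexicographicPitfall {
  def main(args: Array[String]): Unit = {
    // Signal is only 40.8, yet the line passes filter(_ > "90.0")
    // because the comparison starts at the ID "97".
    val line = "97,20160423-182633,40.80441152620633003508"
    println(line > "90.0")                      // true: '7' > '0' at index 1
    println(line.split(",")(2).toDouble > 90.0) // false: Signal is below 90
  }
}
```

Splitting first and comparing the extracted field numerically, as in the reply at the top of the thread, avoids this.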