This seems to be a solution.

Just to recall, this is streaming prices coming in, consisting of three fields
as below:

ID           timestamp             Signal
5145,    20160424-000321,    99.54898291795853400767


I am only interested in the third of the comma-separated fields, which I call
Signal, and only when it is > 98.0.

So I modified the code as follows:

val lines = dstream.map(_._2)
// Interested in the Signal field, third in the comma-separated list
val words = lines.map(_.split(',').view(2))
val windowLength = 10
val slidingInterval = 10
// Compare numerically: a lexicographic string comparison such as
// _ > "98.0" would wrongly exclude values like "100.0"
val countByValueAndWindow = words.filter(_.toDouble > 98.0)
  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()

This prints each distinct Signal value above 98.0 together with its count for
the window:


Time: 1461453480000 ms
-------------------------------------------
(99.96499387285531164956,2)
(98.09179374443103739485,3)
(98.43271813782244629781,1)
(99.02524930541705028542,3)
(99.40824915468534696789,2)
(98.71676655968832937025,1)
(98.34124261976762310917,1)
(98.10708435174291734263,1)
(99.90995946350894201227,1)
(99.76973488989534244967,1)
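
One caveat, as a minimal sketch (assuming the topic can occasionally deliver a
malformed record, which is an assumption on my part): split(',').view(2) throws
on a record with fewer than three fields, and toDouble throws on a non-numeric
field, and either exception can fail the whole batch. A defensive parse drops
bad records instead:

import scala.util.Try

// Keep only records that actually carry a third, numeric field;
// malformed lines are silently dropped rather than killing the job
val signals = lines.flatMap { line =>
  val fields = line.split(',')
  if (fields.length >= 3) Try(fields(2).toDouble).toOption else None
}
val highSignals = signals.filter(_ > 98.0)

Note the values become Double here, so countByValueAndWindow would count
numeric values rather than the original strings.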

Anyway, I was not aware that one can pick out a particular field from the
split record by using view(n), where n = 0, 1, 2, ... is the position of the
field.
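
For reference, a minimal sketch of two equivalent ways to grab a field from
the sample record above (my understanding is that view(2) works because the
no-argument view returns a SeqView and Scala then applies it at index 2):

val fields = "5145,20160424-000321,99.54898291795853400767".split(',')
// Direct indexing is the more common idiom
val signal1 = fields(2)
// view with no arguments returns a SeqView; applying it at index 2
// yields the same element
val signal2 = fields.view(2)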

Let me know if this approach is sound.

Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 23 April 2016 at 19:37, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> I have a Spark and Kafka streaming test for a CEP signal.
>
> Pretty basic setup:
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
> val topics = Set("newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val words = lines.filter(_ > "90.0").flatMap(line =>
> line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)
> val windowLength = 10
> val slidingInterval = 10
> val countByValueAndWindow =
> words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> countByValueAndWindow.print()
>
>
> This is the format of the data streaming in: only three columns (ID, Date
> and Signal) separated by commas
>
> 97,20160423-182633,93.19871243745806169848
>
> So I want to pick up the lines where Signal > 90.0 and discard the rest
>
> This is what I am getting from countByValueAndWindow.print()
>
> Time: 1461437490000 ms
> -------------------------------------------
> ((98,3),1)
> ((40.80441152620633003508,3),1)
> ((60.71243694664215996759,3),1)
> ((95,3),1)
> ((57.23635208501673894915,3),1)
> ((20160423-193322,27),1)
> ((46.97871858618538352181,3),1)
> ((68.92024376045110883977,3),1)
> ((96,3),1)
> ((91,3),1)
>
> I am only interested in lines where the long number is > 90, but obviously
> my selection is incorrect. How can I filter on the correct value? This code
> line seems to be incorrect:
>
> val words = lines.filter(_ > "90.0").flatMap(line =>
> line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)
>
> Any ideas appreciated
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
