Thanks Chris,

In a nutshell I don't think one can do that.

So let us see.  Here is my program that is looking for share prices > 95.9.
It does work. It is pretty simple

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
// Create a local StreamingContext with two working thread and batch
interval of n seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_AVG").
             setMaster("local[2]").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(sparkConf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_AVG" )
val topics = Set("newtopic")

val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topics)
DStream.cache()
val lines = DStream.map(_._2)
val price = lines.map(_.split(',').view(2)).map(_.toDouble)

val windowLength = 4
val slidingInterval = 2


*val countByValueAndWindow = price.filter(_ >
95.0).countByValueAndWindow(Seconds(windowLength),
Seconds(slidingInterval))countByValueAndWindow.print()*
//
//Now how I can get the distinct price values here?
//
//val countDistinctByValueAndWindow = price.filter(_ >
0.0).reduceByKey((t1, t2) ->
t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}
Ok What can be used here below

//val countDistinctByValueAndWindow = price.filter(_ >
0.0).reduceByKey((t1, t2) ->
t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

Let me know your thoughts?

Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

> you can use HyperLogLog with Spark Streaming to accomplish this.
>
> here is an example from my fluxcapacitor GitHub repo:
>
>
> https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx
>
> here's an accompanying SlideShare presentation from one of my recent
> meetups (slides 70-83):
>
>
> http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037
>
>
> <http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037>
> and a YouTube video for those that prefer video (starting at 32 mins into
> the video for your convenience):
>
> https://youtu.be/wM9Z0PLx3cw?t=1922
>
>
> On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Ok but how about something similar to
>>
>> val countByValueAndWindow = price.filter(_ >
>> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>>
>>
>> Using a new count => c*ountDistinctByValueAndWindow ?*
>>
>> val countDistinctByValueAndWindow = price.filter(_ >
>> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
>> Seconds(slidingInterval))
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> In 2.0 you won't be able to do this.  The long term vision would be to
>>> make this possible, but a window will be required (like the 24 hours you
>>> suggest).
>>>
>>> On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote:
>>>
>>>> Hi,
>>>> We have a requirement to do count(distinct) in a processing batch
>>>> against all the streaming data(eg, last 24 hours' data),that is,when we do
>>>> count(distinct),we actually want to compute distinct against last 24 hours'
>>>> data.
>>>> Does structured streaming support this scenario?Thanks!
>>>>
>>>
>>>
>>
>
> --
> *Chris Fregly*
> Research Scientist @ Flux Capacitor AI
> "Bringing AI Back to the Future!"
> San Francisco, CA
> http://fluxcapacitor.com
>

Reply via email to