Thanks Chris. In a nutshell, I don't think one can do that.
So let us see. Here is my program that looks for share prices > 95.0. It does work and it is pretty simple:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
    // Create a local StreamingContext with two working threads and a batch interval of 2 seconds.
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[2]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(sparkConf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "CEP_AVG")
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    dstream.cache()
    val lines = dstream.map(_._2)
    // Column 2 of the CSV record is the price
    val price = lines.map(_.split(',').view(2)).map(_.toDouble)
    val windowLength = 4
    val slidingInterval = 2
    val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()
    //
    // Now how can I get the distinct price values here?
    //
    //val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) -> t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    //countDistinctByValueAndWindow.print()
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}

Ok, what can be used here below?

//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) -> t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

Let me know your thoughts.
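One thing I could try (an untested sketch on my side, reusing the same window length and sliding interval as above) is to window the filtered stream, deduplicate each windowed RDD, and then count what is left. That would give an exact distinct count per window:

    // Untested sketch: exact distinct prices per window
    // (as opposed to the approximate HyperLogLog route suggested below)
    val distinctPricesPerWindow = price.filter(_ > 95.0).
      window(Seconds(windowLength), Seconds(slidingInterval)).
      transform(rdd => rdd.distinct())
    // Number of distinct prices seen in each window
    distinctPricesPerWindow.count().print()
    // The distinct price values themselves
    distinctPricesPerWindow.print()

Alternatively, since the keys produced by countByValueAndWindow are already distinct, counting the elements of that stream (one per distinct price) should give the same number. Either way this is the exact count; the HyperLogLog approach below would be the approximate equivalent.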
Thanks

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

> you can use HyperLogLog with Spark Streaming to accomplish this.
>
> here is an example from my fluxcapacitor GitHub repo:
>
> https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx
>
> here's an accompanying SlideShare presentation from one of my recent
> meetups (slides 70-83):
>
> http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037
>
> and a YouTube video for those that prefer video (starting at 32 mins into
> the video for your convenience):
>
> https://youtu.be/wM9Z0PLx3cw?t=1922
>
>
> On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Ok but how about something similar to
>>
>> val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>>
>> Using a new count => countDistinctByValueAndWindow?
>>
>> val countDistinctByValueAndWindow = price.filter(_ > 95.0).countDistinctByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> In 2.0 you won't be able to do this. The long term vision would be to
>>> make this possible, but a window will be required (like the 24 hours you
>>> suggest).
>>>
>>> On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote:
>>>
>>>> Hi,
>>>> We have a requirement to do count(distinct) in a processing batch
>>>> against all the streaming data (e.g. last 24 hours' data), that is, when we do
>>>> count(distinct), we actually want to compute distinct against last 24 hours'
>>>> data.
>>>> Does structured streaming support this scenario? Thanks!
>>>
>>
>
> --
> Chris Fregly
> Research Scientist @ Flux Capacitor AI
> "Bringing AI Back to the Future!"
> San Francisco, CA
> http://fluxcapacitor.com