Thank you guys for the help. I will try.
At 2016-05-18 07:17:08, "Mich Talebzadeh" <mich.talebza...@gmail.com> wrote:

Thanks Chris,

In a nutshell I don't think one can do that. So let us see. Here is my program, which looks for share prices > 95.9. It does work and it is pretty simple:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
    // Create a local StreamingContext with two working threads and a batch interval of n seconds.
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[2]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(sparkConf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "CEP_AVG"
    )
    val topics = Set("newtopic")
    val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    DStream.cache()
    val lines = DStream.map(_._2)
    val price = lines.map(_.split(',').view(2)).map(_.toDouble)
    val windowLength = 4
    val slidingInterval = 2
    val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()
    //
    // Now how can I get the distinct price values here?
    //
    //val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) => t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    //countDistinctByValueAndWindow.print()
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}

OK, so what can be used here below?

//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) => t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

Let me know your thoughts.

Thanks

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
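One possible (untested) way to get the distinct prices per window, rather than inventing a countDistinctByValueAndWindow, is to window the stream first and then run distinct() on each windowed RDD via transform. This sketch reuses price, windowLength and slidingInterval from the program above; the name distinctPricesByWindow is new here:

    // Sketch only: exact distinct prices per window.
    val distinctPricesByWindow = price
      .filter(_ > 95.0)
      .window(Seconds(windowLength), Seconds(slidingInterval))
      .transform(rdd => rdd.distinct())

    distinctPricesByWindow.print()          // the distinct prices in each window
    distinctPricesByWindow.count().print()  // how many distinct prices per window

transform applies a plain RDD operation to every windowed batch, so distinct() here is exact but shuffles each window's data; for large windows an approximate count may be preferable, along the lines of the HyperLogLog suggestion below.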
On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

You can use HyperLogLog with Spark Streaming to accomplish this.

Here is an example from my fluxcapacitor GitHub repo:

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

Here's an accompanying SlideShare presentation from one of my recent meetups (slides 70-83):

http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037

And a YouTube video for those who prefer video (starting at 32 minutes in for your convenience):

https://youtu.be/wM9Z0PLx3cw?t=1922

On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK, but how about something similar to

val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))

using a new count, countDistinctByValueAndWindow?

val countDistinctByValueAndWindow = price.filter(_ > 95.0).countDistinctByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote:

In 2.0 you won't be able to do this. The long-term vision would be to make this possible, but a window will be required (like the 24 hours you suggest).

On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote:

Hi,

We have a requirement to do count(distinct) in a processing batch against all the streaming data (e.g. the last 24 hours' data); that is, when we do count(distinct), we actually want to compute the distinct count over the last 24 hours of data. Does Structured Streaming support this scenario? Thanks!

--
Chris Fregly
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com
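As a rough, untested sketch of the HyperLogLog idea Chris mentions (separate from his linked repo), Spark's RDD API exposes countApproxDistinct, which is backed by HyperLogLog++, so an approximate distinct count per window needs no external library. It again reuses the price DStream, windowLength and slidingInterval from Mich's program; the 0.01 relative error is an arbitrary choice:

    // Sketch only: approximate distinct count of prices per window using
    // RDD.countApproxDistinct (HyperLogLog++), with ~1% target relative error.
    price
      .filter(_ > 95.0)
      .window(Seconds(windowLength), Seconds(slidingInterval))
      .foreachRDD { rdd =>
        val approxDistinct = rdd.countApproxDistinct(0.01)
        println(s"approx distinct prices in this window: $approxDistinct")
      }

For the 24-hour case Todd describes, the same pattern applies with a 24-hour window duration, at the cost of retaining that much data in the window; keeping a HyperLogLog sketch in streaming state instead (for example with updateStateByKey) avoids holding 24 hours of raw records, which is the general appeal of the HyperLogLog approach.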