Thanks you guys for the help.I will try





At 2016-05-18 07:17:08, "Mich Talebzadeh" <mich.talebza...@gmail.com> wrote:

Thanks Chris,


In a nutshell I don't think one can do that.


So let us see.  Here is my program that is looking for share prices > 95.9. It 
does work. It is pretty simple


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
// Create a local StreamingContext with two working thread and batch interval 
of n seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_AVG").
             setMaster("local[2]").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(sparkConf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", 
"schema.registry.url" -> "http://rhes564:8081";, "zookeeper.connect" -> 
"rhes564:2181", "group.id" -> "CEP_AVG" )
val topics = Set("newtopic")


val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topics)
DStream.cache()
val lines = DStream.map(_._2)
val price = lines.map(_.split(',').view(2)).map(_.toDouble)

val windowLength = 4
val slidingInterval = 2
val countByValueAndWindow = price.filter(_ > 
95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()

//
//Now how I can get the distinct price values here?
//
//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, 
t2) -> t1).countByValueAndWindow(Seconds(windowLength), 
Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}


Ok What can be used here below


//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, 
t2) -> t1).countByValueAndWindow(Seconds(windowLength), 
Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()


Let me know your thoughts?


Thanks







Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

you can use HyperLogLog with Spark Streaming to accomplish this.


here is an example from my fluxcapacitor GitHub repo:


https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx


here's an accompanying SlideShare presentation from one of my recent meetups 
(slides 70-83):


http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037



and a YouTube video for those that prefer video (starting at 32 mins into the 
video for your convenience):


https://youtu.be/wM9Z0PLx3cw?t=1922




On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
wrote:

Ok but how about something similar to


val countByValueAndWindow = price.filter(_ > 
95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))





Using a new count => countDistinctByValueAndWindow ?


val countDistinctByValueAndWindow = price.filter(_ > 
95.0).countDistinctByValueAndWindow(Seconds(windowLength), 
Seconds(slidingInterval))




HTH



Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote:

In 2.0 you won't be able to do this.  The long term vision would be to make 
this possible, but a window will be required (like the 24 hours you suggest).


On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote:

Hi,
We have a requirement to do count(distinct) in a processing batch against all 
the streaming data(eg, last 24 hours' data),that is,when we do 
count(distinct),we actually want to compute distinct against last 24 hours' 
data.
Does structured streaming support this scenario?Thanks!









--

Chris Fregly
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com

Reply via email to