Hi Cody,

I want to use sliding windows for Complex Event Processing micro-batching

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:

> Why are you wanting to convert?
>
> As far as doing the conversion, createStream doesn't take the same
> arguments, look at the docs.
>
> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Hi,
> >
> > What is the best way of converting this program of that uses
> > KafkaUtils.createDirectStream to Sliding window using
> >
> > val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topic)
> >
> > to
> >
> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> > StringDecoder](ssc, kafkaParams, topic)
> >
> >
> > The program below works
> >
> >
> > import org.apache.spark.SparkContext
> > import org.apache.spark.SparkConf
> > import org.apache.spark.sql.Row
> > import org.apache.spark.sql.hive.HiveContext
> > import org.apache.spark.sql.types._
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.sql.functions._
> > import _root_.kafka.serializer.StringDecoder
> > import org.apache.spark.streaming._
> > import org.apache.spark.streaming.kafka.KafkaUtils
> > //
> > object CEP_assembly {
> >   def main(args: Array[String]) {
> >   val conf = new SparkConf().
> >                setAppName("CEP_assembly").
> >                setMaster("local[2]").
> >                set("spark.driver.allowMultipleContexts", "true").
> >                set("spark.hadoop.validateOutputSpecs", "false")
> >   val sc = new SparkContext(conf)
> >   // Create sqlContext based on HiveContext
> >   val sqlContext = new HiveContext(sc)
> >   import sqlContext.implicits._
> >   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >   println ("\nStarted at"); sqlContext.sql("SELECT
> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> > ").collect.foreach(println)
> > val ssc = new StreamingContext(conf, Seconds(1))
> > ssc.checkpoint("checkpoint")
> > val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092",
> > "schema.registry.url" -> "http://rhes564:8081";, "zookeeper.connect" ->
> > "rhes564:2181", "group.id" -> "StreamTest" )
> > val topic = Set("newtopic")
> > //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> > StringDecoder](ssc, kafkaParams, topic)
> > val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topic)
> > dstream.cache()
> > //val windowed_dstream = dstream.window(new
> Duration(sliding_window_length),
> > new Duration(sliding_window_interval))
> > dstream.print(1000)
> > val lines = dstream.map(_._2)
> > // Check for message
> > val showResults = lines.filter(_.contains("Sending
> dstream")).flatMap(line
> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).print(1000)
> > // Check for statement cache
> > val showResults2 = lines.filter(_.contains("statement
> cache")).flatMap(line
> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).print(1000)
> > ssc.start()
> > ssc.awaitTermination()
> > //ssc.stop()
> >   println ("\nFinished at"); sqlContext.sql("SELECT
> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> > ").collect.foreach(println)
> >   }
> > }
> >
> > Thanks
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>

Reply via email to