Hi Cody,

I want to use sliding windows for Complex Event Processing micro-batching.
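Something along these lines is what I have in mind: applying the standard
DStream window operations to the direct stream itself, along the lines of the
commented-out windowing line in the program quoted below. A minimal sketch,
reusing the ssc, kafkaParams and topic values from that program (the 10-second
window length and 2-second slide interval are illustrative; both must be
multiples of the 1-second batch interval):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// The direct stream can be windowed as-is; no receiver-based stream needed.
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topic)

// Sliding window over the raw stream: every 2 seconds, see the last 10 seconds.
val windowed_dstream = dstream.window(Seconds(10), Seconds(2))

// The same message check as in the program below, but counted over the
// sliding window in a single step via reduceByKeyAndWindow.
val windowedCounts = dstream.map(_._2).
  filter(_.contains("Sending dstream")).
  flatMap(_.split("\n,")).
  map(word => (word, 1)).
  reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(2))
windowedCounts.print(1000)

With this, the one-second micro-batches still drive ingestion, while the CEP
checks see a rolling ten-second view of the stream.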
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:

> Why are you wanting to convert?
>
> As far as doing the conversion, createStream doesn't take the same
> arguments; look at the docs.
>
> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Hi,
> >
> > What is the best way of converting this program, which uses
> > KafkaUtils.createDirectStream, to a sliding window? That is, changing
> >
> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> >   StringDecoder](ssc, kafkaParams, topic)
> >
> > to
> >
> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> >   StringDecoder](ssc, kafkaParams, topic)
> >
> > The program below works:
> >
> > import org.apache.spark.SparkContext
> > import org.apache.spark.SparkConf
> > import org.apache.spark.sql.Row
> > import org.apache.spark.sql.hive.HiveContext
> > import org.apache.spark.sql.types._
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.sql.functions._
> > import _root_.kafka.serializer.StringDecoder
> > import org.apache.spark.streaming._
> > import org.apache.spark.streaming.kafka.KafkaUtils
> >
> > object CEP_assembly {
> >   def main(args: Array[String]) {
> >     val conf = new SparkConf().
> >       setAppName("CEP_assembly").
> >       setMaster("local[2]").
> >       set("spark.driver.allowMultipleContexts", "true").
> >       set("spark.hadoop.validateOutputSpecs", "false")
> >     val sc = new SparkContext(conf)
> >     // Create sqlContext based on HiveContext
> >     val sqlContext = new HiveContext(sc)
> >     import sqlContext.implicits._
> >     println("\nStarted at")
> >     sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> >     val ssc = new StreamingContext(conf, Seconds(1))
> >     ssc.checkpoint("checkpoint")
> >     val kafkaParams = Map[String, String](
> >       "bootstrap.servers" -> "rhes564:9092",
> >       "schema.registry.url" -> "http://rhes564:8081",
> >       "zookeeper.connect" -> "rhes564:2181",
> >       "group.id" -> "StreamTest")
> >     val topic = Set("newtopic")
> >     //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> >     //  StringDecoder](ssc, kafkaParams, topic)
> >     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> >       StringDecoder](ssc, kafkaParams, topic)
> >     dstream.cache()
> >     //val windowed_dstream = dstream.window(new Duration(sliding_window_length),
> >     //  new Duration(sliding_window_interval))
> >     dstream.print(1000)
> >     val lines = dstream.map(_._2)
> >     // Check for message
> >     val showResults = lines.filter(_.contains("Sending dstream")).
> >       flatMap(line => line.split("\n,")).map(word => (word, 1)).
> >       reduceByKey(_ + _).print(1000)
> >     // Check for statement cache
> >     val showResults2 = lines.filter(_.contains("statement cache")).
> >       flatMap(line => line.split("\n,")).map(word => (word, 1)).
> >       reduceByKey(_ + _).print(1000)
> >     ssc.start()
> >     ssc.awaitTermination()
> >     //ssc.stop()
> >     println("\nFinished at")
> >     sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> >   }
> > }
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn:
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
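For reference, and as Cody points out above, the receiver-based createStream
takes different arguments from createDirectStream: the topics are passed as a
Map[String, Int] of topic name to consumer-thread count, plus an explicit
StorageLevel. A minimal sketch, assuming the same ssc and kafkaParams as in
the program above (which already carry the zookeeper.connect and group.id
entries createStream needs):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based variant: a Map of topic -> receiver thread count and an
// explicit storage level, instead of a Set of topic names.
val topicMap = Map("newtopic" -> 1)
val rstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)

// Same (key, value) tuples as the direct stream, so the rest of the
// pipeline (map(_._2), window, etc.) is unchanged.
val rlines = rstream.map(_._2)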