You can still do sliding windows with createDirectStream; just do your map / extraction of fields before applying the window.
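For example, a minimal sketch along these lines (the 10 second window and 2 second slide are illustrative only and must be multiples of your batch interval; it assumes the same ssc, kafkaParams and topic as in the program further down the thread):

// direct stream yields DStream[(key, value)]; extract the value first,
// then apply the sliding window, then do the per-window aggregation
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topic)

val windowedLines = dstream
  .map(_._2)                          // field extraction before the window
  .window(Seconds(10), Seconds(2))    // window length, slide interval

windowedLines
  .filter(_.contains("Sending dstream"))
  .flatMap(_.split("\n,"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print(1000)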
On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi Cody,
>
> I want to use sliding windows for Complex Event Processing micro-batching.
>
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Why are you wanting to convert?
>>
>> As far as doing the conversion, createStream doesn't take the same
>> arguments; look at the docs.
>>
>> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>> > Hi,
>> >
>> > What is the best way of converting this program, which uses
>> > KafkaUtils.createDirectStream, to a sliding window, i.e. going from
>> >
>> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>> >   StringDecoder](ssc, kafkaParams, topic)
>> >
>> > to
>> >
>> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>> >   StringDecoder](ssc, kafkaParams, topic)
>> >
>> > The program below works:
>> >
>> > import org.apache.spark.SparkContext
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.sql.Row
>> > import org.apache.spark.sql.hive.HiveContext
>> > import org.apache.spark.sql.types._
>> > import org.apache.spark.sql.SQLContext
>> > import org.apache.spark.sql.functions._
>> > import _root_.kafka.serializer.StringDecoder
>> > import org.apache.spark.streaming._
>> > import org.apache.spark.streaming.kafka.KafkaUtils
>> > //
>> > object CEP_assembly {
>> >   def main(args: Array[String]) {
>> >     val conf = new SparkConf().
>> >       setAppName("CEP_assembly").
>> >       setMaster("local[2]").
>> >       set("spark.driver.allowMultipleContexts", "true").
>> >       set("spark.hadoop.validateOutputSpecs", "false")
>> >     val sc = new SparkContext(conf)
>> >     // Create sqlContext based on HiveContext
>> >     val sqlContext = new HiveContext(sc)
>> >     import sqlContext.implicits._
>> >     val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> >     println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> >     val ssc = new StreamingContext(conf, Seconds(1))
>> >     ssc.checkpoint("checkpoint")
>> >     val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>> >       "schema.registry.url" -> "http://rhes564:8081",
>> >       "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
>> >     val topic = Set("newtopic")
>> >     //val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>> >       StringDecoder](ssc, kafkaParams, topic)
>> >     dstream.cache()
>> >     //val windowed_dstream = dstream.window(new Duration(sliding_window_length), new Duration(sliding_window_interval))
>> >     dstream.print(1000)
>> >     val lines = dstream.map(_._2)
>> >     // Check for message
>> >     val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line =>
>> >       line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >     // Check for statement cache
>> >     val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line =>
>> >       line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >     ssc.start()
>> >     ssc.awaitTermination()
>> >     //ssc.stop()
>> >     println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> >   }
>> > }
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> > LinkedIn
>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> > http://talebzadehmich.wordpress.com