Yep, actually using createDirectStream sounds like a better way of doing it. Am I correct that createDirectStream was introduced to overcome the limitations of micro-batching?

In a nutshell, I want to pick up all the messages, keep the ones that match pre-built criteria (say, indicating a *buy signal*) and ignore the rest.
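Something along these lines is what I have in mind (an untested sketch only; the 30-second window, 10-second slide and the simple "buy" filter are just placeholders for the real criteria):

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CEP_window_sketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CEP_window_sketch").setMaster("local[2]")
    // 1 second batch interval, as in the program quoted below
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "group.id" -> "StreamTest")
    val topics = Set("newtopic")

    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // extract the message value first, then apply the sliding window
    val lines = dstream.map(_._2)
    val windowed = lines.window(Seconds(30), Seconds(10))

    // placeholder criterion: keep anything that looks like a buy signal, ignore the rest
    windowed.filter(_.toLowerCase.contains("buy")).print(1000)

    ssc.start()
    ssc.awaitTermination()
  }
}

The point, as Cody notes below, is to do the map / field extraction first and only then apply the window.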
Thanks

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:

> You can still do sliding windows with createDirectStream, just do your
> map / extraction of fields before the window.
>
> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Hi Cody,
> >
> > I want to use sliding windows for Complex Event Processing micro-batching.
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn:
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Why are you wanting to convert?
> >>
> >> As far as doing the conversion, createStream doesn't take the same
> >> arguments, look at the docs.
> >>
> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> >> <mich.talebza...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > What is the best way of converting this program, which uses
> >> > KafkaUtils.createDirectStream, to a sliding window, i.e. going from
> >> >
> >> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> >> >   StringDecoder](ssc, kafkaParams, topic)
> >> >
> >> > to
> >> >
> >> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> >> >   StringDecoder](ssc, kafkaParams, topic)
> >> >
> >> > The program below works
> >> >
> >> > import org.apache.spark.SparkContext
> >> > import org.apache.spark.SparkConf
> >> > import org.apache.spark.sql.Row
> >> > import org.apache.spark.sql.hive.HiveContext
> >> > import org.apache.spark.sql.types._
> >> > import org.apache.spark.sql.SQLContext
> >> > import org.apache.spark.sql.functions._
> >> > import _root_.kafka.serializer.StringDecoder
> >> > import org.apache.spark.streaming._
> >> > import org.apache.spark.streaming.kafka.KafkaUtils
> >> > //
> >> > object CEP_assembly {
> >> >   def main(args: Array[String]) {
> >> >     val conf = new SparkConf().
> >> >       setAppName("CEP_assembly").
> >> >       setMaster("local[2]").
> >> >       set("spark.driver.allowMultipleContexts", "true").
> >> > set("spark.hadoop.validateOutputSpecs", "false") > >> > val sc = new SparkContext(conf) > >> > // Create sqlContext based on HiveContext > >> > val sqlContext = new HiveContext(sc) > >> > import sqlContext.implicits._ > >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) > >> > println ("\nStarted at"); sqlContext.sql("SELECT > >> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') > >> > ").collect.foreach(println) > >> > val ssc = new StreamingContext(conf, Seconds(1)) > >> > ssc.checkpoint("checkpoint") > >> > val kafkaParams = Map[String, String]("bootstrap.servers" -> > >> > "rhes564:9092", > >> > "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" > -> > >> > "rhes564:2181", "group.id" -> "StreamTest" ) > >> > val topic = Set("newtopic") > >> > //val dstream = KafkaUtils.createStream[String, String, StringDecoder, > >> > StringDecoder](ssc, kafkaParams, topic) > >> > val dstream = KafkaUtils.createDirectStream[String, String, > >> > StringDecoder, > >> > StringDecoder](ssc, kafkaParams, topic) > >> > dstream.cache() > >> > //val windowed_dstream = dstream.window(new > >> > Duration(sliding_window_length), > >> > new Duration(sliding_window_interval)) > >> > dstream.print(1000) > >> > val lines = dstream.map(_._2) > >> > // Check for message > >> > val showResults = lines.filter(_.contains("Sending > >> > dstream")).flatMap(line > >> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + > >> > _).print(1000) > >> > // Check for statement cache > >> > val showResults2 = lines.filter(_.contains("statement > >> > cache")).flatMap(line > >> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + > >> > _).print(1000) > >> > ssc.start() > >> > ssc.awaitTermination() > >> > //ssc.stop() > >> > println ("\nFinished at"); sqlContext.sql("SELECT > >> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') > >> > ").collect.foreach(println) > >> > } > >> > } > >> > > >> > Thanks > >> > > >> > > >> > Dr Mich Talebzadeh > >> > > >> > > >> > > >> > LinkedIn > >> > > >> > > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > >> > > >> > > >> > > >> > http://talebzadehmich.wordpress.com > >> > > >> > > > > > >