Why do you want to convert? As for doing the conversion, createStream doesn't take the same arguments as createDirectStream; look at the docs.
On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > Hi, > > What is the best way of converting this program of that uses > KafkaUtils.createDirectStream to Sliding window using > > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topic) > > to > > val dstream = KafkaUtils.createStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topic) > > > The program below works > > > import org.apache.spark.SparkContext > import org.apache.spark.SparkConf > import org.apache.spark.sql.Row > import org.apache.spark.sql.hive.HiveContext > import org.apache.spark.sql.types._ > import org.apache.spark.sql.SQLContext > import org.apache.spark.sql.functions._ > import _root_.kafka.serializer.StringDecoder > import org.apache.spark.streaming._ > import org.apache.spark.streaming.kafka.KafkaUtils > // > object CEP_assembly { > def main(args: Array[String]) { > val conf = new SparkConf(). > setAppName("CEP_assembly"). > setMaster("local[2]"). > set("spark.driver.allowMultipleContexts", "true"). 
> set("spark.hadoop.validateOutputSpecs", "false") > val sc = new SparkContext(conf) > // Create sqlContext based on HiveContext > val sqlContext = new HiveContext(sc) > import sqlContext.implicits._ > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) > println ("\nStarted at"); sqlContext.sql("SELECT > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') > ").collect.foreach(println) > val ssc = new StreamingContext(conf, Seconds(1)) > ssc.checkpoint("checkpoint") > val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", > "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> > "rhes564:2181", "group.id" -> "StreamTest" ) > val topic = Set("newtopic") > //val dstream = KafkaUtils.createStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topic) > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topic) > dstream.cache() > //val windowed_dstream = dstream.window(new Duration(sliding_window_length), > new Duration(sliding_window_interval)) > dstream.print(1000) > val lines = dstream.map(_._2) > // Check for message > val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000) > // Check for statement cache > val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000) > ssc.start() > ssc.awaitTermination() > //ssc.stop() > println ("\nFinished at"); sqlContext.sql("SELECT > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') > ").collect.foreach(println) > } > } > > Thanks > > > Dr Mich Talebzadeh > > > > LinkedIn > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > > > http://talebzadehmich.wordpress.com > > --------------------------------------------------------------------- To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org