Why do you want to convert?
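
If the goal is just a sliding window, you may not need to switch APIs at
all: window() can be applied to the direct stream. A rough, untested
sketch (the window and slide durations below are only placeholders):

val windowed_dstream = dstream.window(Seconds(60), Seconds(10))  // window length, slide interval
windowed_dstream.map(_._2).print(1000)

Both durations have to be multiples of the batch interval (Seconds(1) in
your program).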

As for doing the conversion itself, createStream doesn't take the same
arguments; have a look at the docs.
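
For reference, the receiver-based variant takes a topic-to-thread-count
map and a StorageLevel rather than a Set of topics. Roughly (untested,
keeping your existing names):

import org.apache.spark.storage.StorageLevel

val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc,
  kafkaParams,                        // must include "zookeeper.connect" and "group.id"
  Map("newtopic" -> 1),               // topics: Map[String, Int], topic -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER)   // storage level for the received blocks

It still returns (key, value) pairs, so the rest of the program should not
need to change.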

On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
<mich.talebza...@gmail.com> wrote:
> Hi,
>
> What is the best way of converting this program, which uses
> KafkaUtils.createDirectStream, to a sliding window, i.e. changing
>
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topic)
>
> to
>
> val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topic)
>
>
> The program below works
>
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> //
> object CEP_assembly {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>                setAppName("CEP_assembly").
>                setMaster("local[2]").
>                set("spark.driver.allowMultipleContexts", "true").
>                set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println("\nStarted at")
>   sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val ssc = new StreamingContext(conf, Seconds(1))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "rhes564:9092",
>   "schema.registry.url" -> "http://rhes564:8081",
>   "zookeeper.connect" -> "rhes564:2181",
>   "group.id" -> "StreamTest")
> val topic = Set("newtopic")
> //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> //  StringDecoder](ssc, kafkaParams, topic)
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topic)
> dstream.cache()
> //val windowed_dstream = dstream.window(new Duration(sliding_window_length),
> //  new Duration(sliding_window_interval))
> dstream.print(1000)
> val lines = dstream.map(_._2)
> // Check for message
> val showResults = lines.filter(_.contains("Sending dstream")).flatMap(
>   line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> // Check for statement cache
> val showResults2 = lines.filter(_.contains("statement cache")).flatMap(
>   line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> ssc.start()
> ssc.awaitTermination()
> //ssc.stop()
>   println("\nFinished at")
>   sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>   }
> }
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>

