Hi,

What is the best way of converting this program, which uses
KafkaUtils.createDirectStream, to use a sliding window via

val dstream = *KafkaUtils.createDirectStream*[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic)

to

val dstream = *KafkaUtils.createStream*[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)


The program below works


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object CEP_assembly {
  /** Entry point: runs a Spark Streaming job that consumes a Kafka topic via
    * the direct (receiver-less) API, prints each batch, and counts words from
    * lines matching two marker strings. Timestamps are printed via HiveContext
    * SQL at start and (after the streaming context terminates) at the end.
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("CEP_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // sqlContext based on HiveContext (needed for FROM_unixtime / unix_timestamp)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    println("\nStarted at")
    // String literal must stay on one line (was broken by e-mail wrapping).
    sqlContext.sql(
      "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')"
    ).collect.foreach(println)

    // Reuse the existing SparkContext. Passing `conf` here would attempt to
    // create a second SparkContext (the original worked only because of the
    // spark.driver.allowMultipleContexts workaround above).
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers"   -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect"   -> "rhes564:2181",
      "group.id"            -> "StreamTest")
    val topic = Set("newtopic")
    // Receiver-based alternative (the subject of this thread):
    // val dstream = KafkaUtils.createStream[String, String, StringDecoder,
    //   StringDecoder](ssc, kafkaParams, topic)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder](ssc, kafkaParams, topic)
    dstream.cache()
    // Sliding-window variant, kept for reference:
    // val windowed_dstream = dstream.window(
    //   new Duration(sliding_window_length), new Duration(sliding_window_interval))
    dstream.print(1000)
    // The Kafka message payload is the second element of each (key, value) pair.
    val lines = dstream.map(_._2)
    // Check for message
    lines.filter(_.contains("Sending dstream"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print(1000)
    // Check for statement cache
    lines.filter(_.contains("statement cache"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print(1000)
    ssc.start()
    // Blocks until the streaming context is stopped, so the closing timestamp
    // below is only printed after the job terminates.
    ssc.awaitTermination()
    // ssc.stop()
    println("\nFinished at")
    sqlContext.sql(
      "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')"
    ).collect.foreach(println)
  }
}

Thanks


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Reply via email to