You can still do sliding windows with createDirectStream, just do your
map / extraction of fields before the window.

On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
<mich.talebza...@gmail.com> wrote:
> Hi Cody,
>
> I want to use sliding windows for Complex Event Processing micro-batching
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Why are you wanting to convert?
>>
>> As far as doing the conversion, createStream doesn't take the same
>> arguments, look at the docs.
>>
>> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>> > Hi,
>> >
>> > What is the best way of converting this program of that uses
>> > KafkaUtils.createDirectStream to Sliding window using
>> >
>> > val dstream = KafkaUtils.createDirectStream[String, String,
>> > StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topic)
>> >
>> > to
>> >
>> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topic)
>> >
>> >
>> > The program below works
>> >
>> >
>> > import org.apache.spark.SparkContext
>> > import org.apache.spark.SparkConf
>> > import org.apache.spark.sql.Row
>> > import org.apache.spark.sql.hive.HiveContext
>> > import org.apache.spark.sql.types._
>> > import org.apache.spark.sql.SQLContext
>> > import org.apache.spark.sql.functions._
>> > import _root_.kafka.serializer.StringDecoder
>> > import org.apache.spark.streaming._
>> > import org.apache.spark.streaming.kafka.KafkaUtils
>> > //
>> > object CEP_assembly {
>> >   def main(args: Array[String]) {
>> >   val conf = new SparkConf().
>> >                setAppName("CEP_assembly").
>> >                setMaster("local[2]").
>> >                set("spark.driver.allowMultipleContexts", "true").
>> >                set("spark.hadoop.validateOutputSpecs", "false")
>> >   val sc = new SparkContext(conf)
>> >   // Create sqlContext based on HiveContext
>> >   val sqlContext = new HiveContext(sc)
>> >   import sqlContext.implicits._
>> >   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> >   println ("\nStarted at"); sqlContext.sql("SELECT
>> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> > ").collect.foreach(println)
>> > val ssc = new StreamingContext(conf, Seconds(1))
>> > ssc.checkpoint("checkpoint")
>> > val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> > "rhes564:9092",
>> > "schema.registry.url" -> "http://rhes564:8081";, "zookeeper.connect" ->
>> > "rhes564:2181", "group.id" -> "StreamTest" )
>> > val topic = Set("newtopic")
>> > //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topic)
>> > val dstream = KafkaUtils.createDirectStream[String, String,
>> > StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topic)
>> > dstream.cache()
>> > //val windowed_dstream = dstream.window(new
>> > Duration(sliding_window_length),
>> > new Duration(sliding_window_interval))
>> > dstream.print(1000)
>> > val lines = dstream.map(_._2)
>> > // Check for message
>> > val showResults = lines.filter(_.contains("Sending
>> > dstream")).flatMap(line
>> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> > _).print(1000)
>> > // Check for statement cache
>> > val showResults2 = lines.filter(_.contains("statement
>> > cache")).flatMap(line
>> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> > _).print(1000)
>> > ssc.start()
>> > ssc.awaitTermination()
>> > //ssc.stop()
>> >   println ("\nFinished at"); sqlContext.sql("SELECT
>> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> > ").collect.foreach(println)
>> >   }
>> > }
>> >
>> > Thanks
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to