Re: FW: converting DStream[String] into RDD[String] in spark streaming [I]
You're just describing the operation of Spark Streaming at its simplest, without windowing. You get non-overlapping RDDs of the most recent data each time.

On Sun, Mar 29, 2015 at 8:44 AM, Deenar Toraskar wrote:
> I have a requirement to run a function only over the new inputs in a Spark Streaming sliding window, i.e. the latest batch of events only. Do I just get a new DStream using a slide duration equal to the window duration?

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
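A plain-Scala model of the timing Sean describes may help; it is not Spark code. It assumes times are in milliseconds from the start of the stream, that a windowed DStream emits only at multiples of its slide duration, and that the window ending at time e covers the batches ending in (e - window, e], which is our reading of how Spark slices a window. With 30 s batches and window(Seconds(600), Seconds(600)), consecutive windows cover disjoint runs of 20 batches, and a window equal to the batch interval covers exactly one batch, i.e. the plain unwindowed case. `TumblingModel`, `emitTimes` and `batchesIn` are hypothetical names.

```scala
// Plain-Scala model of DStream window timing; not the Spark API.
// All durations are in milliseconds; a batch is identified by its end time.
object TumblingModel {
  // Times at which a windowed DStream would emit, assuming it fires at every
  // multiple of the slide duration after the stream starts at time 0.
  def emitTimes(slideMs: Long, horizonMs: Long): Seq[Long] =
    (slideMs to horizonMs by slideMs).toList

  // End times of the batches covered by the window ending at `windowEnd`,
  // i.e. batches ending in the half-open interval (windowEnd - windowMs, windowEnd].
  def batchesIn(windowEnd: Long, windowMs: Long, batchMs: Long): Seq[Long] =
    ((windowEnd - windowMs + batchMs) to windowEnd by batchMs).filter(_ > 0).toList
}

val first  = TumblingModel.batchesIn(600000L, 600000L, 30000L)
val second = TumblingModel.batchesIn(1200000L, 600000L, 30000L)
println(TumblingModel.emitTimes(600000L, 1200000L)) // the two window end times
println(first.toSet.intersect(second.toSet).isEmpty) // windows do not overlap
```

With these numbers the windows ending at 600000 and 1200000 each cover 20 batches and share none, so saving each windowed RDD would write every record once.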
Re: converting DStream[String] into RDD[String] in spark streaming [I]
Sean

Thank you very much for your response. I have a requirement to run a function only over the new inputs in a Spark Streaming sliding window, i.e. the latest batch of events only. Do I just get a new DStream using a slide duration equal to the window duration? Such as:

val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
val ssc = new StreamingContext(sparkConf, Seconds(30))
// write all new tweets in the last 10 mins
stream.window(Seconds(600), Seconds(600)).saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

Alternatively, if I could find the time of the new batch, I could do something like this:

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    // currentBatchTime: placeholder for the time of the latest batch
    if (time == currentBatchTime) {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
  }
  this.foreachRDD(saveFunc)
}

Regards
Deenar

P.S. The mail archive on Nabble does not seem to show all responses.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 22 March 2015 11:49
To: Deenar Toraskar
Cc: user@spark.apache.org
Subject: Re: converting DStream[String] into RDD[String] in spark streaming

On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar wrote:
> 1) If there are no sliding window calls in this streaming context, will just one file be written per interval?

As many files as there are partitions will be written in each interval.

> 2) If there is a sliding window call in the same context, such as
>
> val hashTags = stream.flatMap(json =>
>   DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
>
> val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
>   .map { case (topic, count) => (count, topic) }
>   .transform(_.sortByKey(false))
>
> will some files get written multiple times (as long as the interval is in the batch)?

I don't think it's right to say files will be written many times, but yes, it is my understanding that data will be written many times, since a datum lies in many windows.

---
This e-mail may contain confidential and/or privileged information. If you are not the intended recipient (or have received this e-mail in error) please notify the sender immediately and delete this e-mail. Any unauthorized copying, disclosure or distribution of the material in this e-mail is strictly forbidden.

Please refer to http://www.db.com/en/content/eu_disclosures.htm for additional EU corporate and regulatory disclosures and to http://www.db.com/unitedkingdom/content/privacy.htm for information about privacy.
Re: converting DStream[String] into RDD[String] in spark streaming
On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar wrote:
> 1) If there are no sliding window calls in this streaming context, will just one file be written per interval?

As many files as there are partitions will be written in each interval.

> 2) If there is a sliding window call in the same context, such as
>
> val hashTags = stream.flatMap(json =>
>   DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
>
> val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
>   .map { case (topic, count) => (count, topic) }
>   .transform(_.sortByKey(false))
>
> will some files get written multiple times (as long as the interval is in the batch)?

I don't think it's right to say files will be written many times, but yes, it is my understanding that data will be written many times, since a datum lies in many windows.
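Sean's second answer can be checked with a small plain-Scala model; the object and method names are hypothetical and this is not the Spark API. It assumes times in milliseconds, window ends at multiples of the slide duration, and that the window ending at e covers the batches ending between e - window + batch and e. Called as reduceByKeyAndWindow(_ + _, Seconds(600)) with no slide argument, the windowed stream slides by the parent's batch interval (30 s here), so each batch lands in 600 / 30 = 20 windows; with a 600 s slide it lands in exactly one.

```scala
// Plain-Scala model (not the Spark API): in which windows does one batch appear?
// Times are in ms; a batch is identified by its end time b. A window ending at e
// is assumed to cover batches with e - windowMs + batchMs <= b <= e, and window
// ends fall on multiples of slideMs.
object WindowMultiplicity {
  def windowsContaining(b: Long, windowMs: Long, slideMs: Long, batchMs: Long): Seq[Long] = {
    require(windowMs % batchMs == 0 && slideMs % batchMs == 0,
      "window and slide must be multiples of the batch interval")
    // Smallest multiple of slideMs that is >= b (b is assumed positive).
    val firstEnd = ((b + slideMs - 1) / slideMs) * slideMs
    // Latest window end whose window still reaches back to batch b.
    val lastEnd = b + windowMs - batchMs
    if (firstEnd > lastEnd) Nil else (firstEnd to lastEnd by slideMs).toList
  }
}

// Sliding by the batch interval: the batch ending at 90 s is in 20 windows.
println(WindowMultiplicity.windowsContaining(90000L, 600000L, 30000L, 30000L).size)
// Sliding by the window length: the same batch is in a single window.
println(WindowMultiplicity.windowsContaining(90000L, 600000L, 600000L, 30000L))
```

If every windowed RDD is saved, each record would therefore be written once per window that contains it, which matches the answer above.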
Re: converting DStream[String] into RDD[String] in spark streaming
Sean

DStream.saveAsTextFiles internally calls foreachRDD and saveAsTextFile for each interval:

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc)
}

val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
val ssc = new StreamingContext(sparkConf, Seconds(30))
stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

1) If there are no sliding window calls in this streaming context, will just one file be written per interval?

2) If there is a sliding window call in the same context, such as

val hashTags = stream.flatMap(json =>
  DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

will some files get written multiple times (as long as the interval is in the batch)?

Deenar

> DStream.foreachRDD gives you an RDD[String] for each interval of course. I don't think it makes sense to say a DStream can be converted into one RDD since it is a stream. The past elements are inherently not supposed to stick around for a long time, and future elements aren't known. You may consider saving each RDD[String] to HDFS, and then simply loading it from HDFS as an RDD[String].

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p22175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
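To make question 1 concrete: the Spark Streaming programming guide documents the per-interval output name of saveAsTextFiles as "prefix-TIME_IN_MS[.suffix]", and saveAsTextFile writes that name as a directory holding one part file per RDD partition, with Hadoop-style names such as part-00000. The sketch below is a hypothetical plain-Scala re-implementation of those naming rules, not Spark's own rddToFileName; `OutputLayout`, `batchDirName` and `partFiles` are illustrative names.

```scala
// Hypothetical re-implementation of the naming rule documented for
// DStream.saveAsTextFiles ("prefix-TIME_IN_MS[.suffix]"); not Spark's own code.
object OutputLayout {
  // Directory written for the batch that ends at timeMs.
  def batchDirName(prefix: String, suffix: String, timeMs: Long): String = {
    val base = if (prefix == null || prefix.isEmpty) timeMs.toString else s"$prefix-$timeMs"
    if (suffix == null || suffix.isEmpty) base else s"$base.$suffix"
  }

  // Files inside that directory: one per RDD partition, zero-padded Hadoop-style names.
  def partFiles(numPartitions: Int): Seq[String] =
    (0 until numPartitions).map(i => f"part-$i%05d")
}

println(OutputLayout.batchDirName("hdfs://localhost:9000/twitterRawJSON", "", 30000L))
println(OutputLayout.partFiles(3))
```

So with no windowing, each 30 s interval produces one new directory, and the number of files in it equals the number of partitions of that batch's RDD.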
Re: converting DStream[String] into RDD[String] in spark streaming
Thanks, dear. It is a good idea to save this data to HDFS and then load it back into an RDD :)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p20258.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: converting DStream[String] into RDD[String] in spark streaming
DStream.foreachRDD gives you an RDD[String] for each interval, of course. I don't think it makes sense to say a DStream can be converted into one RDD, since it is a stream. The past elements are inherently not supposed to stick around for a long time, and future elements aren't known. You may consider saving each RDD[String] to HDFS, and then simply loading it from HDFS as an RDD[String].

On Wed, Dec 3, 2014 at 7:45 AM, Hafiz Mujadid wrote:
> I want to convert a DStream[String] into an RDD[String]. I could not find how to do this.
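The save-then-reload approach needs a way to pick which saved batches to read back. Below is a hypothetical plain-Scala helper, not part of Spark. It assumes the batches were saved with saveAsTextFiles(prefix) and no suffix, so each batch lives under prefix-TIME_IN_MS, and that the joined string is then passed to sc.textFile, which, to our understanding, accepts a comma-separated list of paths (a Hadoop glob such as prefix-* is another option).

```scala
// Hypothetical helper (not part of Spark): choose the saved batch directories for a
// time range and join them into one input path for reloading as a single RDD[String].
object ReloadPaths {
  // Batch end times that fall in (fromMs, toMs], in ascending order.
  def inRange(batchTimesMs: Seq[Long], fromMs: Long, toMs: Long): Seq[Long] =
    batchTimesMs.filter(t => t > fromMs && t <= toMs).sorted

  // Assumes directories were named "prefix-TIME_IN_MS" (saved without a suffix).
  def inputPath(prefix: String, batchTimesMs: Seq[Long]): String =
    batchTimesMs.map(t => s"$prefix-$t").mkString(",")

  // In a separate Spark job one would then call something like:
  //   val all: RDD[String] = sc.textFile(ReloadPaths.inputPath(prefix, times))
}

val times = ReloadPaths.inRange(Seq(90000L, 30000L, 60000L, 120000L), 30000L, 90000L)
println(ReloadPaths.inputPath("hdfs://localhost:9000/twitterRawJSON", times))
```

Keeping the selection in a plain function makes the "which batches make up my single RDD" decision explicit, since the stream itself never holds past batches.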
converting DStream[String] into RDD[String] in spark streaming
Hi everyone!

I want to convert a DStream[String] into an RDD[String]. I could not find how to do this.

val data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))

Now I want to convert this stream into a single RDD[String].

Any help, please?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.