Thanks. Currently this is what I am doing:
// Get the lines
val lines = messages.map(_._2)
// Check for the message
val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)

So it prints a maximum of 1000 lines to the terminal after the filter and map. Can this be done as suggested?

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hi,

Instead of using print() directly on the DStream, I suggest you use foreachRDD if you want to materialize all rows. An example is shown here:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

dstream.foreachRDD(rdd => {
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach(record => {
    connection.send(record)  // executed at the worker
  })
})

On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does.

On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

Since num is an Int, you can specify Integer.MAX_VALUE.

FYI

On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

As I see it, print() materializes the first 10 rows. On the other hand, print(n) will materialize n rows.

How about if I wanted to materialize all rows?

Cheers

On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:
See the following method of DStream:

 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
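A minimal sketch that combines Sachin's foreachRDD pattern with the goal of materializing every row (the helper name printAll is illustrative, not from any API; collect() pulls each batch back to the driver, so this assumes each batch is small enough to fit in driver memory):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Print every element of every batch, not just the first n that print(n) shows.
// collect() ships the whole RDD to the driver, so keep batches small.
def printAll[T](stream: DStream[T]): Unit =
  stream.foreachRDD { rdd: RDD[T] =>
    rdd.collect().foreach(println)  // executed at the driver
  }

// Usage with the pipeline above, dropping the .print(1000) call:
// printAll(lines.filter(_.contains("Sending messages")).flatMap(_.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _))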
On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However, this works. I am checking the logic to see if it does what I asked it to do:

val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

scala> ssc.start()

scala> -------------------------------------------
Time: 1459701925000 ms
-------------------------------------------
(* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
(o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
(Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
(* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
(o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
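One thing to note about the output above: the keys are whole sentences rather than words, because "\n," is a regex that only matches a newline immediately followed by a comma, so most records pass through flatMap unsplit and reduceByKey counts repeated lines (27 occurrences of each here). A sketch of a per-word count, assuming splitting on whitespace is what is actually wanted:

// Tokenize on runs of whitespace so reduceByKey counts individual words
// rather than entire lines.
val wordCounts = lines.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.print()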
On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

scala> // Get the lines
scala> val lines = messages.map(_._2)
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple. Choose the element of the tuple and then apply contains to it.
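A minimal sketch of what that looks like for the Kafka stream in this thread, where each record is a (key, value) tuple and the text sits in the second element (val names here are just for illustration):

// Each Kafka record arrives as a (key, value) tuple; take the value
// before applying String operations such as contains.
val lines = messages.map(_._2)
val hits  = lines.filter(_.contains("ASE 15"))

// Or filter the tuples directly by addressing the value field:
val hitsFromTuples = messages.filter { case (_, value) => value.contains("ASE 15") }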
On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thank you gents.

That should be "\n", the newline character.

OK, I am using Spark Streaming to analyse the message. The streaming part works:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
scala> val sparkConf = new SparkConf().
     |   setAppName("StreamTest").
     |   setMaster("local[12]").
     |   set("spark.driver.allowMultipleContexts", "true").
     |   set("spark.hadoop.validateOutputSpecs", "false")
scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)

How does one refer to the content of the stream here?

Thanks

On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes the tab character.

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

1. The first underscore in your filter call refers to a line of the file (textFile() results in a collection of strings).
2. You're correct; there is no need for it here.
3. filter expects a Boolean result, so you can merge your contains filters into one with an AND (&&) expression.
4. Almost: split() takes a regular expression, so the character class "[\t,]" splits on either a tab or a comma, whereas "\t," only matches a tab immediately followed by a comma.

Eliran Bivas
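As a sketch of points 3 and 4 applied together on a plain RDD of lines (the file path follows the thread; the val name is just for illustration):

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

// One filter pass with the predicates merged via &&, then a character-class
// split so each surviving line is broken on tabs and commas alike.
val tokens = f.filter(line => line.nonEmpty && line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("[\t,]"))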
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in
2. Filter out empty lines (well, not really needed here)
3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line
4. Split the text by "\t" and ","
5. Print the outcome

This is what I did with your suggestions included:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

A couple of questions, if I may:

1. I take it that "_" refers to the content of the file read in, by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Multiple filters are needed, one for each contains?
4. split"\t," splits the filter by carriage return AND ","?

Regards

On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

A few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison, not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.

As for line => line.split("\t").split(","):
You have to do a second map, or (since split() takes a regex as input) use a character class such as .split("[\t,]"). The problem is that your first split() call generates an Array, and a second split() call on that Array will not compile, e.g.

val lines: Array[String] = line.split("\t")
lines.split(",") // Compilation error - no method split() exists for Array

So either go with map(_.split("\t")).map(_.flatMap(_.split(","))) or map(_.split("[\t,]")).

Hope that helps.

Eliran Bivas
Data Team | iguaz.io
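To make the regex point concrete, a small illustration in plain Scala (no Spark needed; the sample string is made up):

val line = "ASE 15\tUPDATE INDEX STATISTICS,tempdb"

// "\t," is a regex matching a tab immediately followed by a comma,
// which never occurs here, so nothing is split:
line.split("\t,")    // Array(ASE 15\tUPDATE INDEX STATISTICS,tempdb)

// "[\t,]" is a character class matching a tab OR a comma:
line.split("[\t,]")  // Array(ASE 15, UPDATE INDEX STATISTICS, tempdb)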
On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach.

Read a text file in:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

Now I want to get rid of empty lines and filter only the lines that contain "ASE15":

f.filter(_ > "").filter(_ contains("ASE15"))

The above works, but I am not sure whether I need the two filter transformations. Can it be done in one?

Now I want to map the above filter to lines with carriage return and split them by ",":

f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
<console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks

Dr Mich Talebzadeh
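For completeness, a sketch of the two ways out of that compile error discussed in the replies above (both assume the same f; the val names are illustrative):

// One filter instead of two: merge the predicates with &&.
val filtered = f.filter(line => line.nonEmpty && line.contains("ASE15"))

// Two equivalent ways to split each line on tab and comma:
val viaCharClass = filtered.map(_.split("[\t,]"))                     // tab or comma in one regex
val viaTwoSteps  = filtered.map(_.split("\t").flatMap(_.split(",")))  // split on tab, then split each piece on comma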