Thanks Ted. As I see it, print() materializes the first 10 rows of each batch, while print(n) materializes n rows.

How about if I wanted to materialize all rows?
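Something like the following is what I have in mind — an untested sketch, assuming foreachRDD with a collect() on each RDD is the right way to pull every element back to the driver (v here being the DStream built in the messages below, without the trailing .print):

  // Materialize and print every element of each batch.
  // collect() brings the whole RDD back to the driver, so this only
  // suits batches small enough to fit in driver memory.
  v.foreachRDD { rdd =>
    rdd.collect().foreach(println)
  }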
Cheers,

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:

See the following method of DStream:

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {


On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However this works. I am checking the logic to see if it does what I asked it to do:

  val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

  scala> ssc.start()

  scala> -------------------------------------------
  Time: 1459701925000 ms
  -------------------------------------------
  (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
  (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
  (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
  (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
  (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
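As an aside, given the print(num) overload Ted quoted above, seeing more than the default 10 elements of each batch should just be a matter of passing a count — a sketch, assuming the pipeline is first assigned to a DStream value (here a hypothetical counts, i.e. the chain above without the trailing .print):

  counts.print(100)  // print up to the first 100 elements of each batch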
On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

  scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
  messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

  scala> // Get the lines
  scala> val lines = messages.map(_._2)
  lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

  scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
  v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

  scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
  <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]


On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly on a tuple.

Choose the element of the tuple and then apply contains on it.
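For instance, filtering on the message value directly — a sketch against the (String, String) pairs produced by createDirectStream above:

  // Keep only pairs whose value (the Kafka message body) matches;
  // the key is ignored via the wildcard.
  val hits = messages.filter { case (_, value) =>
    value.contains("ASE 15") && value.contains("UPDATE INDEX STATISTICS")
  }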
>>>>> | set("spark.hadoop.validateOutputSpecs", "false") >>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55)) >>>>> scala> >>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> >>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", >>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" ) >>>>> kafkaParams: scala.collection.immutable.Map[String,String] = >>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> >>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> >>>>> StreamTest) >>>>> scala> val topic = Set("newtopic") >>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic) >>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, >>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic) >>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, >>>>> String)] = >>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c >>>>> >>>>> This part is tricky >>>>> >>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ >>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line => >>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + >>>>> _).collect.foreach(println) >>>>> <console>:47: error: value contains is not a member of (String, String) >>>>> val showlines = messages.filter(_ contains("ASE >>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => >>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + >>>>> _).collect.foreach(println) >>>>> >>>>> >>>>> How does one refer to the content of the stream here? >>>>> >>>>> Thanks >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> // >>>>> // Now want to do some analysis on the same text file >>>>> // >>>>> >>>>> >>>>> >>>>> Dr Mich Talebzadeh >>>>> >>>>> >>>>> >>>>> LinkedIn * >>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>> >>>>> >>>>> >>>>> http://talebzadehmich.wordpress.com >>>>> >>>>> >>>>> >>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote: >>>>> >>>>>> bq. split"\t," splits the filter by carriage return >>>>>> >>>>>> Minor correction: "\t" denotes tab character. >>>>>> >>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> >>>>>> wrote: >>>>>> >>>>>>> Hi Mich, >>>>>>> >>>>>>> 1. The first underscore in your filter call is refering to a line in >>>>>>> the file (as textFile() results in a collection of strings) >>>>>>> 2. You're correct. No need for it. >>>>>>> 3. Filter is expecting a Boolean result. So you can merge your >>>>>>> contains filters to one with AND (&&) statement. >>>>>>> 4. Correct. Each character in split() is used as a divider. >>>>>>> >>>>>>> Eliran Bivas >>>>>>> >>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com> >>>>>>> *Sent:* Apr 3, 2016 15:06 >>>>>>> *To:* Eliran Bivas >>>>>>> *Cc:* user @spark >>>>>>> *Subject:* Re: multiple splits fails >>>>>>> >>>>>>> Hi Eliran, >>>>>>> >>>>>>> Many thanks for your input on this. >>>>>>> >>>>>>> I thought about what I was trying to achieve so I rewrote the logic >>>>>>> as follows: >>>>>>> >>>>>>> >>>>>>> 1. Read the text file in >>>>>>> 2. Filter out empty lines (well not really needed here) >>>>>>> 3. Search for lines that contain "ASE 15" and further have >>>>>>> sentence "UPDATE INDEX STATISTICS" in the said line >>>>>>> 4. Split the text by "\t" and "," >>>>>>> 5. 
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in.
2. Filter out empty lines (well, not really needed here).
3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line.
4. Split the text by "\t" and ",".
5. Print the outcome.

This was what I did with your suggestions included:

  val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
  f.cache()
  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

Couple of questions if I may:

1. I take it that "_" refers to the content of the file read in by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Multiple filters are needed for each contains logic?
4. split("\t,") splits the filter by carriage return AND ","?

Regards


On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

Few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison and not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.

As for line => line.split("\t").split(","): you have to do a second map or (since split() requires a regex as input) .split("\t,"). The problem is that your first split() call will generate an Array, and then your second call will result in an error. e.g.

  val lines: Array[String] = line.split("\t")
  lines.split(",") // Compilation error - no method split() exists for Array

So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).

Hope that helps.

Eliran Bivas
Data Team | iguaz.io
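One caveat worth adding to the suggestion above: split() treats its argument as a regular expression, so "\t," matches a tab immediately followed by a comma — it does not split on tab or comma individually. A character class does that instead; a small sketch:

  // "\t," is a regex meaning: tab followed by comma, so no split occurs here.
  "a\tb,c".split("\t,")    // -> Array("a\tb,c")  (unchanged)
  // "[\t,]" is a character class: split on a tab OR a comma.
  "a\tb,c".split("[\t,]")  // -> Array("a", "b", "c")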
On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach.

Read a text file in:

  val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

Now I want to get rid of empty lines and filter only the lines that contain "ASE15":

  f.filter(_ > "").filter(_ contains("ASE15"))

The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?

Now I want to map the above filter to lines with carriage return and split them by ",":

  f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
  res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

  scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
  <console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks

Dr Mich Talebzadeh
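Pulling the thread's suggestions together, a sketch of the corrected chain — one combined filter and a single split on tab or comma via a character class, with the file path as above:

  val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
  f.filter(line => line.nonEmpty && line.contains("ASE15"))
    .flatMap(_.split("[\t,]"))   // split each matching line on tab OR comma
    .map(word => (word, 1))      // word-count pairs, as in the thread
    .reduceByKey(_ + _)
    .collect()
    .foreach(println)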