I am afraid print(Integer.MAX_VALUE) does not print any lines! However, print(1000) does.
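A likely culprit: print(num) internally takes num + 1 elements of each RDD (the extra element is only used to decide whether to append "..."), and Integer.MAX_VALUE + 1 overflows to a negative Int, so the take brings back nothing. To materialise every element of each batch, one workaround is to skip print and collect each RDD on the driver — a minimal sketch, assuming every batch is small enough to fit in driver memory:

val counts = lines.filter(_.contains("ASE 15")).filter(_.contains("UPDATE INDEX STATISTICS")).flatMap(_.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
counts.foreachRDD { rdd =>
  // collect() pulls the entire batch to the driver -- fine only while batches stay small
  rdd.collect().foreach(println)
}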
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

> Since num is an Int, you can specify Integer.MAX_VALUE
>
> FYI
>
> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted.
>>
>> As I see it, print() materialises the first 10 rows, while print(n) materialises n rows.
>>
>> How about if I wanted to materialise all rows?
>>
>> Cheers
>>
>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Mich:
>>> See the following method of DStream:
>>>
>>> /**
>>>  * Print the first num elements of each RDD generated in this DStream. This is an output
>>>  * operator, so this DStream will be registered as an output stream and there materialized.
>>>  */
>>> def print(num: Int): Unit = ssc.withScope {
>>>
>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> However, this works. I am checking the logic to see if it does what I asked it to do:
>>>>
>>>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>>>>
>>>> scala> ssc.start()
>>>>
>>>> scala> -------------------------------------------
>>>> Time: 1459701925000 ms
>>>> -------------------------------------------
>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>> (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>> (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
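Relatedly, if the counts should come out ordered rather than just truncated, each batch can be sorted before the print output operator runs — a sketch, not from the thread; DStream.transform and RDD.sortBy are standard Spark APIs:

val counts = lines.filter(_.contains("ASE 15")).filter(_.contains("UPDATE INDEX STATISTICS")).flatMap(_.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
// sort every batch RDD by count, descending, then let print() take over
counts.transform(rdd => rdd.sortBy(_._2, ascending = false)).print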
>>>>
>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks Ted.
>>>>>
>>>>> This works:
>>>>>
>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>
>>>>> scala> // Get the lines
>>>>> scala> val lines = messages.map(_._2)
>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>
>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>
>>>>> However, this fails:
>>>>>
>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>> <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>
>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq. is not a member of (String, String)
>>>>>>
>>>>>> As shown above, contains shouldn't be applied directly to a tuple.
>>>>>>
>>>>>> Choose the element of the tuple and then apply contains to it.
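Concretely: messages is a DStream[(String, String)] of Kafka (key, value) pairs, so the value has to be picked out before any String method applies — a minimal sketch of that suggestion:

// messages: InputDStream[(String, String)] -- (key, value) pairs from Kafka
val lines = messages.map(_._2)                    // keep only the message value
val hits  = lines.filter(_.contains("ASE 15"))    // contains now resolves on String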
>>>>>>
>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you gents.
>>>>>>>
>>>>>>> That should have been "\n" (newline), then.
>>>>>>>
>>>>>>> OK, I am using Spark Streaming to analyse the messages.
>>>>>>>
>>>>>>> It does the streaming:
>>>>>>>
>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming._
>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>> //
>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>      | setAppName("StreamTest").
>>>>>>>      | setMaster("local[12]").
>>>>>>>      | set("spark.driver.allowMultipleContexts", "true").
>>>>>>>      | set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
>>>>>>> scala> val topic = Set("newtopic")
>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>
>>>>>>> This part is tricky:
>>>>>>>
>>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>>>>
>>>>>>> How does one refer to the content of the stream here?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> //
>>>>>>> // Now want to do some analysis on the same text file
>>>>>>> //
>>>>>>>
>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>
>>>>>>>> Minor correction: "\t" denotes the tab character, not carriage return.
>>>>>>>>
>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>
>>>>>>>>> Hi Mich,
>>>>>>>>>
>>>>>>>>> 1. The first underscore in your filter call refers to a line in the file (textFile() results in a collection of strings).
>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>> 3. filter expects a Boolean result, so you can merge your two contains filters into one with an AND (&&) expression.
>>>>>>>>> 4. Not quite: split() takes a regular expression, so "\t," matches a tab followed by a comma, not either character. To use each character as a divider, use a character class, "[\t,]" (see the sketch below).
>>>>>>>>>
>>>>>>>>> Eliran Bivas
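Putting points 3 and 4 together, the chain in the quoted message below collapses to a single filter with && and a single regex split — a sketch, untested here, using the same f and file as that message:

f.filter(line => line.nonEmpty && line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("[\t,]"))   // character class: split on tab OR comma
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)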
>>>>>>>>>
>>>>>>>>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>> Sent: Apr 3, 2016 15:06
>>>>>>>>> To: Eliran Bivas
>>>>>>>>> Cc: user @spark
>>>>>>>>> Subject: Re: multiple splits fails
>>>>>>>>>
>>>>>>>>> Hi Eliran,
>>>>>>>>>
>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>
>>>>>>>>> I thought about what I was trying to achieve, so I rewrote the logic as follows:
>>>>>>>>>
>>>>>>>>> 1. Read the text file in
>>>>>>>>> 2. Filter out empty lines (not really needed here)
>>>>>>>>> 3. Search for lines that contain "ASE 15" and also contain "UPDATE INDEX STATISTICS"
>>>>>>>>> 4. Split the text by "\t" and ","
>>>>>>>>> 5. Print the outcome
>>>>>>>>>
>>>>>>>>> This is what I did, with your suggestions included:
>>>>>>>>>
>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>> f.cache()
>>>>>>>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>
>>>>>>>>> A couple of questions if I may:
>>>>>>>>>
>>>>>>>>> 1. I take it that "_" refers, by default, to each line of the file read in?
>>>>>>>>> 2. _.length > 0 basically filters out blank lines (not really needed here)?
>>>>>>>>> 3. Are multiple filters needed, one for each contains condition?
>>>>>>>>> 4. split"\t," splits the filter by carriage return AND ","?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mich,
>>>>>>>>>>
>>>>>>>>>> A few comments:
>>>>>>>>>>
>>>>>>>>>> When doing .filter(_ > "") you are actually doing a lexicographic comparison, not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0).
>>>>>>>>>> I think that filtering with _.contains should be sufficient, and the first filter can be omitted.
>>>>>>>>>>
>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>> The first split() call produces an Array[String], so calling split() on its result is a compilation error, e.g.
>>>>>>>>>>
>>>>>>>>>> val fields: Array[String] = line.split("\t")
>>>>>>>>>> fields.split(",") // compilation error - no method split() exists for Array
>>>>>>>>>>
>>>>>>>>>> Since split() takes a regex, you can split on both characters in one call with a character class, flatMap(_.split("[\t,]")), or split on tabs first and then on commas with flatMap(_.split("\t")).flatMap(_.split(",")).
>>>>>>>>>>
>>>>>>>>>> Hope that helps.
>>>>>>>>>>
>>>>>>>>>> Eliran Bivas
>>>>>>>>>> Data Team | iguaz.io
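The regex point is easy to verify in the REPL — a tiny sketch with made-up strings:

"a\tb,c".split("\t,")    // Array("a\tb,c") : "\t," only matches a tab followed by a comma
"a\t,b".split("\t,")     // Array("a", "b") : the literal tab+comma pair matches
"a\tb,c".split("[\t,]")  // Array("a", "b", "c") : the character class splits on either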
>>>>>>>>>>
>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am not sure this is the correct approach.
>>>>>>>>>>
>>>>>>>>>> Read a text file in:
>>>>>>>>>>
>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>
>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines that contain "ASE15":
>>>>>>>>>>
>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15"))
>>>>>>>>>>
>>>>>>>>>> The above works, but I am not sure whether I need the two filter transformations. Can it be done in one?
>>>>>>>>>>
>>>>>>>>>> Now I want to map the lines that pass the filter and split them by "\t":
>>>>>>>>>>
>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>
>>>>>>>>>> Now I want to split the output by ",":
>>>>>>>>>>
>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>>
>>>>>>>>>> Any advice will be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks
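For reference, a self-contained sketch of the fix this thread converges on (the object name and the local[2] master are illustrative; the file path is the one used above):

import org.apache.spark.{SparkConf, SparkContext}

object SplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SplitDemo").setMaster("local[2]"))
    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
    f.filter(_.nonEmpty)              // drop empty lines (nonEmpty, not a lexicographic _ > "")
      .filter(_.contains("ASE15"))    // keep only the lines of interest
      .flatMap(_.split("[\t,]"))      // split on tab OR comma in one regex pass
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
    sc.stop()
  }
}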