Sure, this should be helpful. Try this:
https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Sachin. Will test it.

I guess I can modify it to save the output to a Hive table as opposed to the terminal.

Regards,
Mich

On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hey,

I have changed your example itself; try this, it should work in the terminal:

    val result = lines.filter(_.contains("ASE 15"))
                      .filter(_ contains("UPDATE INDEX STATISTICS"))
                      .flatMap(line => line.split("\n,"))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    result.foreachRDD(rdd => rdd.foreach(println))

On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks.

Currently this is what I am doing:

    // Get the lines
    val lines = messages.map(_._2)
    // Check for message
    val showResults = lines.filter(_.contains("Sending messages"))
                           .flatMap(line => line.split("\n,"))
                           .map(word => (word, 1))
                           .reduceByKey(_ + _)
                           .print(1000)

So it prints a maximum of 1000 lines to the terminal after the filter and map. Can this be done as suggested?

On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hi,

Instead of using print() directly on the DStream, I suggest you use foreachRDD if you want to materialize all rows; an example is shown here:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

    dstream.foreachRDD(rdd => {
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach(record => {
        connection.send(record)  // executed at the worker
      })
    })

On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does.

On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

Since num is an Int, you can specify Integer.MAX_VALUE.

FYI

On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

As I see it, print() materializes the first 10 rows. On the other hand, print(n) will materialize n rows.

How about if I wanted to materialize all rows?

Cheers
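For the Hive-table route mentioned above, a minimal sketch (Spark 1.x API, as used in this thread; `result` is the DStream[(String, Int)] from Sachin's example, and `word_counts` is a hypothetical table name):

    import org.apache.spark.sql.hive.HiveContext

    // One HiveContext created on the driver; the foreachRDD body also runs
    // on the driver, so nothing non-serializable travels to the workers.
    val hiveContext = new HiveContext(ssc.sparkContext)
    import hiveContext.implicits._

    result.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Turn this micro-batch of (word, count) pairs into a DataFrame
        // and append it to the Hive table.
        rdd.toDF("word", "count").write.mode("append").saveAsTable("word_counts")
      }
    }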
On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:
See the following method of DStream:

    /**
     * Print the first num elements of each RDD generated in this DStream. This is an output
     * operator, so this DStream will be registered as an output stream and there materialized.
     */
    def print(num: Int): Unit = ssc.withScope {

On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However, this works. I am checking the logic to see if it does what I asked it to do.

    val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

    scala> ssc.start()

    scala> -------------------------------------------
    Time: 1459701925000 ms
    -------------------------------------------
    (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
    (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
    (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
    (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
    (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

    scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

    scala> // Get the lines
    scala> val lines = messages.map(_._2)
    lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

    scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
    v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

    scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
    <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
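The error above arises because collect() is defined on RDD, not on DStream. A per-batch equivalent of the failing line, as a sketch (reasonable only while each micro-batch is small enough to fit on the driver):

    v.foreachRDD { rdd =>
      // collect() pulls this batch's results back to the driver, then prints them
      rdd.collect().foreach(println)
    }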
On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple.

Choose the element of the tuple and then apply contains to it.

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thank you, gents.

That should be "\n" then, for the line break.

OK, I am using Spark Streaming to analyse the message. It does the streaming:

    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka.KafkaUtils

    scala> val sparkConf = new SparkConf().
         |   setAppName("StreamTest").
         |   setMaster("local[12]").
         |   set("spark.driver.allowMultipleContexts", "true").
         |   set("spark.hadoop.validateOutputSpecs", "false")
    scala> val ssc = new StreamingContext(sparkConf, Seconds(55))

    scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
    kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)

    scala> val topic = Set("newtopic")
    topic: scala.collection.immutable.Set[String] = Set(newtopic)

    scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

    scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
    <console>:47: error: value contains is not a member of (String, String)

How does one refer to the content of the stream here?

Thanks

On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes the tab character.

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

1. The first underscore in your filter call is referring to a line in the file (as textFile() results in a collection of strings).
2. You're correct. No need for it.
3. filter is expecting a Boolean result, so you can merge your contains filters into one with an AND (&&) expression, as in the sketch below.
4. Correct. Each character in split() is used as a divider.

Eliran Bivas
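Point 3 in code, against the f = sc.textFile(...) example quoted below: the two contains filters collapse into a single filter with a combined predicate.

    val filtered = f.filter(line =>
      line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))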
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in.
2. Filter out empty lines (well, not really needed here).
3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line.
4. Split the text by "\t" and ",".
5. Print the outcome.

This is what I did, with your suggestions included:

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
    f.cache()
    f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

A couple of questions if I may:

1. I take it that "_" refers to the content of the file read in by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Multiple filters are needed, one for each contains test?
4. split"\t," splits the filter by carriage return AND ","?

Regards

On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

A few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison and not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.

As for line => line.split("\t").split(","): you have to do a second map or (since split() takes a regex as input) use .split("\t,"). The problem is that your first split() call will generate an Array, and then your second call will result in an error, e.g.:

    val lines: Array[String] = line.split("\t")
    lines.split(",")  // compilation error: no method split() exists for Array

So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).

Hope that helps.

Eliran Bivas
Data Team | iguaz.io
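One caveat on the last suggestion: String.split takes a regular expression, and the regex "\t," matches the two-character sequence tab followed by comma, not tab or comma. Splitting on either delimiter needs a character class:

    "a\tb,c".split("\t,")    // Array("a\tb,c")      -- no tab+comma sequence, nothing splits
    "a\tb,c".split("[\t,]")  // Array("a", "b", "c") -- splits on tab OR comma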
On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach.

Read a text file in:

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

Now I want to get rid of empty lines and filter only the lines that contain "ASE15":

    f.filter(_ > "").filter(_ contains("ASE15"))

The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?

Now I want to map the above filter to lines with carriage returns and split them by ",":

    f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
    res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

    scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
    <console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks,
Mich
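For reference, folding the thread's suggestions back into this original snippet gives something like the following sketch (same file and filter term as above; "[\t,]" so the split is on either delimiter):

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
    f.cache()

    f.filter(line => line.nonEmpty && line.contains("ASE15"))
      .flatMap(_.split("[\t,]"))   // tab OR comma, via a regex character class
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)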