This is the idea I have in mind. I want to go through every line:
    result.foreachRDD(rdd => rdd.foreach(println))

Rather than print each line, I want to save the lines temporarily and then add/append the result set (the lines in the RDD) to a table for further analysis. It could be a Parquet or a Hive table. I am only interested in the lines of interest, which saves me space on disk. I am aware that I can simply write them to text files:

    result.saveAsTextFiles("/user/hduser/tmp/keep")

Any ideas will be appreciated.

Thanks

Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
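One way to do that, sketched below: inside foreachRDD, convert each batch to a DataFrame and append it to storage. This is a minimal sketch assuming Spark 1.6 as used in this thread; the output path and the column names ("line", "count") are illustrative, and a HiveContext with saveAsTable would work the same way for a Hive table:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    result.foreachRDD { (rdd: RDD[(String, Int)]) =>
      if (!rdd.isEmpty()) {
        // Re-use a singleton SQLContext rather than creating one per batch
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        // Append this batch to a Parquet dataset for later analysis
        rdd.toDF("line", "count")
          .write
          .mode("append")
          .parquet("/user/hduser/tmp/keep_parquet")
      }
    }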
On 5 April 2016 at 09:32, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Sure, this will be helpful. Try this:

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Sachin. Will test it.

I guess I can modify it to save the output to a Hive table as opposed to the terminal.

Regards

On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hey,

I have changed your example itself. Try this; it should work in the terminal:

    val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
    result.foreachRDD(rdd => rdd.foreach(println))

On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks.

Currently this is what I am doing:

    // Get the lines
    val lines = messages.map(_._2)
    // Check for the message
    val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)

So it prints a maximum of 1000 lines to the terminal after the filter and map. Can this be done as suggested?

On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hi,

Instead of using print() directly on the DStream, I suggest you use foreachRDD if you want to materialize all rows. An example is shown here:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

    dstream.foreachRDD(rdd => {
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach(record => {
        connection.send(record)  // executed at the worker
      })
    })
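Note that the streaming guide shows that exact snippet as an anti-pattern: the connection is created at the driver and would have to be serialized to the workers, which usually fails. What the guide recommends instead is creating the connection per partition on the executors, roughly as sketched here (createNewConnection and connection.send are placeholders, as in the guide's example):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // One connection per partition, created on the executor itself
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }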
On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does.

On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

Since num is an Int, you can specify Integer.MAX_VALUE.

FYI

On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

As I see it, print() materializes the first 10 rows, while print(n) will materialize n rows.

How about if I wanted to materialize all rows?

Cheers

On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:
See the following method of DStream:

    /**
     * Print the first num elements of each RDD generated in this DStream. This is an output
     * operator, so this DStream will be registered as an output stream and there materialized.
     */
    def print(num: Int): Unit = ssc.withScope {

On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However, this works. I am checking the logic to see if it does what I asked it to do:

    val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

    scala> ssc.start()

    scala> -------------------------------------------
    Time: 1459701925000 ms
    -------------------------------------------
    (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
    (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
    (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
    (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
    (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)

On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

    scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

    scala> // Get the lines
    scala> val lines = messages.map(_._2)
    lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

    scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
    v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

    scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
    <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
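That error is expected: a DStream is not an RDD, so it has no collect method. A small sketch of the equivalent using foreachRDD, which exposes the RDD of each batch (note that collect pulls the whole batch to the driver, so this only makes sense for small, already-filtered data):

    v.foreachRDD { rdd =>
      // collect() exists on the per-batch RDD, not on the DStream
      rdd.collect().foreach(println)
    }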
On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple. Choose the element of the tuple and then apply contains to it.

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thank you gents.

That should be "\n", the newline character.

OK, I am using Spark Streaming to analyse the messages. It does the streaming:

    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka.KafkaUtils
    //
    scala> val sparkConf = new SparkConf().
         |   setAppName("StreamTest").
         |   setMaster("local[12]").
         |   set("spark.driver.allowMultipleContexts", "true").
         |   set("spark.hadoop.validateOutputSpecs", "false")
    scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
    scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
    kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
    scala> val topic = Set("newtopic")
    topic: scala.collection.immutable.Set[String] = Set(newtopic)
    scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

    scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
    <console>:47: error: value contains is not a member of (String, String)

How does one refer to the content of the stream here?
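As Ted points out, createDirectStream yields a DStream of (key, value) tuples, so the underscore in the filter is a (String, String), not a String. A sketch of pulling out the payload first (the map(_._2) step is the same one used elsewhere in this thread; the name "hits" is just illustrative):

    // Take the value element of each Kafka (key, value) pair
    val lines = messages.map(_._2)
    // Now each element is a String, so contains applies
    val hits = lines.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))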
Thanks.

// Now want to do some analysis on the same text file

On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes the tab character.

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

1. The first underscore in your filter call refers to a line in the file (as textFile() results in a collection of strings).
2. You're correct. No need for it.
3. filter is expecting a Boolean result, so you can merge your contains filters into one with an AND (&&) expression.
4. Correct. Each character in split() is used as a divider.

Eliran Bivas

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in
2. Filter out empty lines (well, not really needed here)
3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line
4. Split the text by "\t" and ","
5. Print the outcome

This is what I did, with your suggestions included:

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
    f.cache()
    f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

A couple of questions if I may:

1. I take it that "_" refers to the content of the file read in, by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Are multiple filters needed, one for each contains?
4. Does split"\t," split the filter by carriage return AND ","?
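One caveat on point 4 above: split() takes a regex, and the pattern "\t," matches a tab followed immediately by a comma, not tab or comma, so it will rarely split anything. To split on either character, a character class is one option:

    // "\t," matches the two-character sequence tab-then-comma: no split here
    "a\tb,c".split("\t,")    // Array("a\tb,c")
    // "[\t,]" matches a single tab OR a single comma
    "a\tb,c".split("[\t,]")  // Array("a", "b", "c")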
Regards

On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

A few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison and not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.

As for line => line.split("\t").split(","): you have to do a second map or (since split() requires a regex as input) use .split("\t,"). The problem is that your first split() call will generate an Array, and then your second call will result in an error, e.g.

    val lines: Array[String] = line.split("\t")
    lines.split(",") // Compilation error - no method split() exists for Array

So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).

Hope that helps.

Eliran Bivas
Data Team | iguaz.io

On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach.

Read a text file in:

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

Now I want to get rid of empty lines and filter only the lines that contain "ASE15":

    f.filter(_ > "").filter(_ contains("ASE15"))

The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?
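Picking up Eliran's point about merging predicates: the two filters can be combined into one Boolean expression with &&. A sketch:

    // One pass, one combined predicate: non-empty lines containing "ASE15"
    f.filter(line => line.nonEmpty && line.contains("ASE15"))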
Now I want to map the above filter to lines split by "\t" and then split them by ",":

    f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
    res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

    scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
    <console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks

Dr Mich Talebzadeh
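Putting the thread's corrections together, a sketch of the whole pipeline under discussion: one merged filter predicate, flatMap with a single character-class regex for the tab/comma split, and the per-token counts collected and printed (the file path is the one used above):

    val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
    f.cache()
    f.filter(line => line.nonEmpty && line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
      .flatMap(_.split("[\t,]"))      // split each line on tab OR comma
      .map(token => (token, 1))       // classic word-count pairing
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)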