If I go through each RDD I get

val result = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)

scala> result.foreachRDD( rdd => {
     |   for (item <- rdd.collect().toArray) {
     |     println(item)
     |   }
     | })

Rather than println(item), I want to store the data temporarily and then flush the results to an HDFS file as an append.

Dr Mich Talebzadeh

http://talebzadehmich.wordpress.com
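A minimal sketch of that idea (the Parquet path and the sqlContext wiring are assumptions, not code from this thread): convert each batch to a DataFrame and write it out in append mode, so the filtered lines accumulate under one HDFS location instead of being printed.

// assumes result is the DStream[(String, Int)] built above and
// sqlContext is an existing org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._

result.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                 // skip empty micro-batches
    rdd.toDF("line", "count")           // (word, 1) pairs become two columns
       .write
       .mode(SaveMode.Append)           // append each batch to the same path
       .parquet("/user/hduser/tmp/keep_parquet")   // hypothetical path
  }
}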
On 5 April 2016 at 14:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

This is the idea I have in mind.

I want to go through every line:

result.foreachRDD(rdd => rdd.foreach(println))

Rather than print each line, I want to save the lines temporarily and then add/append the result set (the lines in the RDD) to a table for further analysis. It could be a Parquet or Hive table.

So I am only interested in the lines of interest, which saves me space on disk.

I am aware that I can simply write them to text files:

result.saveAsTextFiles("/user/hduser/tmp/keep")

Any ideas will be appreciated.

Thanks

On 5 April 2016 at 09:32, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Sure, this will be helpful, try this:

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Sachin. Will test it.

I guess I can modify it to save the output to a Hive table as opposed to the terminal.

Regards

On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hey,

I have changed your example itself; try this, it should work in the terminal:

val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
result.foreachRDD(rdd => rdd.foreach(println))

On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks.

Currently this is what I am doing:

// Get the lines
val lines = messages.map(_._2)
// Check for the message
val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)

So it prints a maximum of 1000 lines to the terminal after the filter and map. Can this be done as suggested?
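A sketch of the Hive-table variant mentioned above (the table name keep_results and its schema are assumptions; the thread itself stops at the idea). Each batch is appended to a pre-created Hive table via insertInto:

// assumes a Hive-enabled Spark 1.x build and a table created beforehand:
//   CREATE TABLE keep_results (line STRING, count INT)
import org.apache.spark.sql.hive.HiveContext

result.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // in a real job, create the HiveContext once and reuse it
    val hiveContext = new HiveContext(rdd.sparkContext)
    import hiveContext.implicits._
    rdd.toDF("line", "count")
       .write
       .insertInto("keep_results")   // appends the batch to the table
  }
}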
On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com> wrote:

Hi,

Instead of using print() directly on the DStream, I suggest you use foreachRDD if you want to materialize all rows. An example is shown here:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

dstream.foreachRDD(rdd => {
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach(record => {
    connection.send(record) // executed at the worker
  })
})
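The streaming guide linked above also notes that this pattern creates a connection per record (and, as written, would require the connection object to be serialized from the driver to the workers); its recommended variant uses foreachPartition so one connection serves a whole partition. A sketch (createNewConnection is the guide's placeholder, not a real API):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()   // one connection per partition
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}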
On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does.

On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

Since num is an Int, you can specify Integer.MAX_VALUE.

FYI

On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

As I see it, print() materializes the first 10 rows, while print(n) materializes n rows.

How about if I wanted to materialize all rows?

Cheers

On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:
See the following method of DStream:

 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {

On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However, this works. I am checking the logic to see if it does what I asked it to do:

val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

scala> ssc.start()

scala> -------------------------------------------
Time: 1459701925000 ms
-------------------------------------------
(* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
(o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
(Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people make is NOT pruning data from daily log tables in ASE 15 etc as they do in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere.,27)
(* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
(o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

scala> // Get the lines
scala> val lines = messages.map(_._2)
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple.

Choose the element of the tuple and then apply contains to it.
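A sketch of how the failing line above resolves (combining collect with foreachRDD is a suggestion here, not quoted from the thread): collect is an RDD method, not a DStream method, so it has to run inside foreachRDD on each batch.

// v is the DStream[(String, Int)] from the session above;
// the rdd passed to foreachRDD is a plain RDD, where collect() exists
v.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}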
On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thank you gents.

That should be "\n" for the carriage return.

OK, I am using Spark Streaming to analyse the message.

It does the streaming:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
scala> val sparkConf = new SparkConf().
     |   setAppName("StreamTest").
     |   setMaster("local[12]").
     |   set("spark.driver.allowMultipleContexts", "true").
     |   set("spark.hadoop.validateOutputSpecs", "false")
scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)

How does one refer to the content of the stream here?
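A sketch of the fix Ted describes above (all names come from the session here): each element of the direct stream is a (key, value) tuple, so take the value with _._2 before filtering.

// messages is a DStream[(String, String)]; _._2 extracts the payload,
// giving a DStream[String] on which contains and split work as expected
val lines = messages.map(_._2)
val showlines = lines
  .filter(_.contains("ASE 15"))
  .filter(_.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("\n,"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)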
Thanks

//
// Now want to do some analysis on the same text file
//

On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes the tab character.

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

1. The first underscore in your filter call refers to a line in the file (as textFile() results in a collection of strings).
2. You're correct. No need for it.
3. filter is expecting a Boolean result, so you can merge your contains filters into one with an AND (&&) expression.
4. Correct. Each character in split() is used as a divider.

Eliran Bivas

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in
2. Filter out empty lines (not really needed here)
3. Search for lines that contain "ASE 15" and also contain "UPDATE INDEX STATISTICS"
4. Split the text by "\t" and ","
5. Print the outcome

This is what I did, with your suggestions included:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

A couple of questions if I may:

1. I take it that "_" refers to the content of the file read in, by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Are multiple filters needed, one for each contains?
4. Does split("\t,") split the filter by carriage return AND ","?

Regards
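A sketch of Eliran's point 3, merging the chained filters into a single pass over the same file (behaviour is unchanged; only the intent reads more directly):

// one filter with && instead of three chained filters
val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.filter(line => line.length > 0 &&
                 line.contains("ASE 15") &&
                 line.contains("UPDATE INDEX STATISTICS"))
 .flatMap(_.split("\t,"))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
 .collect
 .foreach(println)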
split"\t," splits the filter by carriage return AND >>>>>>>>>>>>>>>> ,? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regards >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Dr Mich Talebzadeh >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> LinkedIn * >>>>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Mich, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Few comments: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When doing .filter(_ > “”) you’re actually doing a >>>>>>>>>>>>>>>>> lexicographic comparison and not filtering for empty lines >>>>>>>>>>>>>>>>> (which could be >>>>>>>>>>>>>>>>> achieved with _.notEmpty or _.length > 0). >>>>>>>>>>>>>>>>> I think that filtering with _.contains should be >>>>>>>>>>>>>>>>> sufficient and the first filter can be omitted. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As for line => line.split(“\t”).split(“,”): >>>>>>>>>>>>>>>>> You have to do a second map or (since split() requires a >>>>>>>>>>>>>>>>> regex as input) .split(“\t,”). >>>>>>>>>>>>>>>>> The problem is that your first split() call will generate >>>>>>>>>>>>>>>>> an Array and then your second call will result in an error. >>>>>>>>>>>>>>>>> e.g. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> val lines: Array[String] = line.split(“\t”) >>>>>>>>>>>>>>>>> lines.split(“,”) // Compilation error - no method split() >>>>>>>>>>>>>>>>> exists for Array >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or >>>>>>>>>>>>>>>>> map(_.split(“\t,”)) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hope that helps. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> *Eliran Bivas* >>>>>>>>>>>>>>>>> Data Team | iguaz.io >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh < >>>>>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I am not sure this is the correct approach >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Read a text file in >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt") >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the >>>>>>>>>>>>>>>>> lines that contain "ASE15" >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The above works but I am not sure whether I need two >>>>>>>>>>>>>>>>> filter transformation above? Can it be done in one? 
Now I want to map the above filter to lines with a carriage return and split them by ",":

f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
<console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks

Dr Mich Talebzadeh

http://talebzadehmich.wordpress.com
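One side note on the split calls that run through this thread (not from the thread itself): String.split takes a regular expression, so "\t," matches a tab immediately followed by a comma, not a tab or a comma. Splitting on either character needs a character class:

val s = "ASE 15\tUPDATE INDEX STATISTICS,tempdb"
s.split("\t,")     // no tab-comma pair present, nothing is split
s.split("[\t,]")   // tab OR comma: Array(ASE 15, UPDATE INDEX STATISTICS, tempdb)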