Thanks Sachin. Will test it. I guess I can modify it to save the output to a Hive table as opposed to the terminal.
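Something along these lines is what I have in mind, building on your example (an untested sketch; the HiveContext setup and the target table name "test.wordcounts" are just placeholders):

val result = lines.filter(_.contains("ASE 15")).filter(_.contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)

result.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // one could also create a single HiveContext once and reuse it across batches
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
    import hiveContext.implicits._
    // append this batch to a Hive table instead of printing to the terminal
    rdd.toDF("line", "count").write.mode("append").saveAsTable("test.wordcounts")
  }
}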
Regards,

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com


On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com> wrote:

> Hey,
>
> I have changed your example itself. Try this, it should work in the terminal:
>
> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> result.foreachRDD(rdd => rdd.foreach(println))
>
> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thanks.
>>
>> Currently this is what I am doing:
>>
>> // Get the lines
>> //
>> val lines = messages.map(_._2)
>> // Check for message
>> val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>>
>> So it prints a maximum of 1000 lines to the terminal after the filter and map. Can this be done as suggested?
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Instead of using print() directly on the DStream, I suggest you use foreachRDD if you want to materialize all rows; an example is shown here:
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>
>>> dstream.foreachRDD(rdd => {
>>>   val connection = createNewConnection()  // executed at the driver
>>>   rdd.foreach(record => {
>>>     connection.send(record)  // executed at the worker
>>>   })
>>> })
>>>
>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does.
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Since num is an Int, you can specify Integer.MAX_VALUE.
>>>>>
>>>>> FYI
>>>>>
>>>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Ted.
>>>>>>
>>>>>> As I see it, print() materializes the first 10 rows. On the other hand, print(n) will materialize n rows.
>>>>>>
>>>>>> How about if I wanted to materialize all rows?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Mich:
>>>>>>> See the following method of DStream:
>>>>>>>
>>>>>>>  * Print the first num elements of each RDD generated in this DStream. This is an output
>>>>>>>  * operator, so this DStream will be registered as an output stream and there materialized.
>>>>>>>  */
>>>>>>> def print(num: Int): Unit = ssc.withScope {
>>>>>>>
>>>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> However this works. I am checking the logic to see if it does what I asked it to do.
>>>>>>>>
>>>>>>>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>>>>
>>>>>>>> scala> ssc.start()
>>>>>>>>
>>>>>>>> scala> -------------------------------------------
>>>>>>>> Time: 1459701925000 ms
>>>>>>>> -------------------------------------------
>>>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
>>>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>>>>>> (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>>>>> (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
>>>>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Ted.
>>>>>>>>>
>>>>>>>>> This works:
>>>>>>>>>
>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>
>>>>>>>>> scala> // Get the lines
>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>
>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>
>>>>>>>>> However, this fails:
>>>>>>>>>
>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>> <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>        val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>
>>>>>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>>>>>
>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>
>>>>>>>>>>> That should be "\n" as carriage return.
>>>>>>>>>>>
>>>>>>>>>>> OK, I am using Spark Streaming to analyse the message.
>>>>>>>>>>>
>>>>>>>>>>> It does the streaming:
>>>>>>>>>>>
>>>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>> //
>>>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>>>      |   setAppName("StreamTest").
>>>>>>>>>>>      |   setMaster("local[12]").
>>>>>>>>>>>      |   set("spark.driver.allowMultipleContexts", "true").
>>>>>>>>>>>      |   set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>>>> scala>
>>>>>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
>>>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
>>>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>>>
>>>>>>>>>>> This part is tricky:
>>>>>>>>>>>
>>>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>>>>>>>>        val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>>
>>>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> //
>>>>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>>>>> //
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>
>>>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>>>
>>>>>>>>>>>> Minor correction: "\t" denotes the tab character.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The first underscore in your filter call is referring to a line in the file (as textFile() results in a collection of strings).
>>>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>>>> 3. Filter is expecting a Boolean result, so you can merge your contains filters into one with an AND (&&) statement.
>>>>>>>>>>>>> 4. Correct. Each character in split() is used as a divider.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Eliran Bivas
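A minimal, untested sketch of points 3 and 4 above, applied to the file used later in the thread: the two contains filters merged into one with &&, and the split done with a regex character class. Note that String.split() takes a regular expression, so "[\t,]" splits on either a tab or a comma, whereas "\t," would only match a tab immediately followed by a comma.

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

f.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))  // one filter instead of two
 .flatMap(_.split("[\t,]"))   // split on tab OR comma
 .map(word => (word, 1))
 .reduceByKey(_ + _)
 .collect()
 .foreach(println)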
>>>>>>>>>>>>>
>>>>>>>>>>>>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>>>> Sent: Apr 3, 2016 15:06
>>>>>>>>>>>>> To: Eliran Bivas
>>>>>>>>>>>>> Cc: user @spark
>>>>>>>>>>>>> Subject: Re: multiple splits fails
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I thought about what I was trying to achieve, so I rewrote the logic as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Read the text file in
>>>>>>>>>>>>> 2. Filter out empty lines (well, not really needed here)
>>>>>>>>>>>>> 3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>>>>> 4. Split the text by "\t" and ","
>>>>>>>>>>>>> 5. Print the outcome
>>>>>>>>>>>>>
>>>>>>>>>>>>> This was what I did with your suggestions included:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>> f.cache()
>>>>>>>>>>>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>>>>
>>>>>>>>>>>>> A couple of questions if I may:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I take it that "_" refers to the content of the file read in by default?
>>>>>>>>>>>>> 2. _.length > 0 basically filters out blank lines (not really needed here)
>>>>>>>>>>>>> 3. Multiple filters are needed for each contains logic
>>>>>>>>>>>>> 4. split"\t," splits the filter by carriage return AND ","?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>
>>>>>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a lexicographic comparison and not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0).
>>>>>>>>>>>>>> I think that filtering with _.contains should be sufficient and the first filter can be omitted.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>>>>>> You have to do a second map or (since split() requires a regex as input) .split("\t,").
>>>>>>>>>>>>>> The problem is that your first split() call will generate an Array and then your second call will result in an error, e.g.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>>>>> lines.split(",")  // Compilation error - no method split() exists for Array
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not sure this is the correct approach.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Read a text file in:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines that contain "ASE15":
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The above works, but I am not sure whether I need the two filter transformations above? Can it be done in one?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to map the above filter to lines with carriage return and split them by ",":
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to split the output by ",":
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>>>>>>        f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>                                                                                             ^
>>>>>>>>>>>>>> Any advice will be appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>
>>> --
>>> Thanks & Regards
>>> Sachin Aggarwal
>>> 7760502772
>
> --
> Thanks & Regards
> Sachin Aggarwal
> 7760502772