Thanks Sachin. I will test it.

I guess I can modify it to save the output to a Hive table instead of
printing it to the terminal.

Regards

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com>
wrote:

> Hey,
>
> I have changed your example itself. Try this; it should print to the terminal:
>
> val result = lines.filter(_.contains("ASE 15")).
>   filter(_.contains("UPDATE INDEX STATISTICS")).
>   flatMap(line => line.split("\n,")).
>   map(word => (word, 1)).reduceByKey(_ + _)
> result.foreachRDD(rdd => rdd.foreach(println))
>
>
> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thanks.
>>
>> Currently this is what I am doing
>>
>> // Get the lines
>> //
>> val lines = messages.map(_._2)
>> // Check for message
>> val showResults = lines.filter(_.contains("Sending messages")).
>>   flatMap(line => line.split("\n,")).
>>   map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>>
>> So it prints a maximum of 1000 lines to the terminal after the filter and
>> map. Can this be done with foreachRDD as suggested?
>>
>> Dr Mich Talebzadeh
>>
>> On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> Instead of calling print() directly on the DStream, I suggest you use
>>> foreachRDD if you want to materialize all rows. An example is shown here:
>>>
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>
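>>> dstream.foreachRDD { rdd =>
>>>   rdd.foreachPartition { partitionOfRecords =>
>>>     // create the connection on the worker, once per partition,
>>>     // so that it never has to be serialized from the driver
>>>     val connection = createNewConnection()
>>>     partitionOfRecords.foreach(record => connection.send(record))
>>>     connection.close()
>>>   }
>>> }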
>>>
>>>
>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>>> However, print(1000) does
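
A possible explanation, offered as an assumption rather than a confirmed diagnosis: if print(num) asks each RDD for num + 1 elements (so that it can tell whether to print a trailing "..."), then Integer.MAX_VALUE + 1 overflows to a negative number and nothing is taken. The sketch below reproduces that arithmetic with a plain Scala List standing in for the RDD. The names requested, fetched, shown and safeLimit are made up for illustration.

```scala
// Int arithmetic wraps around silently on overflow.
val requested = Int.MaxValue
val fetched   = requested + 1          // wraps around to Int.MinValue

// take() with a negative count returns nothing on Scala collections;
// RDD.take is ASSUMED here to behave the same way.
val rows  = List("a", "b", "c")
val shown = rows.take(fetched)         // empty

// Leaving room for the "+ 1" avoids the overflow.
val safeLimit = Int.MaxValue - 1
val allRows   = rows.take(safeLimit + 1)
```

If that is indeed the cause, print(Int.MaxValue - 1) should avoid the overflow, although pulling every row of a large batch to the driver this way may be costly.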
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>>>
>>>>> FYI
>>>>>
>>>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Ted.
>>>>>>
>>>>>> As I see print() materializes the first 10 rows. On the other hand
>>>>>> print(n) will materialise n rows.
>>>>>>
>>>>>> How about if I wanted to materialize all rows?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Mich:
>>>>>>> See the following method of DStream:
>>>>>>>
>>>>>>>   /**
>>>>>>>    * Print the first num elements of each RDD generated in this DStream. This is an output
>>>>>>>    * operator, so this DStream will be registered as an output stream and there materialized.
>>>>>>>    */
>>>>>>>   def print(num: Int): Unit = ssc.withScope {
>>>>>>>
>>>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> However this works. I am checking the logic to see if it does what
>>>>>>>> I asked it to do
>>>>>>>>
>>>>>>>> val v = lines.filter(_.contains("ASE 15")).
>>>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>   flatMap(line => line.split("\n,")).
>>>>>>>>   map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>>>>
>>>>>>>> scala> ssc.start()
>>>>>>>>
>>>>>>>> scala> -------------------------------------------
>>>>>>>> Time: 1459701925000 ms
>>>>>>>> -------------------------------------------
>>>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
>>>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>>>>>> (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>>>>> (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
>>>>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Ted
>>>>>>>>>
>>>>>>>>> This works
>>>>>>>>>
>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>>>>> String)] = 
>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>
>>>>>>>>> scala> // Get the lines
>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>
>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>
>>>>>>>>> However, this fails
>>>>>>>>>
>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>> _).collect.foreach(println)
>>>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>
>>>>>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>>>>>
>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
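
To illustrate the point above with plain Scala collections (no Spark needed): the records coming out of createDirectStream are (key, value) tuples, so the text has to be picked out with _._2 before contains can be applied. The sample records below are invented for the example.

```scala
// Plain tuples standing in for the (key, value) records of the Kafka stream.
val messages: Seq[(String, String)] = Seq(
  ("k1", "Run UPDATE INDEX STATISTICS on all ASE 15 databases"),
  ("k2", "Nothing relevant here")
)

// messages.filter(_.contains("ASE 15"))  // does not compile: contains is not a member of (String, String)

// Select the value of each pair first, then filter on the resulting strings.
val lines    = messages.map(_._2)
val matching = lines.filter(_.contains("ASE 15"))
```

The same two steps carry over to a DStream, since map and filter are available there as well.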
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>
>>>>>>>>>>> That should have been "\n", the newline character.
>>>>>>>>>>>
>>>>>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>>>>>
>>>>>>>>>>> It does the streaming
>>>>>>>>>>>
>>>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>> //
>>>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>>>>      |              set("spark.driver.allowMultipleContexts",
>>>>>>>>>>> "true").
>>>>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs",
>>>>>>>>>>> "false")
>>>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>>>> scala>
>>>>>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers"
>>>>>>>>>>> -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>>>>>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" ->
>>>>>>>>>>> "StreamTest" )
>>>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id
>>>>>>>>>>> -> StreamTest)
>>>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>> messages:
>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>>>
>>>>>>>>>>> This part is tricky
>>>>>>>>>>>
>>>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>> <console>:47: error: value contains is not a member of (String,
>>>>>>>>>>> String)
>>>>>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> //
>>>>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>>>>> //
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>>>
>>>>>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The first underscore in your filter call refers to a
>>>>>>>>>>>>> line in the file (as textFile() results in a collection of
>>>>>>>>>>>>> strings).
>>>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>>>> 3. Filter expects a Boolean result, so you can merge your
>>>>>>>>>>>>> contains filters into one with an AND (&&) expression.
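>>>>>>>>>>>>> 4. Not quite. split() takes a regex, so "\t," only matches a tab
>>>>>>>>>>>>> immediately followed by a comma. To split on either character,
>>>>>>>>>>>>> use a character class such as "[\t,]".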
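
Points 3 and 4 can be checked in a plain Scala session. The sketch below uses a made-up list of lines and shows that a single filter with && gives the same result as two chained filters. It also shows that split() treats its String argument as a regular expression, so "\t," matches a tab followed by a comma rather than either character. All sample strings and value names are assumptions for illustration only.

```scala
// Made-up sample lines standing in for the contents of the text file.
val lines = Seq(
  "",
  "ASE 15 release notes",
  "Run UPDATE INDEX STATISTICS on all ASE 15 databases",
  "UPDATE INDEX STATISTICS needs tempdb space"
)

// Two chained filters and one merged filter keep the same lines.
val chained = lines.filter(_.contains("ASE 15")).filter(_.contains("UPDATE INDEX STATISTICS"))
val merged  = lines.filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))

// split(String) takes a regex: "\t," means a tab immediately followed by a comma.
val sample     = "a\tb,c"
val asSequence = sample.split("\t,").toList             // no match, the string stays whole
val eitherChar = sample.split("[\t,]").toList           // character class: tab or comma
val viaChars   = sample.split(Array('\t', ',')).toList  // Array[Char] overload: each char separates
```

The Array[Char] overload is the one form where every character acts as a divider, which may be what was intended.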
>>>>>>>>>>>>>
>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>
>>>>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I thought about what I was trying to achieve so I rewrote the
>>>>>>>>>>>>> logic as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>>>>    3. Search for lines that contain "ASE 15" and further have
>>>>>>>>>>>>>    sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>>>>
>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>> f.cache()
>>>>>>>>>>>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. I take that "_" refers to content of the file read in
>>>>>>>>>>>>>    by default?
>>>>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not
>>>>>>>>>>>>>    really needed here)
>>>>>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a
>>>>>>>>>>>>>> lexicographic comparison against the empty string. It does drop
>>>>>>>>>>>>>> empty lines, but the intent is clearer with _.nonEmpty or
>>>>>>>>>>>>>> _.length > 0.
>>>>>>>>>>>>>> I think that filtering with _.contains should be sufficient
>>>>>>>>>>>>>> and the first filter can be omitted.
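
A small sketch of how these filters behave on ordinary strings, using an invented list of lines. Comparing with the empty string is a lexicographic comparison that every non-empty string passes, so it keeps the same lines as nonEmpty; neither of them removes whitespace-only lines, for which a trim is needed.

```scala
// Invented sample: an empty line, a whitespace-only line and two text lines.
val lines = Seq("", "  ", "ASE15 upgrade", "other text")

// Lexicographic comparison: any non-empty string sorts after "".
val viaCompare  = lines.filter(_ > "")
// Same result, but the intent is explicit.
val viaNonEmpty = lines.filter(_.nonEmpty)
// Also drops lines that contain only whitespace.
val viaTrim     = lines.filter(_.trim.nonEmpty)
```

When the next step is a contains filter anyway, empty lines can never match, which is why the first filter can simply be left out.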
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>>>>>> The problem is that your first split() call returns an
>>>>>>>>>>>>>> Array[String], which has no split() method, so the second call
>>>>>>>>>>>>>> results in an error.
>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>>>>> // Compilation error: Array has no split() method
>>>>>>>>>>>>>> lines.split(",")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So either split each piece again with
>>>>>>>>>>>>>> flatMap(_.split("\t")).flatMap(_.split(",")) or, since split()
>>>>>>>>>>>>>> takes a regex, split on both characters at once with
>>>>>>>>>>>>>> flatMap(_.split("[\t,]")).
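
The difference between mapping and flat-mapping the second split can be seen on a single string. A minimal sketch with an invented input line; the value names are for illustration only:

```scala
// Invented input: tab-separated pieces, one of which contains a comma.
val line = "a\tb,c\td"

// line.split("\t").split(",")  // does not compile: Array[String] has no split()

// map keeps one inner array per tab-separated piece ...
val nested: Array[Array[String]] = line.split("\t").map(_.split(","))
// ... while flatMap flattens the pieces into a single array of fields.
val flat: Array[String] = line.split("\t").flatMap(_.split(","))
```

On an RDD or DStream the same choice applies: map yields a collection of arrays, whereas flatMap yields the individual fields.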
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Read a text file in
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the
>>>>>>>>>>>>>> lines that contain "ASE15"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>>>>>>>>> transformation above? Can it be done in one?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to map the above filter to lines with carriage
>>>>>>>>>>>>>> return and split them by ","
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>>>>>>>>>> => (line.split("\t").split(",")))
>>>>>>>>>>>>>> <console>:30: error: value split is not a member of
>>>>>>>>>>>>>> Array[String]
>>>>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ^
>>>>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>
>>> --
>>>
>>> Thanks & Regards
>>>
>>> Sachin Aggarwal
>>> 7760502772
>>>
>>
>>
>
>
>
