This is the idea I have in mind

I want to go through every line

result.foreachRDD(rdd => rdd.foreach(println))

rather than print each line I want to save them temporarily and then
add/append the result set (lines in RDD ) to a table for further analyses.
It could be a Parquet or Hive table.

So only interested in lines of interest that saves me space on the disk.

I am aware that I can simply write them to text files

result.saveAsTextFiles("/user/hduser/tmp/keep")


Any ideas will be appreciated

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 09:32, Sachin Aggarwal <different.sac...@gmail.com>
wrote:

> sure,
>
> this will be help full try this
>
>
> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html
>
> On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> Thanks Sachin. Will test it
>>
>> I guess I can modify it to save the output to a Hive table as opposed to
>> terminal
>>
>> Regards
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com>
>> wrote:
>>
>>> Hey ,
>>>
>>> I have changed your example itself try this , it should work in terminal
>>>
>>> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
>>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
>>> 1)).reduceByKey(_ + _)
>>> result.foreachRDD(rdd => rdd.foreach(println))
>>>
>>>
>>> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thanks.
>>>>
>>>> Currently this is what I am doing
>>>>
>>>> // Get the lines
>>>> //
>>>> val lines = messages.map(_._2)
>>>> // Check for message
>>>> val showResults = lines.filter(_.contains("Sending
>>>> messages")).flatMap(line => line.split("\n,")).map(word => (word,
>>>> 1)).reduceByKey(_ + _).print(1000)
>>>>
>>>> So it prints max of 1000 lines to terminal after filter and map. Can
>>>> this be done as suggested?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Instead of using print() directly on Dstream, I will suggest you use 
>>>>> foreachRDD
>>>>> if you  wanted to materialize all rows , example shown here:-
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>>
>>>>> dstream.foreachRDD(rdd => {
>>>>>       val connection = createNewConnection()  // executed at the driver
>>>>>       rdd.foreach(record => {
>>>>>           connection.send(record) // executed at the worker
>>>>>       })
>>>>>   })
>>>>>
>>>>>
>>>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>>>>> However, print(1000) does
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * 
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>>>>>
>>>>>>> FYI
>>>>>>>
>>>>>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Ted.
>>>>>>>>
>>>>>>>> As I see print() materializes the first 10 rows. On the other hand
>>>>>>>> print(n) will materialise n rows.
>>>>>>>>
>>>>>>>> How about if I wanted to materialize all rows?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn * 
>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Mich:
>>>>>>>>> See the following method of DStream:
>>>>>>>>>
>>>>>>>>>    * Print the first num elements of each RDD generated in this
>>>>>>>>> DStream. This is an output
>>>>>>>>>    * operator, so this DStream will be registered as an output
>>>>>>>>> stream and there materialized.
>>>>>>>>>    */
>>>>>>>>>   def print(num: Int): Unit = ssc.withScope {
>>>>>>>>>
>>>>>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> However this works. I am checking the logic to see if it does
>>>>>>>>>> what I asked it to do
>>>>>>>>>>
>>>>>>>>>> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>>>>>>
>>>>>>>>>> scala> ssc.start()
>>>>>>>>>>
>>>>>>>>>> scala> -------------------------------------------
>>>>>>>>>> Time: 1459701925000 ms
>>>>>>>>>> -------------------------------------------
>>>>>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>>>>>>>> databases,27)
>>>>>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15
>>>>>>>>>> OR,27)
>>>>>>>>>> (Once databases are loaded to ASE 15, then you will need to
>>>>>>>>>> maintain them the way you maintain your PROD. For example run UPDATE 
>>>>>>>>>> INDEX
>>>>>>>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent 
>>>>>>>>>> mistakes
>>>>>>>>>> that people do is NOT pruning data from daily log tables in ASE 15 
>>>>>>>>>> etc as
>>>>>>>>>> they do it in PROD. This normally results in slower performance on 
>>>>>>>>>> ASE 15
>>>>>>>>>> databases as test cycles continue. Use MDA readings to measure daily 
>>>>>>>>>> DML
>>>>>>>>>> activities on ASE 15 tables and compare them with those of PROD. A 
>>>>>>>>>> 24 hour
>>>>>>>>>> cycle measurement should be good. If you notice that certain tables 
>>>>>>>>>> have
>>>>>>>>>> different DML hits (insert/update/delete) compared to PROD you will 
>>>>>>>>>> know
>>>>>>>>>> that either ASE 15 is not doing everything in terms of batch 
>>>>>>>>>> activity (some
>>>>>>>>>> jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>>>>>>> (* Make sure that you have enough tempdb system segment space for
>>>>>>>>>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb 
>>>>>>>>>> size
>>>>>>>>>> required in ASE 15 QA and expand the tempdb database in production
>>>>>>>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>>>>>>>> migration weekend.,27)
>>>>>>>>>> (o In ASE 15 you can subdivide the task by running parallel
>>>>>>>>>> UPDATE INDEX STATISTICS on different tables in the same database at 
>>>>>>>>>> the
>>>>>>>>>> same time. Watch tempdb segment growth though! OR,27)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> LinkedIn * 
>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Ted
>>>>>>>>>>>
>>>>>>>>>>> This works
>>>>>>>>>>>
>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>> messages:
>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>>>
>>>>>>>>>>> scala> // Get the lines
>>>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>>>
>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>>>
>>>>>>>>>>> However, this fails
>>>>>>>>>>>
>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>>>
>>>>>>>>>>>> As shown above, contains shouldn't be applied directly on a
>>>>>>>>>>>> tuple.
>>>>>>>>>>>>
>>>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That should "\n" as carriage return
>>>>>>>>>>>>>
>>>>>>>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>>>>>>>
>>>>>>>>>>>>> It does the streaming
>>>>>>>>>>>>>
>>>>>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>>>> //
>>>>>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>>>>>>      |              set("spark.driver.allowMultipleContexts",
>>>>>>>>>>>>> "true").
>>>>>>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs",
>>>>>>>>>>>>> "false")
>>>>>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>>>>>> scala>
>>>>>>>>>>>>> scala> val kafkaParams = Map[String,
>>>>>>>>>>>>> String]("bootstrap.servers" -> "rhes564:9092", 
>>>>>>>>>>>>> "schema.registry.url" -> "
>>>>>>>>>>>>> http://rhes564:8081";, "zookeeper.connect" -> "rhes564:2181", "
>>>>>>>>>>>>> group.id" -> "StreamTest" )
>>>>>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181,
>>>>>>>>>>>>> group.id -> StreamTest)
>>>>>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>>>> messages:
>>>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] 
>>>>>>>>>>>>> =
>>>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>>>>>
>>>>>>>>>>>>> This part is tricky
>>>>>>>>>>>>>
>>>>>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line 
>>>>>>>>>>>>> =>
>>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>> <console>:47: error: value contains is not a member of
>>>>>>>>>>>>> (String, String)
>>>>>>>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line 
>>>>>>>>>>>>> =>
>>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> //
>>>>>>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>>>>>>> //
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <
>>>>>>>>>>>>>> elir...@iguaz.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. The first underscore in your filter call is refering to a
>>>>>>>>>>>>>>> line in the file (as textFile() results in a collection of 
>>>>>>>>>>>>>>> strings)
>>>>>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>>>>>> 3. Filter is expecting a Boolean result. So you can merge
>>>>>>>>>>>>>>> your contains filters to one with AND (&&) statement.
>>>>>>>>>>>>>>> 4. Correct. Each character in split() is used as a divider.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I thought about what I was trying to achieve so I rewrote
>>>>>>>>>>>>>>> the logic as follows:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>>>>>>    3. Search for lines that contain "ASE 15" and further
>>>>>>>>>>>>>>>    have sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>> f.cache()
>>>>>>>>>>>>>>>  f.filter(_.length > 0).filter(_ contains("ASE
>>>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX 
>>>>>>>>>>>>>>> STATISTICS")).flatMap(line =>
>>>>>>>>>>>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. I take that "_" refers to content of the file read in
>>>>>>>>>>>>>>>    by default?
>>>>>>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not
>>>>>>>>>>>>>>>    really needed here)
>>>>>>>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When doing .filter(_ > “”) you’re actually doing a
>>>>>>>>>>>>>>>> lexicographic comparison and not filtering for empty lines 
>>>>>>>>>>>>>>>> (which could be
>>>>>>>>>>>>>>>> achieved with _.notEmpty or _.length > 0).
>>>>>>>>>>>>>>>> I think that filtering with _.contains should be sufficient
>>>>>>>>>>>>>>>> and the first filter can be omitted.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As for line => line.split(“\t”).split(“,”):
>>>>>>>>>>>>>>>> You have to do a second map or (since split() requires a
>>>>>>>>>>>>>>>> regex as input) .split(“\t,”).
>>>>>>>>>>>>>>>> The problem is that your first split() call will generate
>>>>>>>>>>>>>>>> an Array and then your second call will result in an error.
>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> val lines: Array[String] = line.split(“\t”)
>>>>>>>>>>>>>>>> lines.split(“,”) // Compilation error - no method split()
>>>>>>>>>>>>>>>> exists for Array
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>>>>>>>>>>>>>>> map(_.split(“\t,”))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Read a text file in
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the
>>>>>>>>>>>>>>>> lines that contain "ASE15"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>>>>>>>>>>> transformation above? Can it be done in one?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now I want to map the above filter to lines with carriage
>>>>>>>>>>>>>>>> return ans split them by ","
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> scala> f.filter(_ > "").filter(_
>>>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>>> <console>:30: error: value split is not a member of
>>>>>>>>>>>>>>>> Array[String]
>>>>>>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ^
>>>>>>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Thanks & Regards
>>>>>
>>>>> Sachin Aggarwal
>>>>> 7760502772
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Thanks & Regards
>>>
>>> Sachin Aggarwal
>>> 7760502772
>>>
>>
>>
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>

Reply via email to