If I go through each RDD, I get:

val result = lines.filter(_.contains("Sending messages")).flatMap(line =>
  line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)

scala> result.foreachRDD( rdd => {
     |   for (item <- rdd.collect()) {
     |     println(item)
     |   }
     | })

Rather than println(item), I want to store the data temporarily and then
flush the results to an HDFS file as an append.
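
Something along these lines is what I have in mind (a sketch, untested; the
target Parquet path is just an example). Each batch of (word, count) pairs
becomes a DataFrame that is appended to a Parquet dataset on HDFS, which a
Hive external table could then sit on top of:

import org.apache.spark.sql.SQLContext

result.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    // each batch of (word, count) pairs becomes a two-column DataFrame
    val df = rdd.toDF("word", "count")
    // "append" adds this batch to the existing data rather than overwriting it
    df.write.mode("append").parquet("/user/hduser/tmp/keep_parquet")
  }
}

Would that be a reasonable approach?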

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



On 5 April 2016 at 14:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> This is the idea I have in mind
>
> I want to go through every line
>
> result.foreachRDD(rdd => rdd.foreach(println))
>
> Rather than print each line, I want to save the lines temporarily and then
> append the result set (the lines in each RDD) to a table for further
> analysis. It could be a Parquet or Hive table.
>
> I am only interested in the matching lines, which saves space on disk.
>
> I am aware that I can simply write them to text files
>
> result.saveAsTextFiles("/user/hduser/tmp/keep")
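>
> (Though as I understand it, saveAsTextFiles writes a separate output
> directory per batch interval, named from the prefix and the batch time,
> rather than appending to a single file.)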
>
>
> Any ideas will be appreciated
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> On 5 April 2016 at 09:32, Sachin Aggarwal <different.sac...@gmail.com>
> wrote:
>
>> Sure,
>>
>> this will be helpful. Try this:
>>
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html
>>
>> On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Sachin. Will test it
>>>
>>> I guess I can modify it to save the output to a Hive table as opposed to
>>> the terminal.
>>>
>>> Regards
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> I have changed your example itself. Try this; it should work in the
>>>> terminal:
>>>>
>>>> val result = lines.filter(_.contains("ASE 15")).
>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>   flatMap(line => line.split("\n,")).
>>>>   map(word => (word, 1)).
>>>>   reduceByKey(_ + _)
>>>> result.foreachRDD(rdd => rdd.foreach(println))
>>>>
>>>>
>>>> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks.
>>>>>
>>>>> Currently this is what I am doing
>>>>>
>>>>> // Get the lines
>>>>> //
>>>>> val lines = messages.map(_._2)
>>>>> // Check for message
>>>>> val showResults = lines.filter(_.contains("Sending messages")).
>>>>>   flatMap(line => line.split("\n,")).
>>>>>   map(word => (word, 1)).
>>>>>   reduceByKey(_ + _).print(1000)
>>>>>
>>>>> So it prints a maximum of 1000 lines to the terminal after the filter
>>>>> and map. Can this be done as suggested?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Instead of using print() directly on the DStream, I suggest you use
>>>>>> foreachRDD if you want to materialize all rows; an example is shown
>>>>>> here:
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>>>
>>>>>> dstream.foreachRDD { rdd =>
>>>>>>   val connection = createNewConnection()  // executed at the driver
>>>>>>   rdd.foreach { record =>
>>>>>>     connection.send(record)  // executed at the worker
>>>>>>   }
>>>>>> }
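>>>>>>
>>>>>> Note that the guide shows that snippet as an anti-pattern: the
>>>>>> connection object would have to be serialized from the driver to the
>>>>>> workers. The version the guide recommends creates the connection on
>>>>>> the workers, inside foreachPartition (createNewConnection is the
>>>>>> guide's placeholder):
>>>>>>
>>>>>> dstream.foreachRDD { rdd =>
>>>>>>   rdd.foreachPartition { partitionOfRecords =>
>>>>>>     // one connection per partition, created on the worker
>>>>>>     val connection = createNewConnection()
>>>>>>     partitionOfRecords.foreach(record => connection.send(record))
>>>>>>     connection.close()
>>>>>>   }
>>>>>> }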
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>>>>>> However, print(1000) does
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>>>>>>
>>>>>>>> FYI
>>>>>>>>
>>>>>>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Ted.
>>>>>>>>>
>>>>>>>>> As I see it, print() materializes the first 10 rows. On the other
>>>>>>>>> hand, print(n) will materialize n rows.
>>>>>>>>>
>>>>>>>>> How about if I wanted to materialize all rows?
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Mich:
>>>>>>>>>> See the following method of DStream:
>>>>>>>>>>
>>>>>>>>>> /**
>>>>>>>>>>  * Print the first num elements of each RDD generated in this
>>>>>>>>>>  * DStream. This is an output operator, so this DStream will be
>>>>>>>>>>  * registered as an output stream and there materialized.
>>>>>>>>>>  */
>>>>>>>>>> def print(num: Int): Unit = ssc.withScope {
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> However, this works. I am checking the logic to see if it does
>>>>>>>>>>> what I asked it to do:
>>>>>>>>>>>
>>>>>>>>>>> val v = lines.filter(_.contains("ASE 15")).
>>>>>>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>>>   flatMap(line => line.split("\n,")).
>>>>>>>>>>>   map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>>>>>>>
>>>>>>>>>>> scala> ssc.start()
>>>>>>>>>>>
>>>>>>>>>>> scala> -------------------------------------------
>>>>>>>>>>> Time: 1459701925000 ms
>>>>>>>>>>> -------------------------------------------
>>>>>>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>>>>>>>>> databases,27)
>>>>>>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15
>>>>>>>>>>> OR,27)
>>>>>>>>>>> (Once databases are loaded to ASE 15, then you will need to
>>>>>>>>>>> maintain them the way you maintain your PROD. For example run 
>>>>>>>>>>> UPDATE INDEX
>>>>>>>>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent 
>>>>>>>>>>> mistakes
>>>>>>>>>>> that people do is NOT pruning data from daily log tables in ASE 15 
>>>>>>>>>>> etc as
>>>>>>>>>>> they do it in PROD. This normally results in slower performance on 
>>>>>>>>>>> ASE 15
>>>>>>>>>>> databases as test cycles continue. Use MDA readings to measure 
>>>>>>>>>>> daily DML
>>>>>>>>>>> activities on ASE 15 tables and compare them with those of PROD. A 
>>>>>>>>>>> 24 hour
>>>>>>>>>>> cycle measurement should be good. If you notice that certain tables 
>>>>>>>>>>> have
>>>>>>>>>>> different DML hits (insert/update/delete) compared to PROD you will 
>>>>>>>>>>> know
>>>>>>>>>>> that either ASE 15 is not doing everything in terms of batch 
>>>>>>>>>>> activity (some
>>>>>>>>>>> jobs are missing), or there is something inconsistent somewhere. 
>>>>>>>>>>> ,27)
>>>>>>>>>>> (* Make sure that you have enough tempdb system segment space
>>>>>>>>>>> for UPDATE INDEX STATISTICS. It is always advisable to gauge the 
>>>>>>>>>>> tempdb
>>>>>>>>>>> size required in ASE 15 QA and expand the tempdb database in 
>>>>>>>>>>> production
>>>>>>>>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>>>>>>>>> migration weekend.,27)
>>>>>>>>>>> (o In ASE 15 you can subdivide the task by running parallel
>>>>>>>>>>> UPDATE INDEX STATISTICS on different tables in the same database at 
>>>>>>>>>>> the
>>>>>>>>>>> same time. Watch tempdb segment growth though! OR,27)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <
>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Ted
>>>>>>>>>>>>
>>>>>>>>>>>> This works
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>>> messages:
>>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>>>>
>>>>>>>>>>>> scala> // Get the lines
>>>>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>>>>
>>>>>>>>>>>> However, this fails
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
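>>>>>>>>>>>>
>>>>>>>>>>>> (Presumably because collect is defined on RDD, not on DStream,
>>>>>>>>>>>> so it would have to go inside foreachRDD, e.g.
>>>>>>>>>>>> v.foreachRDD(rdd => rdd.collect().foreach(println)) ?)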
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>>>>
>>>>>>>>>>>>> As shown above, contains shouldn't be applied directly on a
>>>>>>>>>>>>> tuple.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
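>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, the message value is the second element of the
>>>>>>>>>>>>> (key, value) tuple:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val lines = messages.map(_._2)
>>>>>>>>>>>>> val hits = lines.filter(_.contains("ASE 15"))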
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That should be "\n" for a newline, then.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> OK, I am using Spark streaming to analyse the messages.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It does the streaming:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>>>>> //
>>>>>>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>>>>>>>      |              set("spark.driver.allowMultipleContexts",
>>>>>>>>>>>>>> "true").
>>>>>>>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs",
>>>>>>>>>>>>>> "false")
>>>>>>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>>>>>>> scala>
>>>>>>>>>>>>>> scala> val kafkaParams = Map[String, String](
>>>>>>>>>>>>>>   "bootstrap.servers" -> "rhes564:9092",
>>>>>>>>>>>>>>   "schema.registry.url" -> "http://rhes564:8081",
>>>>>>>>>>>>>>   "zookeeper.connect" -> "rhes564:2181",
>>>>>>>>>>>>>>   "group.id" -> "StreamTest")
>>>>>>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181,
>>>>>>>>>>>>>> group.id -> StreamTest)
>>>>>>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>>>>> messages:
>>>>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, 
>>>>>>>>>>>>>> String)] =
>>>>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This part is tricky
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line 
>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>> <console>:47: error: value contains is not a member of
>>>>>>>>>>>>>> (String, String)
>>>>>>>>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line 
>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Minor correction: "\t" denotes the tab character, not carriage return.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <
>>>>>>>>>>>>>>> elir...@iguaz.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. The first underscore in your filter call is referring to
>>>>>>>>>>>>>>>> a line in the file (as textFile() results in a collection of
>>>>>>>>>>>>>>>> strings).
>>>>>>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>>>>>>> 3. filter expects a Boolean result, so you can merge your
>>>>>>>>>>>>>>>> contains filters into one with an AND (&&) expression.
>>>>>>>>>>>>>>>> 4. Not quite: "\t" is a tab, and split() takes a regex, so
>>>>>>>>>>>>>>>> "\t," matches a tab immediately followed by a comma. To split
>>>>>>>>>>>>>>>> on tab OR comma, use a character class: split("[\t,]"). See
>>>>>>>>>>>>>>>> the sketch below.
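>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Putting 3 and 4 together, a sketch of your pipeline:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> f.filter(line => line.contains("ASE 15") &&
>>>>>>>>>>>>>>>>            line.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>>>>>>>>   flatMap(_.split("[\t,]")).
>>>>>>>>>>>>>>>>   map(word => (word, 1)).
>>>>>>>>>>>>>>>>   reduceByKey(_ + _).
>>>>>>>>>>>>>>>>   collect.foreach(println)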
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>>>>>>> Sent: Apr 3, 2016 15:06
>>>>>>>>>>>>>>>> To: Eliran Bivas
>>>>>>>>>>>>>>>> Cc: user @spark
>>>>>>>>>>>>>>>> Subject: Re: multiple splits fails
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I thought about what I was trying to achieve so I rewrote
>>>>>>>>>>>>>>>> the logic as follows:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>>>>>>>    3. Search for lines that contain "ASE 15" and also contain
>>>>>>>>>>>>>>>>    the phrase "UPDATE INDEX STATISTICS"
>>>>>>>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>>> f.cache()
>>>>>>>>>>>>>>>> f.filter(_.length > 0).
>>>>>>>>>>>>>>>>   filter(_.contains("ASE 15")).
>>>>>>>>>>>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>>>>>>>>   flatMap(line => line.split("\t,")).
>>>>>>>>>>>>>>>>   map(word => (word, 1)).
>>>>>>>>>>>>>>>>   reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>    1. I take it that "_" refers to the content of the file
>>>>>>>>>>>>>>>>    read in, by default?
>>>>>>>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not
>>>>>>>>>>>>>>>>    really needed here)?
>>>>>>>>>>>>>>>>    3. Multiple filters are needed, one for each contains
>>>>>>>>>>>>>>>>    check?
>>>>>>>>>>>>>>>>    4. split("\t,") splits the text by carriage return AND
>>>>>>>>>>>>>>>>    ","?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a
>>>>>>>>>>>>>>>>> lexicographic comparison and not filtering for empty lines
>>>>>>>>>>>>>>>>> (which could be achieved with _.nonEmpty or _.length > 0).
>>>>>>>>>>>>>>>>> I think that filtering with _.contains should be sufficient
>>>>>>>>>>>>>>>>> and the first filter can be omitted.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>>>>>>>>> The problem is that your first split() call returns an
>>>>>>>>>>>>>>>>> Array[String], and Array has no split() method, so the
>>>>>>>>>>>>>>>>> second call will not compile.
>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>>>>>>>> lines.split(",") // Compilation error - no method split()
>>>>>>>>>>>>>>>>> exists for Array
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Since split() takes a regex as input, you can split on tab
>>>>>>>>>>>>>>>>> or comma in a single call with a character class. So either
>>>>>>>>>>>>>>>>> go with flatMap(_.split("\t")).map(_.split(",")) or
>>>>>>>>>>>>>>>>> map(_.split("[\t,]"))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Read a text file in
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the
>>>>>>>>>>>>>>>>> lines that contain "ASE15"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The above works, but do I really need two filter
>>>>>>>>>>>>>>>>> transformations here? Can it be done in one?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Now I want to map the above filter to lines with carriage
>>>>>>>>>>>>>>>>> return and split them by ","
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> scala> f.filter(_ > "").filter(_
>>>>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>>>> <console>:30: error: value split is not a member of
>>>>>>>>>>>>>>>>> Array[String]
>>>>>>>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ^
>>>>>>>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Thanks & Regards
>>>>>>
>>>>>> Sachin Aggarwal
>>>>>> 7760502772
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Thanks & Regards
>>>>
>>>> Sachin Aggarwal
>>>> 7760502772
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
>>
>
>
