sure,

this will be help full try this

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks Sachin. Will test it
>
> I guess I can modify it to save the output to a Hive table as opposed to
> terminal
>
> Regards
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 09:06, Sachin Aggarwal <different.sac...@gmail.com>
> wrote:
>
>> Hey ,
>>
>> I have changed your example itself try this , it should work in terminal
>>
>> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
>> 1)).reduceByKey(_ + _)
>> result.foreachRDD(rdd => rdd.foreach(println))
>>
>>
>> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks.
>>>
>>> Currently this is what I am doing
>>>
>>> // Get the lines
>>> //
>>> val lines = messages.map(_._2)
>>> // Check for message
>>> val showResults = lines.filter(_.contains("Sending
>>> messages")).flatMap(line => line.split("\n,")).map(word => (word,
>>> 1)).reduceByKey(_ + _).print(1000)
>>>
>>> So it prints max of 1000 lines to terminal after filter and map. Can
>>> this be done as suggested?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> Instead of using print() directly on Dstream, I will suggest you use 
>>>> foreachRDD
>>>> if you  wanted to materialize all rows , example shown here:-
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>
>>>> dstream.foreachRDD(rdd => {
>>>>       val connection = createNewConnection()  // executed at the driver
>>>>       rdd.foreach(record => {
>>>>           connection.send(record) // executed at the worker
>>>>       })
>>>>   })
>>>>
>>>>
>>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>>>> However, print(1000) does
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>>>>
>>>>>> FYI
>>>>>>
>>>>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Ted.
>>>>>>>
>>>>>>> As I see print() materializes the first 10 rows. On the other hand
>>>>>>> print(n) will materialise n rows.
>>>>>>>
>>>>>>> How about if I wanted to materialize all rows?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn * 
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Mich:
>>>>>>>> See the following method of DStream:
>>>>>>>>
>>>>>>>>    * Print the first num elements of each RDD generated in this
>>>>>>>> DStream. This is an output
>>>>>>>>    * operator, so this DStream will be registered as an output
>>>>>>>> stream and there materialized.
>>>>>>>>    */
>>>>>>>>   def print(num: Int): Unit = ssc.withScope {
>>>>>>>>
>>>>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> However this works. I am checking the logic to see if it does what
>>>>>>>>> I asked it to do
>>>>>>>>>
>>>>>>>>> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>>>>>
>>>>>>>>> scala> ssc.start()
>>>>>>>>>
>>>>>>>>> scala> -------------------------------------------
>>>>>>>>> Time: 1459701925000 ms
>>>>>>>>> -------------------------------------------
>>>>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>>>>>>> databases,27)
>>>>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15
>>>>>>>>> OR,27)
>>>>>>>>> (Once databases are loaded to ASE 15, then you will need to
>>>>>>>>> maintain them the way you maintain your PROD. For example run UPDATE 
>>>>>>>>> INDEX
>>>>>>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent 
>>>>>>>>> mistakes
>>>>>>>>> that people do is NOT pruning data from daily log tables in ASE 15 
>>>>>>>>> etc as
>>>>>>>>> they do it in PROD. This normally results in slower performance on 
>>>>>>>>> ASE 15
>>>>>>>>> databases as test cycles continue. Use MDA readings to measure daily 
>>>>>>>>> DML
>>>>>>>>> activities on ASE 15 tables and compare them with those of PROD. A 24 
>>>>>>>>> hour
>>>>>>>>> cycle measurement should be good. If you notice that certain tables 
>>>>>>>>> have
>>>>>>>>> different DML hits (insert/update/delete) compared to PROD you will 
>>>>>>>>> know
>>>>>>>>> that either ASE 15 is not doing everything in terms of batch activity 
>>>>>>>>> (some
>>>>>>>>> jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>>>>>> (* Make sure that you have enough tempdb system segment space for
>>>>>>>>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb 
>>>>>>>>> size
>>>>>>>>> required in ASE 15 QA and expand the tempdb database in production
>>>>>>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>>>>>>> migration weekend.,27)
>>>>>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE
>>>>>>>>> INDEX STATISTICS on different tables in the same database at the same 
>>>>>>>>> time.
>>>>>>>>> Watch tempdb segment growth though! OR,27)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> LinkedIn * 
>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <
>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Ted
>>>>>>>>>>
>>>>>>>>>> This works
>>>>>>>>>>
>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>> messages:
>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>>
>>>>>>>>>> scala> // Get the lines
>>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>>
>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>>
>>>>>>>>>> However, this fails
>>>>>>>>>>
>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> LinkedIn * 
>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>>
>>>>>>>>>>> As shown above, contains shouldn't be applied directly on a
>>>>>>>>>>> tuple.
>>>>>>>>>>>
>>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>>
>>>>>>>>>>>> That should "\n" as carriage return
>>>>>>>>>>>>
>>>>>>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>>>>>>
>>>>>>>>>>>> It does the streaming
>>>>>>>>>>>>
>>>>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>>> //
>>>>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>>>>>      |              set("spark.driver.allowMultipleContexts",
>>>>>>>>>>>> "true").
>>>>>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs",
>>>>>>>>>>>> "false")
>>>>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>>>>> scala>
>>>>>>>>>>>> scala> val kafkaParams = Map[String,
>>>>>>>>>>>> String]("bootstrap.servers" -> "rhes564:9092", 
>>>>>>>>>>>> "schema.registry.url" -> "
>>>>>>>>>>>> http://rhes564:8081";, "zookeeper.connect" -> "rhes564:2181", "
>>>>>>>>>>>> group.id" -> "StreamTest" )
>>>>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181,
>>>>>>>>>>>> group.id -> StreamTest)
>>>>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>>> messages:
>>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>>>>
>>>>>>>>>>>> This part is tricky
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>> <console>:47: error: value contains is not a member of (String,
>>>>>>>>>>>> String)
>>>>>>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> //
>>>>>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>>>>>> //
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>>>>
>>>>>>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io
>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. The first underscore in your filter call is refering to a
>>>>>>>>>>>>>> line in the file (as textFile() results in a collection of 
>>>>>>>>>>>>>> strings)
>>>>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>>>>> 3. Filter is expecting a Boolean result. So you can merge
>>>>>>>>>>>>>> your contains filters to one with AND (&&) statement.
>>>>>>>>>>>>>> 4. Correct. Each character in split() is used as a divider.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I thought about what I was trying to achieve so I rewrote the
>>>>>>>>>>>>>> logic as follows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>>>>>    3. Search for lines that contain "ASE 15" and further
>>>>>>>>>>>>>>    have sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>> f.cache()
>>>>>>>>>>>>>>  f.filter(_.length > 0).filter(_ contains("ASE
>>>>>>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line 
>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. I take that "_" refers to content of the file read in
>>>>>>>>>>>>>>    by default?
>>>>>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not
>>>>>>>>>>>>>>    really needed here)
>>>>>>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When doing .filter(_ > “”) you’re actually doing a
>>>>>>>>>>>>>>> lexicographic comparison and not filtering for empty lines 
>>>>>>>>>>>>>>> (which could be
>>>>>>>>>>>>>>> achieved with _.notEmpty or _.length > 0).
>>>>>>>>>>>>>>> I think that filtering with _.contains should be sufficient
>>>>>>>>>>>>>>> and the first filter can be omitted.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As for line => line.split(“\t”).split(“,”):
>>>>>>>>>>>>>>> You have to do a second map or (since split() requires a
>>>>>>>>>>>>>>> regex as input) .split(“\t,”).
>>>>>>>>>>>>>>> The problem is that your first split() call will generate an
>>>>>>>>>>>>>>> Array and then your second call will result in an error.
>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val lines: Array[String] = line.split(“\t”)
>>>>>>>>>>>>>>> lines.split(“,”) // Compilation error - no method split()
>>>>>>>>>>>>>>> exists for Array
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>>>>>>>>>>>>>> map(_.split(“\t,”))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Read a text file in
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the
>>>>>>>>>>>>>>> lines that contain "ASE15"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>>>>>>>>>> transformation above? Can it be done in one?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now I want to map the above filter to lines with carriage
>>>>>>>>>>>>>>> return ans split them by ","
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>>>>>>>>>>> => (line.split("\t").split(",")))
>>>>>>>>>>>>>>> <console>:30: error: value split is not a member of
>>>>>>>>>>>>>>> Array[String]
>>>>>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ^
>>>>>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Thanks & Regards
>>>>
>>>> Sachin Aggarwal
>>>> 7760502772
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
>>
>
>


-- 

Thanks & Regards

Sachin Aggarwal
7760502772

Reply via email to