Thank you gents.

That should "\n" as carriage return

OK I am using spark streaming to analyse the message

It does the streaming

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
scala> val sparkConf = new SparkConf().
     |              setAppName("StreamTest").
     |              setMaster("local[12]").
     |              set("spark.driver.allowMultipleContexts", "true").
     |              set("spark.hadoop.validateOutputSpecs", "false")
scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
scala>
scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
StreamTest)
scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)
         val showlines = messages.filter(_ contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)


How does one refer to the content of the stream here?

Thanks










//
// Now want to do some analysis on the same text file
//



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. split"\t," splits the filter by carriage return
>
> Minor correction: "\t" denotes tab character.
>
> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>
>> Hi Mich,
>>
>> 1. The first underscore in your filter call is refering to a line in the
>> file (as textFile() results in a collection of strings)
>> 2. You're correct. No need for it.
>> 3. Filter is expecting a Boolean result. So you can merge your contains
>> filters to one with AND (&&) statement.
>> 4. Correct. Each character in split() is used as a divider.
>>
>> Eliran Bivas
>>
>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>> *Sent:* Apr 3, 2016 15:06
>> *To:* Eliran Bivas
>> *Cc:* user @spark
>> *Subject:* Re: multiple splits fails
>>
>> Hi Eliran,
>>
>> Many thanks for your input on this.
>>
>> I thought about what I was trying to achieve so I rewrote the logic as
>> follows:
>>
>>
>>    1. Read the text file in
>>    2. Filter out empty lines (well not really needed here)
>>    3. Search for lines that contain "ASE 15" and further have sentence
>>    "UPDATE INDEX STATISTICS" in the said line
>>    4. Split the text by "\t" and ","
>>    5. Print the outcome
>>
>>
>> This was what I did with your suggestions included
>>
>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>> f.cache()
>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>>
>>
>> Couple of questions if I may
>>
>>
>>    1. I take that "_" refers to content of the file read in by default?
>>    2. _.length > 0 basically filters out blank lines (not really needed
>>    here)
>>    3. Multiple filters are needed for each *contains* logic
>>    4. split"\t," splits the filter by carriage return AND ,?
>>
>>
>> Regards
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>
>>> Hi Mich,
>>>
>>> Few comments:
>>>
>>> When doing .filter(_ > “”) you’re actually doing a lexicographic
>>> comparison and not filtering for empty lines (which could be achieved with
>>> _.notEmpty or _.length > 0).
>>> I think that filtering with _.contains should be sufficient and the
>>> first filter can be omitted.
>>>
>>> As for line => line.split(“\t”).split(“,”):
>>> You have to do a second map or (since split() requires a regex as
>>> input) .split(“\t,”).
>>> The problem is that your first split() call will generate an Array and
>>> then your second call will result in an error.
>>> e.g.
>>>
>>> val lines: Array[String] = line.split(“\t”)
>>> lines.split(“,”) // Compilation error - no method split() exists for
>>> Array
>>>
>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>> map(_.split(“\t,”))
>>>
>>> Hope that helps.
>>>
>>> *Eliran Bivas*
>>> Data Team | iguaz.io
>>>
>>>
>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am not sure this is the correct approach
>>>
>>> Read a text file in
>>>
>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>
>>>
>>> Now I want to get rid of empty lines and filter only the lines that
>>> contain "ASE15"
>>>
>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>
>>> The above works but I am not sure whether I need two filter
>>> transformation above? Can it be done in one?
>>>
>>> Now I want to map the above filter to lines with carriage return ans
>>> split them by ","
>>>
>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>> (line.split("\t")))
>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131]
>>> at map at <console>:30
>>>
>>> Now I want to split the output by ","
>>>
>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>> (line.split("\t").split(",")))
>>> <console>:30: error: value split is not a member of Array[String]
>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>> (line.split("\t").split(",")))
>>>
>>> ^
>>> Any advice will be appreciated
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>
>

Reply via email to