Thanks Ted

This works

scala> val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

scala> // Get the lines
scala> val lines = messages.map(_._2)
lines: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

scala> val v = lines.filter(_.contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
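
A side note on the split pattern, offered as a sketch rather than a definitive answer: String.split treats its argument as a regular expression, so "\n," matches a newline immediately followed by a comma, not either character on its own. If the intent is to split on either one, a character class such as "[\n,]" is one option. The sample string below is made up for illustration and is not taken from the topic.

```scala
object SplitSketch {
  // Hypothetical sample text; not real data from the Kafka topic.
  val sample = "UPDATE INDEX STATISTICS,ASE 15\nrun nightly"

  // "\n," is a regex for a newline followed by a comma.
  // The sample contains no such pair, so nothing is split.
  val bySequence: Array[String] = sample.split("\n,")

  // "[\n,]" is a character class: a newline OR a comma.
  val byEither: Array[String] = sample.split("[\n,]")

  def main(args: Array[String]): Unit = {
    println(bySequence.length)          // number of pieces from the sequence pattern
    println(byEither.mkString(" | "))   // pieces from the character-class pattern
  }
}
```

The same reasoning applies to the "\t," pattern discussed earlier in the thread.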

However, this fails

scala> val v = lines.filter(_.contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)
<console>:43: error: value collect is not a member of
org.apache.spark.streaming.dstream.DStream[(String, Int)]
         val v = lines.filter(_.contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)
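
The error is consistent with collect being defined on RDD and not on DStream. A minimal sketch of one way around it, assuming the Spark 1.x streaming API used in this thread: apply collect to each batch inside foreachRDD. The object and method names (PrintCountsSketch, printCounts) are made up for illustration, and the two filters are merged with && as suggested earlier in the thread.

```scala
import org.apache.spark.streaming.dstream.DStream

object PrintCountsSketch {
  // `lines` is assumed to be the DStream[String] built with messages.map(_._2).
  def printCounts(lines: DStream[String]): Unit = {
    val counts: DStream[(String, Int)] = lines
      .filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // collect lives on RDD, so call it on the RDD of each batch interval.
    counts.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }
  }
}
```

Nothing is printed until the context is started, so this would be followed by ssc.start() and ssc.awaitTermination(). For a quick look, counts.print() is a lighter alternative that shows the first few elements of each batch.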


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. is not a member of (String, String)
>
> As shown above, contains shouldn't be applied directly on a tuple.
>
> Choose the element of the tuple and then apply contains on it.
>
> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thank you gents.
>>
>> That should be "\n" for a line break.
>>
>> OK I am using spark streaming to analyse the message
>>
>> It does the streaming
>>
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>> //
>> scala> val sparkConf = new SparkConf().
>>      |              setAppName("StreamTest").
>>      |              setMaster("local[12]").
>>      |              set("spark.driver.allowMultipleContexts", "true").
>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>> scala>
>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>> kafkaParams: scala.collection.immutable.Map[String,String] =
>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>> StreamTest)
>> scala> val topic = Set("newtopic")
>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>
>> This part is tricky
>>
>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>> <console>:47: error: value contains is not a member of (String, String)
>>          val showlines = messages.filter(_ contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>>
>>
>> How does one refer to the content of the stream here?
>>
>> Thanks
>>
>> //
>> // Now want to do some analysis on the same text file
>> //
>>
>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. split"\t," splits the filter by carriage return
>>>
>>> Minor correction: "\t" denotes tab character.
>>>
>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> 1. The first underscore in your filter call refers to a line in
>>>> the file (as textFile() results in a collection of strings).
>>>> 2. You're correct. No need for it.
>>>> 3. Filter is expecting a Boolean result. So you can merge your contains
>>>> filters to one with AND (&&) statement.
>>>> 4. Correct. Each character in split() is used as a divider.
>>>>
>>>> Eliran Bivas
>>>>
>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> *Sent:* Apr 3, 2016 15:06
>>>> *To:* Eliran Bivas
>>>> *Cc:* user @spark
>>>> *Subject:* Re: multiple splits fails
>>>>
>>>> Hi Eliran,
>>>>
>>>> Many thanks for your input on this.
>>>>
>>>> I thought about what I was trying to achieve so I rewrote the logic as
>>>> follows:
>>>>
>>>>
>>>>    1. Read the text file in
>>>>    2. Filter out empty lines (well not really needed here)
>>>>    3. Search for lines that contain "ASE 15" and also contain the
>>>>    phrase "UPDATE INDEX STATISTICS"
>>>>    4. Split the text by "\t" and ","
>>>>    5. Print the outcome
>>>>
>>>>
>>>> This was what I did with your suggestions included
>>>>
>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>> f.cache()
>>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>> _).collect.foreach(println)
>>>>
>>>>
>>>> Couple of questions if I may
>>>>
>>>>
>>>>    1. I take it that "_" refers to the content of the file read in by default?
>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>    needed here)
>>>>    3. Multiple filters are needed for each *contains* logic
>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>
>>>>
>>>> Regards
>>>>
>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> Few comments:
>>>>>
>>>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>>>> comparison and not filtering for empty lines (which could be achieved with
>>>>> _.nonEmpty or _.length > 0).
>>>>> I think that filtering with _.contains should be sufficient and the
>>>>> first filter can be omitted.
>>>>>
>>>>> As for line => line.split("\t").split(","):
>>>>> You have to do a second map or (since split() requires a regex as
>>>>> input) .split("\t,").
>>>>> The problem is that your first split() call will generate an Array
>>>>> and then your second call will result in an error.
>>>>> e.g.
>>>>>
>>>>> val lines: Array[String] = line.split("\t")
>>>>> lines.split(",") // Compilation error - no method split() exists for
>>>>> Array
>>>>>
>>>>> So either go with map(_.split("\t")).map(_.split(",")) or
>>>>> map(_.split("\t,"))
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> *Eliran Bivas*
>>>>> Data Team | iguaz.io
>>>>>
>>>>>
>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am not sure this is the correct approach
>>>>>
>>>>> Read a text file in
>>>>>
>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>
>>>>>
>>>>> Now I want to get rid of empty lines and filter only the lines that
>>>>> contain "ASE15"
>>>>>
>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>
>>>>> The above works, but I am not sure whether I need the two filter
>>>>> transformations above. Can it be done in one?
>>>>>
>>>>> Now I want to map the above filter to lines with carriage return and
>>>>> split them by ","
>>>>>
>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>> (line.split("\t")))
>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131]
>>>>> at map at <console>:30
>>>>>
>>>>> Now I want to split the output by ","
>>>>>
>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>> (line.split("\t").split(",")))
>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>> (line.split("\t").split(",")))
>>>>>
>>>>> ^
>>>>> Any advice will be appreciated
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>
>>
>
