Thanks Ted. This works:
scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

scala> // Get the lines
scala> val lines = messages.map(_._2)
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. is not a member of (String, String)
>
> As shown above, contains shouldn't be applied directly on a tuple.
>
> Choose the element of the tuple and then apply contains on it.
>
> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thank you gents.
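[Editor's note: `collect` and `foreach` are RDD methods, not DStream methods; in Spark Streaming an output action runs once per micro-batch. A hedged sketch of how the counts above could be printed per batch with `foreachRDD` (the filter strings and the `lines` DStream are taken from the session above; the helper name is made up):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    // Sketch only: `lines` is the DStream[String] built from messages.map(_._2).
    def printCountsPerBatch(lines: DStream[String]): Unit = {
      val counts = lines
        .filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
        .flatMap(_.split("\n"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // collect() exists on the RDD inside each batch, not on the DStream itself
      counts.foreachRDD((rdd: RDD[(String, Int)]) => rdd.collect.foreach(println))
      // or, for a quick look at the first few elements per batch: counts.print()
    }

Nothing is produced until `ssc.start()` is called and `ssc.awaitTermination()` keeps the context running.]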
>> That should be "\n", i.e. a newline (strictly speaking, "\r" is the carriage return).
>>
>> OK, I am using Spark Streaming to analyse the message.
>>
>> It does the streaming:
>>
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>> //
>> scala> val sparkConf = new SparkConf().
>>      | setAppName("StreamTest").
>>      | setMaster("local[12]").
>>      | set("spark.driver.allowMultipleContexts", "true").
>>      | set("spark.hadoop.validateOutputSpecs", "false")
>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>
>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
>> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
>> scala> val topic = Set("newtopic")
>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>
>> This part is tricky:
>>
>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>> <console>:47: error: value contains is not a member of (String, String)
>>
>> How does one refer to the content of the stream here?
>>
>> Thanks
>>
>> //
>> // Now want to do some analysis on the same text file
>> //
>>
>> Dr Mich Talebzadeh
>>
>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. split"\t," splits the filter by carriage return
>>>
>>> Minor correction: "\t" denotes the tab character.
>>>
>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> 1. The first underscore in your filter call refers to a line in the file (as textFile() results in a collection of strings).
>>>> 2. You're correct. No need for it.
>>>> 3. filter() expects a Boolean result, so you can merge your contains filters into one with an AND (&&) expression.
>>>> 4. Correct. Each character in split() is used as a divider.
>>>>
>>>> Eliran Bivas
>>>>
>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> *Sent:* Apr 3, 2016 15:06
>>>> *To:* Eliran Bivas
>>>> *Cc:* user @spark
>>>> *Subject:* Re: multiple splits fails
>>>>
>>>> Hi Eliran,
>>>>
>>>> Many thanks for your input on this.
>>>>
>>>> I thought about what I was trying to achieve, so I rewrote the logic as follows:
>>>>
>>>> 1. Read the text file in
>>>> 2. Filter out empty lines (well, not really needed here)
>>>> 3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line
>>>> 4. Split the text by "\t" and ","
>>>> 5. Print the outcome
>>>>
>>>> This was what I did, with your suggestions included:
>>>>
>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>> f.cache()
>>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>
>>>> A couple of questions, if I may:
>>>>
>>>> 1. I take it that "_" refers to the content of the file read in, by default?
>>>> 2. _.length > 0 basically filters out blank lines (not really needed here)?
>>>> 3. Multiple filters are needed, one for each *contains* test?
>>>> 4. split"\t," splits the filter by carriage return AND ,?
>>>>
>>>> Regards
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> A few comments:
>>>>>
>>>>> When doing .filter(_ > "") you're actually doing a lexicographic comparison, not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0).
>>>>> I think that filtering with _.contains should be sufficient, and the first filter can be omitted.
>>>>>
>>>>> As for line => line.split("\t").split(","):
>>>>> You have to do a second map or (since split() takes a regex as input) use .split("\t,").
>>>>> The problem is that your first split() call will generate an Array, and then your second call will result in an error. e.g.
>>>>>
>>>>> val lines: Array[String] = line.split("\t")
>>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>>
>>>>> So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> *Eliran Bivas*
>>>>> Data Team | iguaz.io
>>>>>
>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am not sure this is the correct approach.
>>>>>
>>>>> Read a text file in:
>>>>>
>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>
>>>>> Now I want to get rid of empty lines and filter only the lines that contain "ASE15":
>>>>>
>>>>> f.filter(_ > "").filter(_ contains("ASE15"))
>>>>>
>>>>> The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?
>>>>>
>>>>> Now I want to map the above filter to lines with carriage return and split them by ",":
>>>>>
>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>>>
>>>>> Now I want to split the output by ",":
>>>>>
>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>
>>>>> Any advice will be appreciated.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Dr Mich Talebzadeh
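[Editor's note: one subtlety worth flagging. Scala's String.split delegates to Java's regex engine, so the argument "\t," is a regular expression matching a tab *immediately followed by* a comma, not "tab or comma". To split on either character, a character class is needed. A small sketch in plain Scala, no Spark required (the sample string is made up for illustration):

    val line = "ASE 15\tUPDATE INDEX STATISTICS,syscolumns"

    // split("\t,") only splits where a tab is directly followed by a comma;
    // here there is no such sequence, so the whole line comes back unchanged.
    val wrong = line.split("\t,")       // Array("ASE 15\tUPDATE INDEX STATISTICS,syscolumns")

    // A character class splits on tab OR comma:
    val right = line.split("[\t,]")     // Array("ASE 15", "UPDATE INDEX STATISTICS", "syscolumns")

The same regex would apply to the RDD and DStream versions discussed above, e.g. flatMap(_.split("[\t,]")).]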