Thank you, gents. That should have been "\n" (newline) rather than "\t".
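Strictly, "\n" is the line feed (newline) character, "\r" is carriage return and "\t" is the tab that Ted Yu points out. This is a small plain-Scala sketch (no Spark involved) that prints their code points. It also shows one way to split text that may use either Unix "\n" or Windows "\r\n" line endings. The names EscapeChars and splitLines are made up for illustration:

```scala
object EscapeChars {
  // Code points of the escape sequences mentioned in the thread.
  val tab: Int = '\t'.toInt            // 9: horizontal tab
  val lineFeed: Int = '\n'.toInt       // 10: newline (line feed)
  val carriageReturn: Int = '\r'.toInt // 13: carriage return

  // Hypothetical helper: split on line endings, accepting "\n" and "\r\n".
  def splitLines(text: String): Array[String] = text.split("\r?\n")

  def main(args: Array[String]): Unit = {
    println(s"tab=$tab lineFeed=$lineFeed carriageReturn=$carriageReturn")
    println(splitLines("first\r\nsecond\nthird").mkString("|"))
  }
}
```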
OK, I am using Spark Streaming to analyse the messages. The streaming part itself works:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

scala> val sparkConf = new SparkConf().
     |   setAppName("StreamTest").
     |   setMaster("local[12]").
     |   set("spark.driver.allowMultipleContexts", "true").
     |   set("spark.hadoop.validateOutputSpecs", "false")

scala> val ssc = new StreamingContext(sparkConf, Seconds(55))

scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)

scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)

scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)
       val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

How does one refer to the content of the stream here?
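The compiler message points at the cause. messages is an InputDStream[(String, String)], so each element is a Kafka (key, value) pair rather than a line of text, and the message body is the second field, _._2. A DStream also has no collect. Results come out through an output operation such as print() or foreachRDD, and nothing runs until ssc.start() is called. Running the real job needs a live Kafka broker, so the sketch below applies the same lambdas to a plain Scala Seq of made-up (key, value) records that stand in for one micro-batch. groupBy stands in for reduceByKey. The streaming form is given only as a comment and is a rough, untested outline:

```scala
object StreamRecordSketch {
  // Each element of the Kafka direct stream is a (key, value) pair of Strings.
  type Record = (String, String)

  // Hypothetical micro-batch; keys and messages are made up for illustration.
  val sampleBatch: Seq[Record] = Seq(
    ("k1", "ASE 15 needs UPDATE INDEX STATISTICS,after upgrade"),
    ("k2", "ASE 15 release notes"),
    ("k3", "unrelated message")
  )

  // Same logic as the question, applied to the message value (_._2):
  // keep lines containing both phrases, split on newline OR comma, count pieces.
  def countWords(batch: Seq[Record]): Map[String, Int] =
    batch
      .map(_._2) // drop the Kafka key, keep the message text
      .filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
      .flatMap(_.split("[\n,]")) // character class; "\n," would need newline THEN comma
      .map(word => (word, 1))
      .groupBy(_._1) // local stand-in for reduceByKey(_ + _)
      .map { case (word, ones) => (word, ones.map(_._2).sum) }

  // In the streaming job (needs Kafka, so not run here) the chain would be roughly:
  //   messages.map(_._2)
  //     .filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
  //     .flatMap(_.split("[\n,]"))
  //     .map(word => (word, 1))
  //     .reduceByKey(_ + _)
  //     .print() // output operation; a DStream has no collect
  //   ssc.start()
  //   ssc.awaitTermination()

  def main(args: Array[String]): Unit =
    countWords(sampleBatch).foreach(println)
}
```

Only the first sample record contains both phrases, so the sketch yields two pieces, each counted once.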
Thanks

// Now want to do some analysis on the same text file

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. split"\t," splits the filter by carriage return
>
> Minor correction: "\t" denotes tab character.
>
> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>
>> Hi Mich,
>>
>> 1. The first underscore in your filter call is referring to a line in the
>> file (as textFile() results in a collection of strings).
>> 2. You're correct. No need for it.
>> 3. filter expects a Boolean result, so you can merge your contains
>> filters into one with an AND (&&) expression.
>> 4. Correct. Each character in split() is used as a divider.
>>
>> Eliran Bivas
>>
>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>> *Sent:* Apr 3, 2016 15:06
>> *To:* Eliran Bivas
>> *Cc:* user @spark
>> *Subject:* Re: multiple splits fails
>>
>> Hi Eliran,
>>
>> Many thanks for your input on this.
>>
>> I thought about what I was trying to achieve, so I rewrote the logic as
>> follows:
>>
>> 1. Read the text file in
>> 2. Filter out empty lines (not really needed here)
>> 3. Search for lines that contain "ASE 15" and also contain the phrase
>> "UPDATE INDEX STATISTICS"
>> 4. Split the text by "\t" and ","
>> 5. Print the outcome
>>
>> This is what I did, with your suggestions included:
>>
>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>> f.cache()
>> f.filter(_.length > 0).
>>   filter(_ contains("ASE 15")).
>>   filter(_ contains("UPDATE INDEX STATISTICS")).
>>   flatMap(line => line.split("\t,")).
>>   map(word => (word, 1)).
>>   reduceByKey(_ + _).
>>   collect.foreach(println)
>>
>> A couple of questions, if I may:
>>
>> 1. I take it that "_" refers to the content of the file read in by default?
>> 2. _.length > 0 basically filters out blank lines (not really needed
>> here)?
>> 3. Multiple filters are needed, one for each *contains* check?
>> 4. split"\t," splits the filter by carriage return AND ","?
>>
>> Regards,
>>
>> Dr Mich Talebzadeh
>>
>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>
>>> Hi Mich,
>>>
>>> A few comments:
>>>
>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>> comparison and not filtering for empty lines (which could be achieved with
>>> _.nonEmpty or _.length > 0).
>>> I think that filtering with _.contains should be sufficient and the
>>> first filter can be omitted.
>>>
>>> As for line => line.split("\t").split(","):
>>> You have to do a second map or (since split() requires a regex as
>>> input) .split("\t,").
>>> The problem is that your first split() call will generate an Array and
>>> then your second call will result in an error.
>>> e.g.
>>>
>>> val lines: Array[String] = line.split("\t")
>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>
>>> So either go with map(_.split("\t")).map(_.split(",")) or
>>> map(_.split("\t,"))
>>>
>>> Hope that helps.
>>>
>>> *Eliran Bivas*
>>> Data Team | iguaz.io
>>>
>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am not sure this is the correct approach.
>>>
>>> Read a text file in:
>>>
>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>
>>> Now I want to get rid of empty lines and keep only the lines that
>>> contain "ASE15":
>>>
>>> f.filter(_ > "").filter(_ contains("ASE15")).
>>>
>>> The above works, but I am not sure whether I need two filter
>>> transformations. Can it be done in one?
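On "Can it be done in one?": yes. filter only needs a Boolean, so both conditions can be joined with && (Eliran's point 3). The same pattern covers the later pair of checks: filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS")). A minimal sketch follows. It uses a plain Scala Seq of made-up lines in place of the RDD, on the assumption that the same lambdas carry over unchanged to f.filter(...). It also shows what _ > "" evaluates to: a compareTo-based comparison that is true for any non-empty string, so it does remove "" but keeps whitespace-only lines. (The Scala method is nonEmpty.)

```scala
object FilterSketch {
  // Made-up sample lines standing in for sc.textFile(...) output.
  val lines: Seq[String] =
    Seq("", "   ", "ASE15 upgrade notes", "ASE 15 UPDATE INDEX STATISTICS", "other")

  // Mich's two filters. `_ > ""` compares lexicographically via compareTo,
  // which is > 0 for every non-empty string: it drops "" but keeps "   ".
  val twoFilters: Seq[String] = lines.filter(_.compareTo("") > 0).filter(_.contains("ASE15"))

  // One filter: the predicate just has to return a Boolean, so combine with &&.
  // The emptiness test is redundant anyway, since "" cannot contain "ASE15".
  val oneFilter: Seq[String] = lines.filter(line => line.nonEmpty && line.contains("ASE15"))

  // To drop whitespace-only lines as well, trim before testing.
  val nonBlank: Seq[String] = lines.filter(_.trim.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(twoFilters)
    println(oneFilter)
    println(nonBlank)
  }
}
```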
>>>
>>> Now I want to map the output of the above filter to lines split by
>>> carriage return and then split them by ",":
>>>
>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>
>>> Now I want to split the output by ",":
>>>
>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>> <console>:30: error: value split is not a member of Array[String]
>>>        f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>                                                                                   ^
>>>
>>> Any advice will be appreciated.
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
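Two details in this thread are easy to check, because Java's String.split takes a regular expression. First, "\t," is a two-character pattern that matches a tab immediately followed by a comma, not "tab or comma". To split on either character, a character class such as "[\t,]" works, as does Scala's split(Array('\t', ',')). Second, split on the result of split fails because the first call returns an Array. The same happens with map(_.split("\t")).map(_.split(",")), whose second map also receives arrays. flatMap flattens the first result so the second split runs on each piece. A minimal plain-Scala sketch with a made-up input line:

```scala
object SplitSketch {
  // Made-up line containing tabs and commas, but no "tab then comma" pair.
  val line = "ASE15\tupgrade,notes\tUPDATE INDEX STATISTICS,done"

  // "\t," is a regex for a tab immediately followed by a comma,
  // so this line is not split at all (one element comes back).
  val sequence: Array[String] = line.split("\t,")

  // A character class splits on a tab OR a comma.
  val either: Array[String] = line.split("[\t,]")

  // Scala's StringOps.split(Array[Char]) builds an equivalent character class.
  val byChars: Array[String] = line.split(Array('\t', ','))

  // Two-step version: the first split yields an Array, so flatten it with
  // flatMap before splitting each piece again, instead of calling split on the Array.
  val twoStep: Array[String] = line.split("\t").flatMap(_.split(","))

  def main(args: Array[String]): Unit = {
    println(sequence.length)
    println(either.mkString("|"))
    println(byChars.sameElements(either))
    println(twoStep.sameElements(either))
  }
}
```

On the RDD, the single-pass form would presumably be f.flatMap(_.split("[\t,]")) in place of flatMap(line => line.split("\t,")).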