Mich: See the following method of DStream:

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
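print is registered as an output operator, which is why it can be called directly on the DStream, whereas an RDD action such as collect is not a member of DStream (hence the error further down this thread). When per-batch RDD actions are needed, foreachRDD is the usual route. A minimal sketch, with the DStream name and element type taken from the examples below:

    import org.apache.spark.streaming.dstream.DStream

    // Minimal sketch: apply RDD actions to each micro-batch via foreachRDD.
    // `v` is assumed to be the DStream[(String, Int)] built in the examples below.
    def dumpCounts(v: DStream[(String, Int)]): Unit =
      v.foreachRDD { rdd =>
        // collect is an RDD action, so it is available here; it pulls the whole
        // micro-batch to the driver, which is fine only for small test streams.
        rdd.collect().foreach(println)
      }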
On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> However this works. I am checking the logic to see if it does what I asked
> it to do.
>
> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX
> STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word,
> 1)).reduceByKey(_ + _).print
>
> scala> ssc.start()
>
> scala>
> -------------------------------------------
> Time: 1459701925000 ms
> -------------------------------------------
> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
> databases,27)
> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
> (Once databases are loaded to ASE 15, then you will need to maintain them
> the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and
> REORG COMPACT as necessary. One of the frequent mistakes that people do is
> NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD.
> This normally results in slower performance on ASE 15 databases as test
> cycles continue. Use MDA readings to measure daily DML activities on ASE 15
> tables and compare them with those of PROD. A 24 hour cycle measurement
> should be good. If you notice that certain tables have different DML hits
> (insert/update/delete) compared to PROD you will know that either ASE 15 is
> not doing everything in terms of batch activity (some jobs are missing), or
> there is something inconsistent somewhere.,27)
> (* Make sure that you have enough tempdb system segment space for UPDATE
> INDEX STATISTICS. It is always advisable to gauge the tempdb size required
> in ASE 15 QA and expand the tempdb database in production accordingly. The
> last thing you want is to blow up tempdb over the migration weekend.,27)
> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX
> STATISTICS on different tables in the same database at the same time. Watch
> tempdb segment growth though! OR,27)
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted.
>>
>> This works:
>>
>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>
>> scala> // Get the lines
>> scala> val lines = messages.map(_._2)
>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>
>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>
>> However, this fails:
>>
>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>> <console>:43: error: value collect is not a member of
>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. is not a member of (String, String)
>>>
>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>
>>> Choose the element of the tuple and then apply contains on it.
>>>
>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Thank you gents.
>>>>
>>>> That should be "\n" then, for a newline.
>>>>
>>>> OK, I am using Spark Streaming to analyse the message.
>>>>
>>>> It does the streaming:
>>>>
>>>> import _root_.kafka.serializer.StringDecoder
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>> //
>>>> scala> val sparkConf = new SparkConf().
>>>>      | setAppName("StreamTest").
>>>>      | setMaster("local[12]").
>>>>      | set("spark.driver.allowMultipleContexts", "true").
>>>> | set("spark.hadoop.validateOutputSpecs", "false") >>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55)) >>>> scala> >>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> >>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", >>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" ) >>>> kafkaParams: scala.collection.immutable.Map[String,String] = >>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> >>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> >>>> StreamTest) >>>> scala> val topic = Set("newtopic") >>>> topic: scala.collection.immutable.Set[String] = Set(newtopic) >>>> scala> val messages = KafkaUtils.createDirectStream[String, String, >>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic) >>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, >>>> String)] = >>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c >>>> >>>> This part is tricky >>>> >>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ >>>> contains("UPDATE INDEX STATISTICS")).flatMap(line => >>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + >>>> _).collect.foreach(println) >>>> <console>:47: error: value contains is not a member of (String, String) >>>> val showlines = messages.filter(_ contains("ASE 15")).filter(_ >>>> contains("UPDATE INDEX STATISTICS")).flatMap(line => >>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + >>>> _).collect.foreach(println) >>>> >>>> >>>> How does one refer to the content of the stream here? >>>> >>>> Thanks >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> // >>>> // Now want to do some analysis on the same text file >>>> // >>>> >>>> >>>> >>>> Dr Mich Talebzadeh >>>> >>>> >>>> >>>> LinkedIn * >>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>> >>>> >>>> >>>> http://talebzadehmich.wordpress.com >>>> >>>> >>>> >>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote: >>>> >>>>> bq. split"\t," splits the filter by carriage return >>>>> >>>>> Minor correction: "\t" denotes tab character. >>>>> >>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote: >>>>> >>>>>> Hi Mich, >>>>>> >>>>>> 1. The first underscore in your filter call is refering to a line in >>>>>> the file (as textFile() results in a collection of strings) >>>>>> 2. You're correct. No need for it. >>>>>> 3. Filter is expecting a Boolean result. So you can merge your >>>>>> contains filters to one with AND (&&) statement. >>>>>> 4. Correct. Each character in split() is used as a divider. >>>>>> >>>>>> Eliran Bivas >>>>>> >>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com> >>>>>> *Sent:* Apr 3, 2016 15:06 >>>>>> *To:* Eliran Bivas >>>>>> *Cc:* user @spark >>>>>> *Subject:* Re: multiple splits fails >>>>>> >>>>>> Hi Eliran, >>>>>> >>>>>> Many thanks for your input on this. >>>>>> >>>>>> I thought about what I was trying to achieve so I rewrote the logic >>>>>> as follows: >>>>>> >>>>>> >>>>>> 1. Read the text file in >>>>>> 2. Filter out empty lines (well not really needed here) >>>>>> 3. Search for lines that contain "ASE 15" and further have >>>>>> sentence "UPDATE INDEX STATISTICS" in the said line >>>>>> 4. Split the text by "\t" and "," >>>>>> 5. 
Print the outcome >>>>>> >>>>>> >>>>>> This was what I did with your suggestions included >>>>>> >>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt") >>>>>> f.cache() >>>>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ >>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line => >>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + >>>>>> _).collect.foreach(println) >>>>>> >>>>>> >>>>>> Couple of questions if I may >>>>>> >>>>>> >>>>>> 1. I take that "_" refers to content of the file read in by >>>>>> default? >>>>>> 2. _.length > 0 basically filters out blank lines (not really >>>>>> needed here) >>>>>> 3. Multiple filters are needed for each *contains* logic >>>>>> 4. split"\t," splits the filter by carriage return AND ,? >>>>>> >>>>>> >>>>>> Regards >>>>>> >>>>>> >>>>>> Dr Mich Talebzadeh >>>>>> >>>>>> >>>>>> >>>>>> LinkedIn * >>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>> >>>>>> >>>>>> >>>>>> http://talebzadehmich.wordpress.com >>>>>> >>>>>> >>>>>> >>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote: >>>>>> >>>>>>> Hi Mich, >>>>>>> >>>>>>> Few comments: >>>>>>> >>>>>>> When doing .filter(_ > “”) you’re actually doing a lexicographic >>>>>>> comparison and not filtering for empty lines (which could be achieved >>>>>>> with >>>>>>> _.notEmpty or _.length > 0). >>>>>>> I think that filtering with _.contains should be sufficient and the >>>>>>> first filter can be omitted. >>>>>>> >>>>>>> As for line => line.split(“\t”).split(“,”): >>>>>>> You have to do a second map or (since split() requires a regex as >>>>>>> input) .split(“\t,”). >>>>>>> The problem is that your first split() call will generate an Array >>>>>>> and then your second call will result in an error. >>>>>>> e.g. >>>>>>> >>>>>>> val lines: Array[String] = line.split(“\t”) >>>>>>> lines.split(“,”) // Compilation error - no method split() exists for >>>>>>> Array >>>>>>> >>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or >>>>>>> map(_.split(“\t,”)) >>>>>>> >>>>>>> Hope that helps. >>>>>>> >>>>>>> *Eliran Bivas* >>>>>>> Data Team | iguaz.io >>>>>>> >>>>>>> >>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> I am not sure this is the correct approach >>>>>>> >>>>>>> Read a text file in >>>>>>> >>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt") >>>>>>> >>>>>>> >>>>>>> Now I want to get rid of empty lines and filter only the lines that >>>>>>> contain "ASE15" >>>>>>> >>>>>>> f.filter(_ > "").filter(_ contains("ASE15")). >>>>>>> >>>>>>> The above works but I am not sure whether I need two filter >>>>>>> transformation above? Can it be done in one? 
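As noted earlier in this thread, the two filters can indeed be combined into a single predicate with &&, and nonEmpty avoids the lexicographic comparison of _ > "". A minimal sketch, reusing the file path from this thread (the SparkContext setup and app name here are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: one filter pass instead of two. nonEmpty drops blank lines
    // without the lexicographic comparison of _ > ""; && merges both conditions.
    val sc = new SparkContext(new SparkConf().setAppName("FilterTest").setMaster("local[2]"))
    val f  = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

    val ase15Lines = f.filter(line => line.nonEmpty && line.contains("ASE15"))
    ase15Lines.collect.foreach(println)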
>>>>>>>
>>>>>>> Now I want to map the above filter to lines with carriage return and
>>>>>>> split them by ","
>>>>>>>
>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>> (line.split("\t")))
>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>
>>>>>>> Now I want to split the output by ","
>>>>>>>
>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>> (line.split("\t").split(",")))
>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>        f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>>> => (line.split("\t").split(",")))
>>>>>>>                                                  ^
>>>>>>> Any advice will be appreciated.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
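A further note on split: String.split takes a regular expression, so "\t," only splits where a tab is immediately followed by a comma, while the character class "[\t,]" splits on either character. A minimal sketch of the whole pipeline under that reading, reusing the file path and search strings from this thread (the SparkContext setup is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch of the word-count pipeline discussed in this thread.
    // The character class "[\t,]" splits on a tab OR a comma, whereas "\t,"
    // would only split where a tab is followed directly by a comma.
    val sc = new SparkContext(new SparkConf().setAppName("SplitTest").setMaster("local[2]"))
    val f  = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

    val counts = f
      .filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
      .flatMap(_.split("[\t,]"))          // one flatMap yields individual tokens
      .map(token => (token.trim, 1))
      .reduceByKey(_ + _)

    counts.collect.foreach(println)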