Since num is an Int, you can specify Integer.MAX_VALUE (Int.MaxValue in Scala), FYI.
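[Editor's note: for concreteness, a minimal sketch of the two usual ways to materialize every element of a DStream. The name wordCounts is hypothetical, standing in for any DStream[(String, Int)] such as the one built later in this thread.]

// print(num) takes an Int, so Int.MaxValue lifts the default 10-row limit.
wordCounts.print(Int.MaxValue)

// Alternatively, drop down to the per-batch RDD, where collect() is available.
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}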
On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

As I see it, print() materializes the first 10 rows, while print(n) materializes n rows. How about if I wanted to materialize all rows?

Cheers

Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

Mich:
See the following method of DStream:

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {

On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

However, this works. I am checking the logic to see if it does what I asked it to do:

val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

scala> ssc.start()

scala> -------------------------------------------
Time: 1459701925000 ms
-------------------------------------------
(* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
(o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
(Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
(* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
(o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)

Dr Mich Talebzadeh
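[Editor's note: String.split takes a regular expression, and "\n," matches a newline immediately followed by a comma, so most lines are never split and each whole paragraph is counted as one "word" — hence the full sentences in the output above. A hedged sketch of what was presumably intended, splitting on newline or comma via a character class; lines is the DStream[String] defined further down the thread.]

val words = lines
  .filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("[\n,]"))   // character class: newline OR comma as dividers
  .map(word => (word, 1))
  .reduceByKey(_ + _)
words.print()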
On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Ted.

This works:

scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063

scala> // Get the lines
scala> val lines = messages.map(_._2)
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d

However, this fails:

scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

Dr Mich Talebzadeh

On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:

bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple. Choose the element of the tuple and then apply contains to it.

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thank you, gents.

You are right; that should be "\n", the newline character.

OK, I am using Spark Streaming to analyse the message. It does the streaming:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
scala> val sparkConf = new SparkConf().
     | setAppName("StreamTest").
     | setMaster("local[12]").
     | set("spark.driver.allowMultipleContexts", "true").
     | set("spark.hadoop.validateOutputSpecs", "false")
scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky:

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)

How does one refer to the content of the stream here?

// Now I want to do some analysis on the same text file

Thanks

Dr Mich Talebzadeh
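[Editor's note: the error above is the tuple issue Ted points out: the Kafka direct stream yields (key, value) pairs, so contains must be applied to the value. A minimal sketch of the fix, reusing the messages stream defined in the session above.]

// messages is a DStream[(String, String)]: one (key, value) tuple per Kafka record.
// Take the value first, then filter on it.
val lines = messages.map(_._2)
val hits = lines.filter(l => l.contains("ASE 15") &&
                             l.contains("UPDATE INDEX STATISTICS"))
hits.print()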
On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:

bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes the tab character.

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

1. The first underscore in your filter call is referring to a line in the file (as textFile() results in a collection of strings).
2. You're correct. No need for it.
3. filter expects a Boolean result, so you can merge your contains filters into one with an AND (&&) expression.
4. Almost: split() takes a regex, so "\t," matches a tab followed by a comma; to use each character as a separate divider you need a character class such as "[\t,]".

Eliran Bivas
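[Editor's note: pulling points 3 and 4 together, a hedged sketch of the batch pipeline with a single merged filter and a character-class split, using the same file path as the messages below.]

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.filter(l => l.nonEmpty &&
              l.contains("ASE 15") &&
              l.contains("UPDATE INDEX STATISTICS"))   // one filter, conditions merged with &&
 .flatMap(_.split("[\t,]"))                            // tab OR comma as dividers
 .map(word => (word, 1))
 .reduceByKey(_ + _)
 .collect()
 .foreach(println)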
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve, so I rewrote the logic as follows:

1. Read the text file in.
2. Filter out empty lines (not really needed here).
3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line.
4. Split the text by "\t" and ",".
5. Print the outcome.

This is what I did with your suggestions included:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)

A couple of questions if I may:

1. I take it that "_" refers to the content of the file read in, by default?
2. _.length > 0 basically filters out blank lines (not really needed here)?
3. Multiple filters are needed, one for each contains test?
4. split"\t," splits the filter by carriage return AND ","?

Regards

Dr Mich Talebzadeh

On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:

Hi Mich,

Few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison, not filtering out empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.

As for line => line.split("\t").split(","): the first split() call generates an Array, and your second call will then result in an error, e.g.

val lines: Array[String] = line.split("\t")
lines.split(",") // Compilation error - no method split() exists for Array

You have to split each element of the array again, or, since split() takes a regex, use a character class: .split("[\t,]").

So either go with flatMap(_.split("\t").flatMap(_.split(","))) or flatMap(_.split("[\t,]")).

Hope that helps.

Eliran Bivas
Data Team | iguaz.io
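[Editor's note: a small sketch of the two compiling variants Eliran describes, with f as defined in the messages above; both produce an RDD[String] of individual tokens.]

// Two-step: split on tab, then split every resulting piece on comma.
val twoStep = f.flatMap(_.split("\t").flatMap(_.split(",")))

// One-step: a character class makes tab and comma alternative dividers.
val oneStep = f.flatMap(_.split("[\t,]"))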
On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach.

Read a text file in:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")

Now I want to get rid of empty lines and filter only the lines that contain "ASE15":

f.filter(_ > "").filter(_ contains("ASE15"))

The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?

Now I want to map the above filtered lines, splitting them by "\t":

f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30

Now I want to split the output by ",":

scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
<console>:30: error: value split is not a member of Array[String]

Any advice will be appreciated.

Thanks

Dr Mich Talebzadeh
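[Editor's note: to close the loop, a consolidated, hedged sketch of the streaming job with the thread's fixes applied — value extraction, merged filter, character-class split, and foreachRDD instead of collect on the DStream. It assembles only pieces shown above (Spark 1.x Kafka direct-stream API, same hosts, topic, and batch interval); treat it as a sketch, not a verified program.]

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("StreamTest").setMaster("local[12]")
val ssc = new StreamingContext(sparkConf, Seconds(55))

val kafkaParams = Map[String, String](
  "bootstrap.servers"   -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect"   -> "rhes564:2181",
  "group.id"            -> "StreamTest")
val topics = Set("newtopic")

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

messages
  .map(_._2)                                           // the record value, not the (key, value) tuple
  .filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))
  .flatMap(_.split("[\t,]"))                           // tab or comma as dividers
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .foreachRDD(rdd => rdd.collect().foreach(println))   // collect works on the per-batch RDD

ssc.start()
ssc.awaitTermination()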