Hi,

Instead of using print() directly on the DStream, I would suggest using foreachRDD if you want to materialize all rows. An example is shown here:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

dstream.foreachRDD(rdd => {
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach(record => {
    connection.send(record)  // executed at the worker
  })
})
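For the original question of printing all rows, a rough, untested sketch along the same lines (reusing the dstream placeholder from the snippet above; note that collect() pulls the whole batch back to the driver, so this is only sensible for small batches):

dstream.foreachRDD { rdd =>
  // materialize the entire batch on the driver and print every row
  rdd.collect().foreach(println)
}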
On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I am afraid print(Integer.MAX_VALUE) does not return any lines! However, print(1000) does
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Since num is an Int, you can specify Integer.MAX_VALUE
>>
>> FYI
>>
>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Ted.
>>>
>>> As I see print() materializes the first 10 rows. On the other hand print(n) will materialise n rows.
>>>
>>> How about if I wanted to materialize all rows?
>>>
>>> Cheers
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Mich:
>>>> See the following method of DStream:
>>>>
>>>>  * Print the first num elements of each RDD generated in this DStream. This is an output
>>>>  * operator, so this DStream will be registered as an output stream and there materialized.
>>>>  */
>>>> def print(num: Int): Unit = ssc.withScope {
>>>>
>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> However this works. I am checking the logic to see if it does what I asked it to do
>>>>>
>>>>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>>>>>
>>>>> scala> ssc.start()
>>>>>
>>>>> scala> -------------------------------------------
>>>>> Time: 1459701925000 ms
>>>>> -------------------------------------------
>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15 databases,27)
>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>>> (Once databases are loaded to ASE 15, then you will need to maintain them the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes that people do is NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD. This normally results in slower performance on ASE 15 databases as test cycles continue. Use MDA readings to measure daily DML activities on ASE 15 tables and compare them with those of PROD. A 24 hour cycle measurement should be good. If you notice that certain tables have different DML hits (insert/update/delete) compared to PROD you will know that either ASE 15 is not doing everything in terms of batch activity (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>> (* Make sure that you have enough tempdb system segment space for UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size required in ASE 15 QA and expand the tempdb database in production accordingly. The last thing you want is to blow up tempdb over the migration weekend.,27)
>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX STATISTICS on different tables in the same database at the same time. Watch tempdb segment growth though! OR,27)
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Ted
>>>>>>
>>>>>> This works
>>>>>>
>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>
>>>>>> scala> // Get the lines
>>>>>> scala> val lines = messages.map(_._2)
>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>
>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>
>>>>>> However, this fails
>>>>>>
>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>> <console>:43: error: value collect is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> bq. is not a member of (String, String)
>>>>>>>
>>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>>
>>>>>>> Choose the element of the tuple and then apply contains on it.
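To make that concrete, a small sketch (assuming messages is the (key, value) direct stream from the transcript above): map to the value first, then contains works, and the two filters can be merged with &&:

val lines = messages.map(_._2)  // take the message value out of the (key, value) tuple
val hits = lines.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))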
>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you gents.
>>>>>>>>
>>>>>>>> That should be "\n", the newline character, then.
>>>>>>>>
>>>>>>>> OK, I am using Spark Streaming to analyse the message.
>>>>>>>>
>>>>>>>> It does the streaming:
>>>>>>>>
>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>> import org.apache.spark.streaming._
>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>> //
>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>      | setAppName("StreamTest").
>>>>>>>>      | setMaster("local[12]").
>>>>>>>>      | set("spark.driver.allowMultipleContexts", "true").
>>>>>>>>      | set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>> scala>
>>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>
>>>>>>>> This part is tricky:
>>>>>>>>
>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>>>>>
>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> //
>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>> //
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>
>>>>>>>>> Minor correction: "\t" denotes the tab character.
>>>>>>>>>
>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mich,
>>>>>>>>>>
>>>>>>>>>> 1. The first underscore in your filter call is referring to a line in the file (as textFile() results in a collection of strings).
>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>> 3. filter is expecting a Boolean result, so you can merge your contains filters into one with an AND (&&) statement.
>>>>>>>>>> 4. Not quite: split() takes a regex, so "\t," matches a tab followed by a comma. To split on either character, use a character class: split("[\t,]").
>>>>>>>>>>
>>>>>>>>>> Eliran Bivas
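Putting points 3 and 4 together, a sketch of the corrected batch pipeline (assuming f is the RDD read from the text file in the message below; "[\t,]" is a regex character class matching a tab or a comma):

f.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))  // one filter via &&
 .flatMap(_.split("[\t,]"))   // split each line on tab OR comma
 .map(word => (word, 1))
 .reduceByKey(_ + _)
 .collect()
 .foreach(println)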
>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>
>>>>>>>>>> Hi Eliran,
>>>>>>>>>>
>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>
>>>>>>>>>> I thought about what I was trying to achieve, so I rewrote the logic as follows:
>>>>>>>>>>
>>>>>>>>>> 1. Read the text file in
>>>>>>>>>> 2. Filter out empty lines (well, not really needed here)
>>>>>>>>>> 3. Search for lines that contain "ASE 15" and further have the sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>> 4. Split the text by "\t" and ","
>>>>>>>>>> 5. Print the outcome
>>>>>>>>>>
>>>>>>>>>> This was what I did with your suggestions included:
>>>>>>>>>>
>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>> f.cache()
>>>>>>>>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>
>>>>>>>>>> A couple of questions if I may:
>>>>>>>>>>
>>>>>>>>>> 1. I take it that "_" refers to the content of the file read in by default?
>>>>>>>>>> 2. _.length > 0 basically filters out blank lines (not really needed here)?
>>>>>>>>>> 3. Multiple filters are needed, one for each contains logic?
>>>>>>>>>> 4. split("\t,") splits the filter by carriage return AND ","?
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>>
>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>
>>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>
>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>
>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>
>>>>>>>>>>> Few comments:
>>>>>>>>>>>
>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a lexicographic comparison and not filtering for empty lines (which could be achieved with _.nonEmpty or _.length > 0). I think that filtering with _.contains should be sufficient, and the first filter can be omitted.
>>>>>>>>>>>
>>>>>>>>>>> As for line => line.split("\t").split(","): you have to do a second map, or (since split() takes a regex as input) a single .split("[\t,]"). The problem is that your first split() call will generate an Array, and then your second call will result in an error. e.g.
>>>>>>>>>>>
>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>>>>>>>>
>>>>>>>>>>> So either go with map(_.split("\t")).map(_.flatMap(_.split(","))) or map(_.split("[\t,]")).
>>>>>>>>>>>
>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>
>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>> Data Team | iguaz.io
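A tiny self-contained illustration of that difference (hypothetical sample string, just to show what each variant returns):

val line = "ASE 15\tUPDATE INDEX STATISTICS,tempdb"
line.split("\t,")   // Array("ASE 15\tUPDATE INDEX STATISTICS,tempdb"): the regex means tab FOLLOWED BY comma, which never occurs here
line.split("[\t,]") // Array("ASE 15", "UPDATE INDEX STATISTICS", "tempdb"): the character class splits on either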
>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am not sure this is the correct approach.
>>>>>>>>>>>
>>>>>>>>>>> Read a text file in:
>>>>>>>>>>>
>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>
>>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines that contain "ASE15":
>>>>>>>>>>>
>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>
>>>>>>>>>>> The above works, but I am not sure whether I need the two filter transformations above. Can it be done in one?
>>>>>>>>>>>
>>>>>>>>>>> Now I want to map the above filter to lines with carriage return and split them by ",":
>>>>>>>>>>>
>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>
>>>>>>>>>>> Now I want to split the output by ",":
>>>>>>>>>>>
>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>>>
>>>>>>>>>>> Any advice will be appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>
>>>>>>>>>>> http://talebzadehmich.wordpress.com

--
Thanks & Regards

Sachin Aggarwal
7760502772