Thanks Ted.

As I see print() materializes the first 10 rows. On the other hand
print(n) will materialise n rows.

How about if I wanted to materialize all rows?

Cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:

> Mich:
> See the following method of DStream:
>
>    * Print the first num elements of each RDD generated in this DStream.
> This is an output
>    * operator, so this DStream will be registered as an output stream and
> there materialized.
>    */
>   def print(num: Int): Unit = ssc.withScope {
>
> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> However this works. I am checking the logic to see if it does what I
>> asked it to do
>>
>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE
>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word,
>> 1)).reduceByKey(_ + _).print
>>
>> scala> ssc.start()
>>
>> scala> -------------------------------------------
>> Time: 1459701925000 ms
>> -------------------------------------------
>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>> databases,27)
>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>> (Once databases are loaded to ASE 15, then you will need to maintain them
>> the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and
>> REORG COMPACT as necessary. One of the frequent mistakes that people do is
>> NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD.
>> This normally results in slower performance on ASE 15 databases as test
>> cycles continue. Use MDA readings to measure daily DML activities on ASE 15
>> tables and compare them with those of PROD. A 24 hour cycle measurement
>> should be good. If you notice that certain tables have different DML hits
>> (insert/update/delete) compared to PROD you will know that either ASE 15 is
>> not doing everything in terms of batch activity (some jobs are missing), or
>> there is something inconsistent somewhere. ,27)
>> (* Make sure that you have enough tempdb system segment space for UPDATE
>> INDEX STATISTICS. It is always advisable to gauge the tempdb size required
>> in ASE 15 QA and expand the tempdb database in production accordingly. The
>> last thing you want is to blow up tempdb over the migration weekend.,27)
>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX
>> STATISTICS on different tables in the same database at the same time. Watch
>> tempdb segment growth though! OR,27)
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Thanks Ted
>>>
>>> This works
>>>
>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>
>>> scala> // Get the lines
>>> scala> val lines = messages.map(_._2)
>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>
>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>
>>> However, this fails
>>>
>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>> _).collect.foreach(println)
>>> <console>:43: error: value collect is not a member of
>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>> _).collect.foreach(println)
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq. is not a member of (String, String)
>>>>
>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>
>>>> Choose the element of the tuple and then apply contains on it.
>>>>
>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thank you gents.
>>>>>
>>>>> That should "\n" as carriage return
>>>>>
>>>>> OK I am using spark streaming to analyse the message
>>>>>
>>>>> It does the streaming
>>>>>
>>>>> import _root_.kafka.serializer.StringDecoder
>>>>> import org.apache.spark.SparkConf
>>>>> import org.apache.spark.streaming._
>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>> //
>>>>> scala> val sparkConf = new SparkConf().
>>>>>      |              setAppName("StreamTest").
>>>>>      |              setMaster("local[12]").
>>>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>> scala>
>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>>>>> StreamTest)
>>>>> scala> val topic = Set("newtopic")
>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>> String)] = 
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>
>>>>> This part is tricky
>>>>>
>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>> _).collect.foreach(println)
>>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>> _).collect.foreach(println)
>>>>>
>>>>>
>>>>> How does one refer to the content of the stream here?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> //
>>>>> // Now want to do some analysis on the same text file
>>>>> //
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>
>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>
>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Mich,
>>>>>>>
>>>>>>> 1. The first underscore in your filter call is refering to a line in
>>>>>>> the file (as textFile() results in a collection of strings)
>>>>>>> 2. You're correct. No need for it.
>>>>>>> 3. Filter is expecting a Boolean result. So you can merge your
>>>>>>> contains filters to one with AND (&&) statement.
>>>>>>> 4. Correct. Each character in split() is used as a divider.
>>>>>>>
>>>>>>> Eliran Bivas
>>>>>>>
>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>> *To:* Eliran Bivas
>>>>>>> *Cc:* user @spark
>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>
>>>>>>> Hi Eliran,
>>>>>>>
>>>>>>> Many thanks for your input on this.
>>>>>>>
>>>>>>> I thought about what I was trying to achieve so I rewrote the logic
>>>>>>> as follows:
>>>>>>>
>>>>>>>
>>>>>>>    1. Read the text file in
>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>    3. Search for lines that contain "ASE 15" and further have
>>>>>>>    sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>    5. Print the outcome
>>>>>>>
>>>>>>>
>>>>>>> This was what I did with your suggestions included
>>>>>>>
>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>> f.cache()
>>>>>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>> _).collect.foreach(println)
>>>>>>>
>>>>>>>
>>>>>>> Couple of questions if I may
>>>>>>>
>>>>>>>
>>>>>>>    1. I take that "_" refers to content of the file read in by
>>>>>>>    default?
>>>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>>>    needed here)
>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn * 
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>
>>>>>>>> Hi Mich,
>>>>>>>>
>>>>>>>> Few comments:
>>>>>>>>
>>>>>>>> When doing .filter(_ > “”) you’re actually doing a lexicographic
>>>>>>>> comparison and not filtering for empty lines (which could be achieved 
>>>>>>>> with
>>>>>>>> _.notEmpty or _.length > 0).
>>>>>>>> I think that filtering with _.contains should be sufficient and
>>>>>>>> the first filter can be omitted.
>>>>>>>>
>>>>>>>> As for line => line.split(“\t”).split(“,”):
>>>>>>>> You have to do a second map or (since split() requires a regex as
>>>>>>>> input) .split(“\t,”).
>>>>>>>> The problem is that your first split() call will generate an Array
>>>>>>>> and then your second call will result in an error.
>>>>>>>> e.g.
>>>>>>>>
>>>>>>>> val lines: Array[String] = line.split(“\t”)
>>>>>>>> lines.split(“,”) // Compilation error - no method split() exists
>>>>>>>> for Array
>>>>>>>>
>>>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>>>>>>> map(_.split(“\t,”))
>>>>>>>>
>>>>>>>> Hope that helps.
>>>>>>>>
>>>>>>>> *Eliran Bivas*
>>>>>>>> Data Team | iguaz.io
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am not sure this is the correct approach
>>>>>>>>
>>>>>>>> Read a text file in
>>>>>>>>
>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>
>>>>>>>>
>>>>>>>> Now I want to get rid of empty lines and filter only the lines that
>>>>>>>> contain "ASE15"
>>>>>>>>
>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>
>>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>>> transformation above? Can it be done in one?
>>>>>>>>
>>>>>>>> Now I want to map the above filter to lines with carriage return
>>>>>>>> ans split them by ","
>>>>>>>>
>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>> (line.split("\t")))
>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>
>>>>>>>> Now I want to split the output by ","
>>>>>>>>
>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>> (line.split("\t").split(",")))
>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>>>> => (line.split("\t").split(",")))
>>>>>>>>
>>>>>>>> ^
>>>>>>>> Any advice will be appreciated
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn * 
>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to