Since num is an Int, you can specify Integer.MAX_VALUE
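
One caveat, from my memory of the Spark 1.x source rather than anything confirmed in this thread: I believe print(num) fetches num + 1 elements so it can tell whether to print a trailing "...". If that holds for your version, num + 1 overflows for Integer.MAX_VALUE and nothing may be printed, so Integer.MAX_VALUE - 1 would be the safer argument. The arithmetic can be checked in plain Scala. The lookahead helper below is a hypothetical stand-in for that assumed internal num + 1, not a Spark API:

```scala
object PrintNumLookahead {
  // Hypothetical stand-in for the "num + 1" that print(num) is assumed to compute.
  def lookahead(num: Int): Int = num + 1

  def main(args: Array[String]): Unit = {
    println(lookahead(10))                    // 11
    println(lookahead(Integer.MAX_VALUE))     // -2147483648 (wraps around)
    println(lookahead(Integer.MAX_VALUE - 1)) // 2147483647
  }
}
```

If the goal is really every element of every batch, lines.foreachRDD(rdd => rdd.collect().foreach(println)) sidesteps the question, at the cost of pulling each batch onto the driver.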

FYI
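
A note on the split("\t,") and split("\n,") calls quoted below: String.split takes a regular expression, so a two-character pattern such as "\t," matches a tab immediately followed by a comma, not either character on its own. A minimal plain-Scala sketch (the object name and the sample line are made up for illustration):

```scala
object SplitDemo {
  // Sample input: a tab between a and b, a comma between b and c.
  val line = "a\tb,c"

  // Regex "tab followed by comma": never occurs in the sample, so nothing is split.
  val bySequence: Array[String] = line.split("\t,")

  // Character class "tab or comma": splits on either character.
  val byEither: Array[String] = line.split("[\t,]")

  def main(args: Array[String]): Unit = {
    println(bySequence.length)      // 1
    println(byEither.mkString("|")) // a|b|c
  }
}
```

That would be consistent with the streaming output below, where whole sentences containing commas come back as single keys.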

On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks Ted.
>
> As I understand it, print() materializes the first 10 rows, whereas
> print(n) materializes the first n rows.
>
> What if I wanted to materialize all rows?
>
> Cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Mich:
>> See the following method of DStream:
>>
>>   /**
>>    * Print the first num elements of each RDD generated in this DStream. This is an output
>>    * operator, so this DStream will be registered as an output stream and there materialized.
>>    */
>>   def print(num: Int): Unit = ssc.withScope {
>>
>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> However, this works. I am checking the logic to see whether it does
>>> what I asked it to do.
>>>
>>> val v = lines.filter(_.contains("ASE 15")).
>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>   flatMap(line => line.split("\n,")).
>>>   map(word => (word, 1)).
>>>   reduceByKey(_ + _).print
>>>
>>> scala> ssc.start()
>>>
>>> scala> -------------------------------------------
>>> Time: 1459701925000 ms
>>> -------------------------------------------
>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>> databases,27)
>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>> (Once databases are loaded to ASE 15, then you will need to maintain
>>> them the way you maintain your PROD. For example run UPDATE INDEX
>>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>>> that people do is NOT pruning data from daily log tables in ASE 15 etc as
>>> they do it in PROD. This normally results in slower performance on ASE 15
>>> databases as test cycles continue. Use MDA readings to measure daily DML
>>> activities on ASE 15 tables and compare them with those of PROD. A 24 hour
>>> cycle measurement should be good. If you notice that certain tables have
>>> different DML hits (insert/update/delete) compared to PROD you will know
>>> that either ASE 15 is not doing everything in terms of batch activity (some
>>> jobs are missing), or there is something inconsistent somewhere. ,27)
>>> (* Make sure that you have enough tempdb system segment space for UPDATE
>>> INDEX STATISTICS. It is always advisable to gauge the tempdb size required
>>> in ASE 15 QA and expand the tempdb database in production accordingly. The
>>> last thing you want is to blow up tempdb over the migration weekend.,27)
>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX
>>> STATISTICS on different tables in the same database at the same time. Watch
>>> tempdb segment growth though! OR,27)
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ted
>>>>
>>>> This works
>>>>
>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>> String)] = 
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>
>>>> scala> // Get the lines
>>>> scala> val lines = messages.map(_._2)
>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>
>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>
>>>> However, this fails
>>>>
>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>> _).collect.foreach(println)
>>>> <console>:43: error: value collect is not a member of
>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>> _).collect.foreach(println)
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> bq. is not a member of (String, String)
>>>>>
>>>>> As the error shows, contains cannot be applied directly to a tuple.
>>>>>
>>>>> Select the element of the tuple you need and then apply contains to it.
>>>>>
>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Thank you gents.
>>>>>>
>>>>>> That should be "\n" for the line break (what I called a carriage return).
>>>>>>
>>>>>> OK, I am using Spark Streaming to analyse the messages.
>>>>>>
>>>>>> The streaming part works:
>>>>>>
>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>> import org.apache.spark.SparkConf
>>>>>> import org.apache.spark.streaming._
>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>> //
>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>      |              setAppName("StreamTest").
>>>>>>      |              setMaster("local[12]").
>>>>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>> scala>
>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>>>>>> StreamTest)
>>>>>> scala> val topic = Set("newtopic")
>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>> String)] = 
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>
>>>>>> This part is tricky
>>>>>>
>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>> _).collect.foreach(println)
>>>>>> <console>:47: error: value contains is not a member of (String,
>>>>>> String)
>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>> _).collect.foreach(println)
>>>>>>
>>>>>>
>>>>>> How does one refer to the content of the stream here?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> //
>>>>>> // Now want to do some analysis on the same text file
>>>>>> //
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>
>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>
>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Mich,
>>>>>>>>
>>>>>>>> 1. The first underscore in your filter call refers to a line
>>>>>>>> in the file (textFile() yields a collection of strings, one per line).
>>>>>>>> 2. You're correct. No need for it.
>>>>>>>> 3. filter expects a Boolean result, so you can merge your
>>>>>>>> contains filters into one with a logical AND (&&).
>>>>>>>> 4. Not quite. split() takes a regex, so "\t," matches a tab
>>>>>>>> immediately followed by a comma. To split on either character, use a
>>>>>>>> character class such as "[\t,]".
>>>>>>>>
>>>>>>>> Eliran Bivas
>>>>>>>>
>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>> *To:* Eliran Bivas
>>>>>>>> *Cc:* user @spark
>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>
>>>>>>>> Hi Eliran,
>>>>>>>>
>>>>>>>> Many thanks for your input on this.
>>>>>>>>
>>>>>>>> I thought about what I was trying to achieve so I rewrote the logic
>>>>>>>> as follows:
>>>>>>>>
>>>>>>>>
>>>>>>>>    1. Read the text file in
>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>    3. Search for lines that contain "ASE 15" and also contain the
>>>>>>>>    phrase "UPDATE INDEX STATISTICS"
>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>    5. Print the outcome
>>>>>>>>
>>>>>>>>
>>>>>>>> This was what I did with your suggestions included
>>>>>>>>
>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>> f.cache()
>>>>>>>> f.filter(_.length > 0).
>>>>>>>>   filter(_.contains("ASE 15")).
>>>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>   flatMap(line => line.split("\t,")).
>>>>>>>>   map(word => (word, 1)).
>>>>>>>>   reduceByKey(_ + _).
>>>>>>>>   collect.foreach(println)
>>>>>>>>
>>>>>>>>
>>>>>>>> Couple of questions if I may
>>>>>>>>
>>>>>>>>
>>>>>>>>    1. I take it that "_" refers to the content of the file read in,
>>>>>>>>    by default?
>>>>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>>>>    needed here)
>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>
>>>>>>>>> Hi Mich,
>>>>>>>>>
>>>>>>>>> A few comments:
>>>>>>>>>
>>>>>>>>> When doing .filter(_ > "") you’re actually doing a lexicographic
>>>>>>>>> comparison. It happens to exclude empty strings, but _.nonEmpty or
>>>>>>>>> _.length > 0 states the intent directly.
>>>>>>>>> I think that filtering with _.contains should be sufficient and
>>>>>>>>> the first filter can be omitted.
>>>>>>>>>
>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>> The first split() call returns an Array[String], and Array has no
>>>>>>>>> split() method, so the second call does not compile, e.g.
>>>>>>>>>
>>>>>>>>> val fields: Array[String] = line.split("\t")
>>>>>>>>> fields.split(",") // Compilation error - no method split() exists for Array
>>>>>>>>>
>>>>>>>>> Since split() takes a regex, you can either split in two steps with
>>>>>>>>> flatMap(_.split("\t")).flatMap(_.split(",")) or in one step with a
>>>>>>>>> character class: flatMap(_.split("[\t,]"))
>>>>>>>>>
>>>>>>>>> Hope that helps.
>>>>>>>>>
>>>>>>>>> *Eliran Bivas*
>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>
>>>>>>>>> Read a text file in
>>>>>>>>>
>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now I want to get rid of empty lines and filter only the lines
>>>>>>>>> that contain "ASE15"
>>>>>>>>>
>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15"))
>>>>>>>>>
>>>>>>>>> The above works, but I am not sure whether I need the two filter
>>>>>>>>> transformations above. Can it be done in one?
>>>>>>>>>
>>>>>>>>> Now I want to map the above filter to lines with carriage return
>>>>>>>>> and split them by ","
>>>>>>>>>
>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>> (line.split("\t")))
>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>
>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>
>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>> (line.split("\t").split(",")))
>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>
>>>>>>>>> ^
>>>>>>>>> Any advice will be appreciated
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh