I am afraid print(Integer.MAX_VALUE) does not return any lines! However,
print(1000) does.
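A possible explanation, offered as an assumption rather than something confirmed in this thread: if print(num) internally fetches num + 1 elements (a one-row lookahead so it can decide whether to print "..."), then Integer.MAX_VALUE + 1 overflows to a negative number and no rows are taken. The arithmetic alone can be checked in plain Scala; rowsRequested is a hypothetical helper for illustration, not a Spark method:

```scala
object PrintOverflowSketch {
  // Hypothetical helper: mirrors the `num + 1` lookahead that print(num)
  // is ASSUMED to perform. It is not part of the Spark API.
  def rowsRequested(num: Int): Int = num + 1

  def main(args: Array[String]): Unit = {
    println(rowsRequested(1000))         // 1001
    println(rowsRequested(Int.MaxValue)) // -2147483648 (Int overflow)
  }
}
```

If that is the cause, calling foreachRDD(rdd => rdd.collect().foreach(println)) on the DStream (instead of print) should show every row of each batch, at the cost of pulling the whole batch to the driver.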

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:

> Since num is an Int, you can specify Integer.MAX_VALUE
>
> FYI
>
> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted.
>>
>> As I see it, print() materializes the first 10 rows, whereas print(n)
>> materializes the first n rows.
>>
>> What if I wanted to materialize all rows?
>>
>> Cheers
>>
>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Mich:
>>> See the following method of DStream:
>>>
>>>   /**
>>>    * Print the first num elements of each RDD generated in this DStream. This is an output
>>>    * operator, so this DStream will be registered as an output stream and there materialized.
>>>    */
>>>   def print(num: Int): Unit = ssc.withScope {
>>>
>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> However this works. I am checking the logic to see if it does what I
>>>> asked it to do
>>>>
>>>> val v = lines.filter(_.contains("ASE 15")).
>>>>   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>   flatMap(line => line.split("\n,")).
>>>>   map(word => (word, 1)).
>>>>   reduceByKey(_ + _).print
>>>>
>>>> scala> ssc.start()
>>>>
>>>> scala> -------------------------------------------
>>>> Time: 1459701925000 ms
>>>> -------------------------------------------
>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>> databases,27)
>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>> (Once databases are loaded to ASE 15, then you will need to maintain
>>>> them the way you maintain your PROD. For example run UPDATE INDEX
>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>>>> that people do is NOT pruning data from daily log tables in ASE 15 etc as
>>>> they do it in PROD. This normally results in slower performance on ASE 15
>>>> databases as test cycles continue. Use MDA readings to measure daily DML
>>>> activities on ASE 15 tables and compare them with those of PROD. A 24 hour
>>>> cycle measurement should be good. If you notice that certain tables have
>>>> different DML hits (insert/update/delete) compared to PROD you will know
>>>> that either ASE 15 is not doing everything in terms of batch activity (some
>>>> jobs are missing), or there is something inconsistent somewhere. ,27)
>>>> (* Make sure that you have enough tempdb system segment space for
>>>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size
>>>> required in ASE 15 QA and expand the tempdb database in production
>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>> migration weekend.,27)
>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE
>>>> INDEX STATISTICS on different tables in the same database at the same time.
>>>> Watch tempdb segment growth though! OR,27)
>>>>
>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ted
>>>>>
>>>>> This works
>>>>>
>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>> String)] = 
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>
>>>>> scala> // Get the lines
>>>>> scala> val lines = messages.map(_._2)
>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>
>>>>> scala> val v = lines.filter(_.contains("ASE 15")).
>>>>>      |   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>>      |   flatMap(line => line.split("\n,")).
>>>>>      |   map(word => (word, 1)).
>>>>>      |   reduceByKey(_ + _)
>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>
>>>>> However, this fails
>>>>>
>>>>> scala> val v = lines.filter(_.contains("ASE 15")).
>>>>>      |   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>>      |   flatMap(line => line.split("\n,")).
>>>>>      |   map(word => (word, 1)).
>>>>>      |   reduceByKey(_ + _).collect.foreach(println)
>>>>> <console>:43: error: value collect is not a member of
>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>> _).collect.foreach(println)
>>>>>
>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq. is not a member of (String, String)
>>>>>>
>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>
>>>>>> Choose the element of the tuple and then apply contains on it.
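A minimal plain-Scala sketch of that advice, with a made-up (key, value) pair standing in for one Kafka message; the object and helper names are hypothetical. In the streaming job the same _._2 projection would be applied to the DStream before filtering:

```scala
object TupleContainsSketch {
  // Hypothetical stand-in for one (key, value) message from the stream.
  val message: (String, String) = ("k1", "Run UPDATE INDEX STATISTICS on ASE 15")

  // contains is a String method, so select the value element of the tuple first.
  def mentionsAse15(m: (String, String)): Boolean = m._2.contains("ASE 15")

  def main(args: Array[String]): Unit =
    println(mentionsAse15(message)) // true
}
```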
>>>>>>
>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you gents.
>>>>>>>
>>>>>>> That should be "\n" for the line break (strictly a newline; a carriage return would be "\r")
>>>>>>>
>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>
>>>>>>> It does the streaming
>>>>>>>
>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming._
>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>> //
>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>      |              setAppName("StreamTest").
>>>>>>>      |              setMaster("local[12]").
>>>>>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>> scala>
>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>>>>>>> StreamTest)
>>>>>>> scala> val topic = Set("newtopic")
>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>>> String)] = 
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>
>>>>>>> This part is tricky
>>>>>>>
>>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).
>>>>>>>      |   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>>>>      |   flatMap(line => line.split("\n,")).
>>>>>>>      |   map(word => (word, 1)).
>>>>>>>      |   reduceByKey(_ + _).collect.foreach(println)
>>>>>>> <console>:47: error: value contains is not a member of (String,
>>>>>>> String)
>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>> _).collect.foreach(println)
>>>>>>>
>>>>>>>
>>>>>>> How does one refer to the content of the stream here?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> //
>>>>>>> // Now want to do some analysis on the same text file
>>>>>>> //
>>>>>>>
>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>
>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>
>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Mich,
>>>>>>>>>
>>>>>>>>> 1. The first underscore in your filter call refers to a line
>>>>>>>>> in the file (as textFile() results in a collection of strings).
>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>> 3. filter expects a Boolean result, so you can merge your
>>>>>>>>> contains filters into one with an AND (&&) expression.
>>>>>>>>> 4. Correct. Each character in split() is used as a divider.
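Points 3 and 4 can be sketched in plain Scala. One caveat on point 4: String.split takes a regular expression, so "\t," matches a tab immediately followed by a comma, not either character on its own; a character class such as "[\t,]" splits on either. The object name, the wanted helper and the sample line are made up for illustration:

```scala
object FilterSplitSketch {
  // Point 3: a single predicate with && instead of two chained filters.
  def wanted(line: String): Boolean =
    line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS")

  def main(args: Array[String]): Unit = {
    val line = "a\tb,c"
    // "\t," is a regex for a tab directly followed by a comma: no match here.
    println(line.split("\t,").length)   // 1
    // "[\t,]" is a character class: split on a tab or on a comma.
    println(line.split("[\t,]").toList) // List(a, b, c)
  }
}
```

This would also be consistent with output rows that still contain commas after a split on "\n,".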
>>>>>>>>>
>>>>>>>>> Eliran Bivas
>>>>>>>>>
>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>> *Cc:* user @spark
>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>
>>>>>>>>> Hi Eliran,
>>>>>>>>>
>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>
>>>>>>>>> I thought about what I was trying to achieve so I rewrote the
>>>>>>>>> logic as follows:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    1. Read the text file in
>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>    3. Search for lines that contain "ASE 15" and also contain the
>>>>>>>>>    phrase "UPDATE INDEX STATISTICS"
>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>    5. Print the outcome
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>
>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>> f.cache()
>>>>>>>>> f.filter(_.length > 0).
>>>>>>>>>   filter(_ contains("ASE 15")).
>>>>>>>>>   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>   flatMap(line => line.split("\t,")).
>>>>>>>>>   map(word => (word, 1)).
>>>>>>>>>   reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Couple of questions if I may
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    1. I take it that "_" refers to the content of the file read in
>>>>>>>>>    by default?
>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>>>>>    needed here)
>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mich,
>>>>>>>>>>
>>>>>>>>>> A few comments:
>>>>>>>>>>
>>>>>>>>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>>>>>>>>> comparison rather than an explicit check for empty lines (which
>>>>>>>>>> could be achieved with _.nonEmpty or _.length > 0).
>>>>>>>>>> I think that filtering with _.contains should be sufficient and
>>>>>>>>>> the first filter can be omitted.
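For illustration, a plain-Scala sketch comparing the two filters on a made-up list of lines (a List stands in for the RDD here, which is an assumption for the example). Because every non-empty string sorts after "", the comparison happens to keep the same lines as the explicit check, but the explicit form states the intent:

```scala
object EmptyLineFilterSketch {
  // Made-up sample input; the real job reads lines from a text file.
  val lines: List[String] = List("", "ASE 15", " ", "")

  // Lexicographic comparison: every non-empty string sorts after "".
  val byComparison: List[String] = lines.filter(_ > "")

  // Explicit emptiness check.
  val byNonEmpty: List[String] = lines.filter(_.nonEmpty)

  def main(args: Array[String]): Unit =
    println(byComparison == byNonEmpty) // true
}
```

Note that a whitespace-only line passes both filters; _.trim.nonEmpty would drop it as well.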
>>>>>>>>>>
>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>> You have to do a second map or (since split() requires a regex
>>>>>>>>>> as input) .split("\t,").
>>>>>>>>>> The problem is that your first split() call will generate an
>>>>>>>>>> Array and then your second call will result in an error.
>>>>>>>>>> e.g.
>>>>>>>>>>
>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>>>>>>>
>>>>>>>>>> So either go with map(_.split("\t")).map(_.split(",")) or
>>>>>>>>>> map(_.split("\t,"))
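A hedged note on chaining the two splits: a second map after map(_.split("\t")) would still receive an Array[String] and hit the same "split is not a member of Array[String]" error, whereas flatMap flattens each split result back into strings. A plain-Scala sketch, using a List in place of the RDD and made-up sample lines (both assumptions for illustration):

```scala
object FlatMapSplitSketch {
  // Made-up sample input standing in for lines of the text file.
  val lines: List[String] = List("a\tb,c", "d,e")

  // map keeps one Array per line, so the element type is Array[String].
  val nested: List[Array[String]] = lines.map(_.split("\t"))

  // flatMap flattens each split, so the second split still sees Strings.
  val tokens: List[String] =
    lines.flatMap(_.split("\t")).flatMap(_.split(","))

  def main(args: Array[String]): Unit =
    println(tokens) // List(a, b, c, d, e)
}
```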
>>>>>>>>>>
>>>>>>>>>> Hope that helps.
>>>>>>>>>>
>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>
>>>>>>>>>> Read a text file in
>>>>>>>>>>
>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines
>>>>>>>>>> that contain "ASE15"
>>>>>>>>>>
>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>
>>>>>>>>>> The above works, but I am not sure whether I need the two filter
>>>>>>>>>> transformations above. Can it be done in one?
>>>>>>>>>>
>>>>>>>>>> Now I want to map the above filter to lines with carriage return
>>>>>>>>>> and split them by ","
>>>>>>>>>>
>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>> (line.split("\t")))
>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>
>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>
>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>> (line.split("\t").split(",")))
>>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>
>>>>>>>>>> ^
>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>