Mich:
See the following method of DStream:

   * Print the first num elements of each RDD generated in this DStream.
This is an output
   * operator, so this DStream will be registered as an output stream and
there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {

On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> However this works. I am checking the logic to see if it does what I asked
> it to do
>
> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE INDEX
> STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word,
> 1)).reduceByKey(_ + _).print
>
> scala> ssc.start()
>
> scala> -------------------------------------------
> Time: 1459701925000 ms
> -------------------------------------------
> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
> databases,27)
> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
> (Once databases are loaded to ASE 15, then you will need to maintain them
> the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and
> REORG COMPACT as necessary. One of the frequent mistakes that people do is
> NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD.
> This normally results in slower performance on ASE 15 databases as test
> cycles continue. Use MDA readings to measure daily DML activities on ASE 15
> tables and compare them with those of PROD. A 24 hour cycle measurement
> should be good. If you notice that certain tables have different DML hits
> (insert/update/delete) compared to PROD you will know that either ASE 15 is
> not doing everything in terms of batch activity (some jobs are missing), or
> there is something inconsistent somewhere. ,27)
> (* Make sure that you have enough tempdb system segment space for UPDATE
> INDEX STATISTICS. It is always advisable to gauge the tempdb size required
> in ASE 15 QA and expand the tempdb database in production accordingly. The
> last thing you want is to blow up tempdb over the migration weekend.,27)
> (o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX
> STATISTICS on different tables in the same database at the same time. Watch
> tempdb segment growth though! OR,27)
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Thanks Ted
>>
>> This works
>>
>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>
>> scala> // Get the lines
>> scala> val lines = messages.map(_._2)
>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>
>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>
>> However, this fails
>>
>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>> <console>:43: error: value collect is not a member of
>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. is not a member of (String, String)
>>>
>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>
>>> Choose the element of the tuple and then apply contains on it.
>>>
>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thank you gents.
>>>>
>>>> That should "\n" as carriage return
>>>>
>>>> OK I am using spark streaming to analyse the message
>>>>
>>>> It does the streaming
>>>>
>>>> import _root_.kafka.serializer.StringDecoder
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>> //
>>>> scala> val sparkConf = new SparkConf().
>>>>      |              setAppName("StreamTest").
>>>>      |              setMaster("local[12]").
>>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>> scala>
>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>>>> StreamTest)
>>>> scala> val topic = Set("newtopic")
>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>> String)] = 
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>
>>>> This part is tricky
>>>>
>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>> _).collect.foreach(println)
>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>          val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>> _).collect.foreach(println)
>>>>
>>>>
>>>> How does one refer to the content of the stream here?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> //
>>>> // Now want to do some analysis on the same text file
>>>> //
>>>>
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> bq. split"\t," splits the filter by carriage return
>>>>>
>>>>> Minor correction: "\t" denotes tab character.
>>>>>
>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>
>>>>>> Hi Mich,
>>>>>>
>>>>>> 1. The first underscore in your filter call is refering to a line in
>>>>>> the file (as textFile() results in a collection of strings)
>>>>>> 2. You're correct. No need for it.
>>>>>> 3. Filter is expecting a Boolean result. So you can merge your
>>>>>> contains filters to one with AND (&&) statement.
>>>>>> 4. Correct. Each character in split() is used as a divider.
>>>>>>
>>>>>> Eliran Bivas
>>>>>>
>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>> *To:* Eliran Bivas
>>>>>> *Cc:* user @spark
>>>>>> *Subject:* Re: multiple splits fails
>>>>>>
>>>>>> Hi Eliran,
>>>>>>
>>>>>> Many thanks for your input on this.
>>>>>>
>>>>>> I thought about what I was trying to achieve so I rewrote the logic
>>>>>> as follows:
>>>>>>
>>>>>>
>>>>>>    1. Read the text file in
>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>    3. Search for lines that contain "ASE 15" and further have
>>>>>>    sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>    4. Split the text by "\t" and ","
>>>>>>    5. Print the outcome
>>>>>>
>>>>>>
>>>>>> This was what I did with your suggestions included
>>>>>>
>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>> f.cache()
>>>>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>> _).collect.foreach(println)
>>>>>>
>>>>>>
>>>>>> Couple of questions if I may
>>>>>>
>>>>>>
>>>>>>    1. I take that "_" refers to content of the file read in by
>>>>>>    default?
>>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>>    needed here)
>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * 
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>
>>>>>>> Hi Mich,
>>>>>>>
>>>>>>> Few comments:
>>>>>>>
>>>>>>> When doing .filter(_ > “”) you’re actually doing a lexicographic
>>>>>>> comparison and not filtering for empty lines (which could be achieved 
>>>>>>> with
>>>>>>> _.notEmpty or _.length > 0).
>>>>>>> I think that filtering with _.contains should be sufficient and the
>>>>>>> first filter can be omitted.
>>>>>>>
>>>>>>> As for line => line.split(“\t”).split(“,”):
>>>>>>> You have to do a second map or (since split() requires a regex as
>>>>>>> input) .split(“\t,”).
>>>>>>> The problem is that your first split() call will generate an Array
>>>>>>> and then your second call will result in an error.
>>>>>>> e.g.
>>>>>>>
>>>>>>> val lines: Array[String] = line.split(“\t”)
>>>>>>> lines.split(“,”) // Compilation error - no method split() exists for
>>>>>>> Array
>>>>>>>
>>>>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>>>>>> map(_.split(“\t,”))
>>>>>>>
>>>>>>> Hope that helps.
>>>>>>>
>>>>>>> *Eliran Bivas*
>>>>>>> Data Team | iguaz.io
>>>>>>>
>>>>>>>
>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am not sure this is the correct approach
>>>>>>>
>>>>>>> Read a text file in
>>>>>>>
>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>
>>>>>>>
>>>>>>> Now I want to get rid of empty lines and filter only the lines that
>>>>>>> contain "ASE15"
>>>>>>>
>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>
>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>> transformation above? Can it be done in one?
>>>>>>>
>>>>>>> Now I want to map the above filter to lines with carriage return ans
>>>>>>> split them by ","
>>>>>>>
>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>> (line.split("\t")))
>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>
>>>>>>> Now I want to split the output by ","
>>>>>>>
>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>> (line.split("\t").split(",")))
>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>>> => (line.split("\t").split(",")))
>>>>>>>
>>>>>>> ^
>>>>>>> Any advice will be appreciated
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn * 
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to