However, this works. I am checking the logic to see whether it does what I
asked it to do.

val v = lines.filter(_.contains("ASE 15")).
  filter(_.contains("UPDATE INDEX STATISTICS")).
  flatMap(line => line.split("\n,")).
  map(word => (word, 1)).
  reduceByKey(_ + _).print
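
For context on why the pipeline ends in print rather than collect: a DStream is a sequence of RDDs, one per batch, and has no collect method of its own (hence the compile error quoted further down the thread). The usual alternatives are print, or foreachRDD with collect called on each batch's RDD. A minimal sketch, assuming Spark Streaming is on the classpath; the object name, the queue-backed stream standing in for Kafka, and the sample records are all made up for illustration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object DStreamOutputSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: local master and a 1-second batch, purely for illustration.
    val conf = new SparkConf().setAppName("DStreamOutputSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical stand-in for the Kafka stream: one batch of (key, value) pairs.
    val batch: RDD[(String, String)] = ssc.sparkContext.parallelize(Seq(
      ("k1", "run UPDATE INDEX STATISTICS on all ASE 15 databases"),
      ("k2", "an unrelated line")))
    val messages = ssc.queueStream(mutable.Queue(batch))

    // Pick the value out of each tuple before calling contains on it.
    val lines = messages.map(_._2)

    val counts = lines
      .filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))
      .map(line => (line, 1))
      .reduceByKey(_ + _)

    // A DStream has no collect; collect each batch's RDD inside foreachRDD instead.
    counts.foreachRDD { rdd => rdd.collect().foreach(println) }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // give up waiting after 5 seconds
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

Collecting inside foreachRDD pulls every record of the batch to the driver, so it is probably only sensible for small result sets; print shows just the first few elements of each batch.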

scala> ssc.start()

scala> -------------------------------------------
Time: 1459701925000 ms
-------------------------------------------
(* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
databases,27)
(o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
(Once databases are loaded to ASE 15, then you will need to maintain them
the way you maintain your PROD. For example run UPDATE INDEX STATISTICS and
REORG COMPACT as necessary. One of the frequent mistakes that people do is
NOT pruning data from daily log tables in ASE 15 etc as they do it in PROD.
This normally results in slower performance on ASE 15 databases as test
cycles continue. Use MDA readings to measure daily DML activities on ASE 15
tables and compare them with those of PROD. A 24 hour cycle measurement
should be good. If you notice that certain tables have different DML hits
(insert/update/delete) compared to PROD you will know that either ASE 15 is
not doing everything in terms of batch activity (some jobs are missing), or
there is something inconsistent somewhere. ,27)
(* Make sure that you have enough tempdb system segment space for UPDATE
INDEX STATISTICS. It is always advisable to gauge the tempdb size required
in ASE 15 QA and expand the tempdb database in production accordingly. The
last thing you want is to blow up tempdb over the migration weekend.,27)
(o In ASE 15 you can subdivide the task by running parallel UPDATE INDEX
STATISTICS on different tables in the same database at the same time. Watch
tempdb segment growth though! OR,27)
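
A note on the output above: every matching line comes back whole, as a single key, so the split did nothing. String.split takes a regular expression, and "\n," matches only a newline immediately followed by a comma, not either character on its own. A minimal sketch in plain Scala (no Spark needed); the object name and the sample string are made up for illustration:

```scala
object SplitSketch {
  // Hypothetical sample: one comma and one newline, never adjacent.
  val sample = "ASE 15,UPDATE INDEX STATISTICS\nREORG COMPACT"

  // "\n," is the two-character sequence newline-then-comma; it does not occur
  // in the sample, so the whole string comes back as a single element.
  val unsplit: Array[String] = sample.split("\n,")

  // A character class matches a newline or a comma.
  val parts: Array[String] = sample.split("[\n,]")

  // The two contains filters can also be merged into one predicate with &&.
  val kept: Seq[String] = Seq(sample, "ASE 15 only")
    .filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))

  def main(args: Array[String]): Unit = {
    println(unsplit.length)
    parts.foreach(println)
    println(kept.length)
  }
}
```

The same patterns can be passed to split inside flatMap on an RDD or DStream, since those calls hand each line to the function as an ordinary String.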



Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks Ted
>
> This works
>
> scala> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>
> scala> // Get the lines
> scala> val lines = messages.map(_._2)
> lines: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>
> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>
> However, this fails
>
> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
> <console>:43: error: value collect is not a member of
> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>          val v = lines.filter(_.contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
>
> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq. is not a member of (String, String)
>>
>> As shown above, contains shouldn't be applied directly on a tuple.
>>
>> Choose the element of the tuple and then apply contains on it.
>>
>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thank you gents.
>>>
>>> That should be "\n", the newline character
>>>
>>> OK I am using spark streaming to analyse the message
>>>
>>> It does the streaming
>>>
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>> //
>>> scala> val sparkConf = new SparkConf().
>>>      |              setAppName("StreamTest").
>>>      |              setMaster("local[12]").
>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>> scala>
>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
>>> StreamTest)
>>> scala> val topic = Set("newtopic")
>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>
>>> This part is tricky
>>>
>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>> _).collect.foreach(println)
>>> <console>:47: error: value contains is not a member of (String, String)
>>>          val showlines = messages.filter(_ contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>> _).collect.foreach(println)
>>>
>>>
>>> How does one refer to the content of the stream here?
>>>
>>> Thanks
>>>
>>> //
>>> // Now want to do some analysis on the same text file
>>> //
>>>
>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq. split"\t," splits the filter by carriage return
>>>>
>>>> Minor correction: "\t" denotes tab character.
>>>>
>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> 1. The first underscore in your filter call refers to a line in
>>>>> the file (as textFile() results in a collection of strings)
>>>>> 2. You're correct. No need for it.
>>>>> 3. Filter expects a Boolean result, so you can merge your
>>>>> contains filters into one with an AND (&&) expression.
>>>>> 4. Not quite. split() takes a regex, so "\t," matches a tab followed
>>>>> by a comma. To split on either character, use a class such as "[\t,]".
>>>>>
>>>>> Eliran Bivas
>>>>>
>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> *Sent:* Apr 3, 2016 15:06
>>>>> *To:* Eliran Bivas
>>>>> *Cc:* user @spark
>>>>> *Subject:* Re: multiple splits fails
>>>>>
>>>>> Hi Eliran,
>>>>>
>>>>> Many thanks for your input on this.
>>>>>
>>>>> I thought about what I was trying to achieve so I rewrote the logic as
>>>>> follows:
>>>>>
>>>>>
>>>>>    1. Read the text file in
>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>    3. Search for lines that contain "ASE 15" and further have
>>>>>    sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>    4. Split the text by "\t" and ","
>>>>>    5. Print the outcome
>>>>>
>>>>>
>>>>> This was what I did with your suggestions included
>>>>>
>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>> f.cache()
>>>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>>> _).collect.foreach(println)
>>>>>
>>>>>
>>>>> Couple of questions if I may
>>>>>
>>>>>
>>>>>    1. I take that "_" refers to content of the file read in by
>>>>>    default?
>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>    needed here)
>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>>
>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>
>>>>>> Hi Mich,
>>>>>>
>>>>>> Few comments:
>>>>>>
>>>>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>>>>> comparison. It does drop empty strings, but _.nonEmpty or
>>>>>> _.length > 0 states the intent directly.
>>>>>> I think that filtering with _.contains should be sufficient and the
>>>>>> first filter can be omitted.
>>>>>>
>>>>>> As for line => line.split("\t").split(","):
>>>>>> The first split() call returns an Array[String], and Array has no
>>>>>> split() method, so the second call fails to compile. Since split()
>>>>>> takes a regex, you can split on either character in one call with a
>>>>>> character class: .split("[\t,]").
>>>>>> e.g.
>>>>>>
>>>>>> val lines: Array[String] = line.split("\t")
>>>>>> lines.split(",") // Compilation error - no method split() exists for
>>>>>> Array
>>>>>>
>>>>>> So either go with flatMap(_.split("\t")).flatMap(_.split(",")) or
>>>>>> flatMap(_.split("[\t,]"))
>>>>>>
>>>>>> Hope that helps.
>>>>>>
>>>>>> *Eliran Bivas*
>>>>>> Data Team | iguaz.io
>>>>>>
>>>>>>
>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am not sure this is the correct approach
>>>>>>
>>>>>> Read a text file in
>>>>>>
>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>
>>>>>>
>>>>>> Now I want to get rid of empty lines and filter only the lines that
>>>>>> contain "ASE15"
>>>>>>
>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>
>>>>>> The above works, but I am not sure whether I need the two filter
>>>>>> transformations above. Can it be done in one?
>>>>>>
>>>>>> Now I want to map the above filter to lines with carriage return and
>>>>>> split them by ","
>>>>>>
>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>> (line.split("\t")))
>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>
>>>>>> Now I want to split the output by ","
>>>>>>
>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>> (line.split("\t").split(",")))
>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line
>>>>>> => (line.split("\t").split(",")))
>>>>>>
>>>>>> ^
>>>>>> Any advice will be appreciated
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>
>>>
>>
>
