Thanks.

Currently this is what I am doing:

// Get the lines
val lines = messages.map(_._2)
// Check for message
val showResults = lines.filter(_.contains("Sending messages")).
  flatMap(line => line.split("\n,")).
  map(word => (word, 1)).
  reduceByKey(_ + _).
  print(1000)
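
One thing I should double-check in the above: as far as I know String.split takes a regular expression, so "\n," would match a newline immediately followed by a comma rather than either character on its own. A minimal sketch in plain Scala (no Spark needed); the sample string and the value names are made up for illustration:

```scala
// Sample input: a comma, then a newline directly followed by a comma.
val sample = "one,two\n,three"

// "\n," as a regex: split only where a newline is directly followed by a comma.
val bySequence = sample.split("\n,")

// Character class: split on any run of newlines and/or commas.
val byEither = sample.split("[\n,]+")

println(bySequence.toList) // List("one,two", "three") -> 2 elements
println(byEither.toList)   // List("one", "two", "three") -> 3 elements
```

If the intent is to split on either character, the character class form looks like the one to use inside flatMap.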

So it prints a maximum of 1000 lines to the terminal after the filter and
map. Can this be done with foreachRDD as suggested?
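
My understanding of the suggestion is something like the sketch below. printAll is a hypothetical helper name of my own, not a Spark method, and collect() brings each batch back to the driver, so I assume it is only safe when a batch fits in driver memory:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper (not part of Spark): prints every element of every batch.
// foreachRDD is an output operation, so the stream is materialized each interval.
def printAll[T](stream: DStream[T]): Unit =
  stream.foreachRDD { rdd =>
    // collect() returns the whole batch to the driver as an Array
    rdd.collect().foreach(println)
  }
```

It would be called as printAll(counts) before ssc.start(), where counts stands for the DStream returned by reduceByKey above, without the trailing print(1000).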






Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 5 April 2016 at 06:53, Sachin Aggarwal <different.sac...@gmail.com>
wrote:

> Hi
>
> Instead of using print() directly on the DStream, I suggest you use
> foreachRDD if you want to materialize all rows. An example is shown here:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
>     // create the connection on the worker, once per partition
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
>
> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I am afraid print(Integer.MAX_VALUE) does not return any lines!  However,
>> print(1000) does
>>
>> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>
>>> FYI
>>>
>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thanks Ted.
>>>>
>>>> As I see print() materializes the first 10 rows. On the other hand
>>>> print(n) will materialise n rows.
>>>>
>>>> How about if I wanted to materialize all rows?
>>>>
>>>> Cheers
>>>>
>>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Mich:
>>>>> See the following method of DStream:
>>>>>
>>>>>   /**
>>>>>    * Print the first num elements of each RDD generated in this DStream. This is an output
>>>>>    * operator, so this DStream will be registered as an output stream and there materialized.
>>>>>    */
>>>>>   def print(num: Int): Unit = ssc.withScope {
>>>>>
>>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> However this works. I am checking the logic to see if it does what I
>>>>>> asked it to do
>>>>>>
>>>>>> val v = lines.filter(_.contains("ASE 15")).
>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>   flatMap(line => line.split("\n,")).
>>>>>>   map(word => (word, 1)).
>>>>>>   reduceByKey(_ + _).print
>>>>>>
>>>>>> scala> ssc.start()
>>>>>>
>>>>>> scala> -------------------------------------------
>>>>>> Time: 1459701925000 ms
>>>>>> -------------------------------------------
>>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>>>> databases,27)
>>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>>>> (Once databases are loaded to ASE 15, then you will need to maintain
>>>>>> them the way you maintain your PROD. For example run UPDATE INDEX
>>>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>>>>>> that people do is NOT pruning data from daily log tables in ASE 15 etc as
>>>>>> they do it in PROD. This normally results in slower performance on ASE 15
>>>>>> databases as test cycles continue. Use MDA readings to measure daily DML
>>>>>> activities on ASE 15 tables and compare them with those of PROD. A 24
>>>>>> hour cycle measurement should be good. If you notice that certain tables
>>>>>> have different DML hits (insert/update/delete) compared to PROD you will
>>>>>> know that either ASE 15 is not doing everything in terms of batch activity
>>>>>> (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>>> (* Make sure that you have enough tempdb system segment space for
>>>>>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size
>>>>>> required in ASE 15 QA and expand the tempdb database in production
>>>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>>>> migration weekend.,27)
>>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE
>>>>>> INDEX STATISTICS on different tables in the same database at the same
>>>>>> time. Watch tempdb segment growth though! OR,27)
>>>>>>
>>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Ted
>>>>>>>
>>>>>>> This works
>>>>>>>
>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>>> String)] = 
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>
>>>>>>> scala> // Get the lines
>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>
>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>
>>>>>>> However, this fails
>>>>>>>
>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>> _).collect.foreach(println)
>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>> _).collect.foreach(println)
>>>>>>>
>>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>
>>>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>>>
>>>>>>>> Choose the element of the tuple and then apply contains on it.
>>>>>>>>
>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you gents.
>>>>>>>>>
>>>>>>>>> That should be "\n" for newline
>>>>>>>>>
>>>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>>>
>>>>>>>>> It does the streaming
>>>>>>>>>
>>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>> //
>>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>>      |              set("spark.driver.allowMultipleContexts", "true").
>>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>>> scala>
>>>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers"
>>>>>>>>> -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>>>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest"
>>>>>>>>> )
>>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id
>>>>>>>>> -> StreamTest)
>>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>>>>> String)] = 
>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>>
>>>>>>>>> This part is tricky
>>>>>>>>>
>>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE
>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>> _).collect.foreach(println)
>>>>>>>>> <console>:47: error: value contains is not a member of (String,
>>>>>>>>> String)
>>>>>>>>>          val showlines = messages.filter(_ contains("ASE
>>>>>>>>> 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> //
>>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>>> //
>>>>>>>>>
>>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>>
>>>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>
>>>>>>>>>>> 1. The first underscore in your filter call is referring to a
>>>>>>>>>>> line in the file (as textFile() results in a collection of strings)
>>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>>> 3. Filter is expecting a Boolean result. So you can merge your
>>>>>>>>>>> contains filters to one with AND (&&) statement.
>>>>>>>>>>> 4. Not quite. split() takes a regex, so "\t," matches a tab followed
>>>>>>>>>>> by a comma; use "[\t,]" to split on either character.
>>>>>>>>>>>
>>>>>>>>>>> Eliran Bivas
>>>>>>>>>>>
>>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>>
>>>>>>>>>>> Hi Eliran,
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>>
>>>>>>>>>>> I thought about what I was trying to achieve so I rewrote the
>>>>>>>>>>> logic as follows:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>>    3. Search for lines that contain "ASE 15" and further have
>>>>>>>>>>>    sentence "UPDATE INDEX STATISTICS" in the said line
>>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>>
>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>> f.cache()
>>>>>>>>>>> f.filter(_.length > 0).filter(_.contains("ASE 15")).
>>>>>>>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>>>   flatMap(line => line.split("\t,")).
>>>>>>>>>>>   map(word => (word, 1)).
>>>>>>>>>>>   reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    1. I take that "_" refers to content of the file read in by
>>>>>>>>>>>    default?
>>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not
>>>>>>>>>>>    really needed here)
>>>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>>
>>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>>
>>>>>>>>>>>> Few comments:
>>>>>>>>>>>>
>>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a
>>>>>>>>>>>> lexicographic comparison and not filtering for empty lines (which
>>>>>>>>>>>> could be achieved with _.nonEmpty or _.length > 0).
>>>>>>>>>>>> I think that filtering with _.contains should be sufficient and
>>>>>>>>>>>> the first filter can be omitted.
>>>>>>>>>>>>
>>>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>>>> You have to do a second map or (since split() requires a regex
>>>>>>>>>>>> as input) .split("\t,").
>>>>>>>>>>>> The problem is that your first split() call will generate an
>>>>>>>>>>>> Array and then your second call will result in an error.
>>>>>>>>>>>> e.g.
>>>>>>>>>>>>
>>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>>>>>>>>>
>>>>>>>>>>>> So either go with map(_.split("\t")).map(_.split(",")) or
>>>>>>>>>>>> map(_.split("\t,"))
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>
>>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>>
>>>>>>>>>>>> Read a text file in
>>>>>>>>>>>>
>>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines
>>>>>>>>>>>> that contain "ASE15"
>>>>>>>>>>>>
>>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>>
>>>>>>>>>>>> The above works but I am not sure whether I need two filter
>>>>>>>>>>>> transformation above? Can it be done in one?
>>>>>>>>>>>>
>>>>>>>>>>>> Now I want to map the above filter to lines with carriage
>>>>>>>>>>>> return and split them by ","
>>>>>>>>>>>>
>>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>>
>>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>>
>>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>>> (line.split("\t").split(",")))
>>>>>>>>>>>> <console>:30: error: value split is not a member of
>>>>>>>>>>>> Array[String]
>>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>>
>>>>>>>>>>>> ^
>>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>
