Hi

Instead of calling print() directly on the DStream, I suggest using foreachRDD
if you want to materialize all rows. An example is shown here:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          val connection = createNewConnection()  // created on the worker, once per partition
          partitionOfRecords.foreach(record => connection.send(record))
          connection.close()
      })
  })
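
If the goal is only to see every row on the driver, a minimal sketch along the same lines is to collect each batch inside foreachRDD. The helper name foreachRow is hypothetical (it is not part of Spark), and collect() pulls the whole batch into driver memory, so this assumes the batches are small.

```scala
import org.apache.spark.streaming.dstream.DStream

object MaterializeAll {
  // Hypothetical helper (not part of Spark): hands every element of every
  // batch to `sink` on the driver. collect() brings the whole batch into
  // driver memory, so this is only reasonable for small batches.
  def foreachRow[T](stream: DStream[T])(sink: T => Unit): Unit =
    stream.foreachRDD { rdd =>
      rdd.collect().foreach(sink)
    }
}
```

With the DStream[(String, Int)] named v from the earlier message (built without the trailing .print), MaterializeAll.foreachRow(v)(println) would print all rows of each batch. It has to be registered before ssc.start().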


On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I am afraid print(Integer.MAX_VALUE) does not return any lines!  However,
> print(1000) does
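
One likely explanation, offered as an assumption from reading the DStream source rather than something verified against this build: print(num) asks each RDD for num + 1 elements so it can tell whether to print a trailing "...". With Integer.MAX_VALUE that addition wraps around to a negative number, and taking a negative number of elements returns nothing. The helper name lookahead below is hypothetical and only mirrors that assumed "num + 1" step in plain Scala.

```scala
object PrintOverflow {
  // Hypothetical stand-in for the "num + 1" lookahead assumed above.
  def lookahead(num: Int): Int = num + 1

  def main(args: Array[String]): Unit = {
    println(lookahead(1000))         // prints 1001
    println(lookahead(Int.MaxValue)) // prints -2147483648 (Int overflow)
  }
}
```

If that reading is right, print(Integer.MAX_VALUE - 1) should avoid the wrap-around, although foreachRDD is the cleaner route.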
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 19:46, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Since num is an Int, you can specify Integer.MAX_VALUE
>>
>> FYI
>>
>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Ted.
>>>
>>> As I see it, print() materializes the first 10 rows, whereas print(n)
>>> materializes n rows.
>>>
>>> How about if I wanted to materialize all rows?
>>>
>>> Cheers
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 3 April 2016 at 18:05, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Mich:
>>>> See the following method of DStream:
>>>>
>>>>   /**
>>>>    * Print the first num elements of each RDD generated in this DStream. This is an output
>>>>    * operator, so this DStream will be registered as an output stream and there materialized.
>>>>    */
>>>>   def print(num: Int): Unit = ssc.withScope {
>>>>
>>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> However this works. I am checking the logic to see if it does what I
>>>>> asked it to do
>>>>>
>>>>> val v = lines.filter(_.contains("ASE 15")).
>>>>>   filter(_.contains("UPDATE INDEX STATISTICS")).
>>>>>   flatMap(line => line.split("\n,")).
>>>>>   map(word => (word, 1)).
>>>>>   reduceByKey(_ + _).print
>>>>>
>>>>> scala> ssc.start()
>>>>>
>>>>> scala> -------------------------------------------
>>>>> Time: 1459701925000 ms
>>>>> -------------------------------------------
>>>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>>>> databases,27)
>>>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>>>> (Once databases are loaded to ASE 15, then you will need to maintain
>>>>> them the way you maintain your PROD. For example run UPDATE INDEX
>>>>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>>>>> that people do is NOT pruning data from daily log tables in ASE 15 etc as
>>>>> they do it in PROD. This normally results in slower performance on ASE 15
>>>>> databases as test cycles continue. Use MDA readings to measure daily DML
>>>>> activities on ASE 15 tables and compare them with those of PROD. A 24 hour
>>>>> cycle measurement should be good. If you notice that certain tables have
>>>>> different DML hits (insert/update/delete) compared to PROD you will know
>>>>> that either ASE 15 is not doing everything in terms of batch activity
>>>>> (some jobs are missing), or there is something inconsistent somewhere. ,27)
>>>>> (* Make sure that you have enough tempdb system segment space for
>>>>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size
>>>>> required in ASE 15 QA and expand the tempdb database in production
>>>>> accordingly. The last thing you want is to blow up tempdb over the
>>>>> migration weekend.,27)
>>>>> (o In ASE 15 you can subdivide the task by running parallel UPDATE
>>>>> INDEX STATISTICS on different tables in the same database at the same
>>>>> time. Watch tempdb segment growth though! OR,27)
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 3 April 2016 at 17:06, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Ted
>>>>>>
>>>>>> This works
>>>>>>
>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>> String)] = 
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>
>>>>>> scala> // Get the lines
>>>>>> scala> val lines = messages.map(_._2)
>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>
>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>
>>>>>> However, this fails
>>>>>>
>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>> _).collect.foreach(println)
>>>>>> <console>:43: error: value collect is not a member of
>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>          val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>> _).collect.foreach(println)
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> On 3 April 2016 at 16:01, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> bq. is not a member of (String, String)
>>>>>>>
>>>>>>> As shown above, contains shouldn't be applied directly on a tuple.
>>>>>>>
>>>>>>> Choose the element of the tuple and then apply contains on it.
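
A minimal sketch of that advice on a plain Scala collection: the direct stream yields (key, value) pairs, so contains has to be called on the second element. The object name, helper name and sample pairs are made up for illustration.

```scala
object TupleFilter {
  // Keeps only the (key, value) pairs whose value contains the phrase.
  def valuesContaining(pairs: Seq[(String, String)], phrase: String): Seq[(String, String)] =
    pairs.filter(_._2.contains(phrase))

  def main(args: Array[String]): Unit = {
    // Made-up sample messages; real keys and values would come from Kafka.
    val pairs = Seq(("k1", "ASE 15 upgrade"), ("k2", "unrelated"))
    valuesContaining(pairs, "ASE 15").foreach(println)
  }
}
```

On the stream itself the equivalent is messages.map(_._2) followed by the filters, or messages.filter(_._2.contains("ASE 15")).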
>>>>>>>
>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you gents.
>>>>>>>>
>>>>>>>> That should have been "\n" for the line break.
>>>>>>>>
>>>>>>>> OK I am using spark streaming to analyse the message
>>>>>>>>
>>>>>>>> It does the streaming
>>>>>>>>
>>>>>>>> import _root_.kafka.serializer.StringDecoder
>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>> import org.apache.spark.streaming._
>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>> //
>>>>>>>> scala> val sparkConf = new SparkConf().
>>>>>>>>      |              setAppName("StreamTest").
>>>>>>>>      |              setMaster("local[12]").
>>>>>>>>      |              set("spark.driver.allowMultipleContexts",
>>>>>>>> "true").
>>>>>>>>      |              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>>>> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>>>>>>>> scala>
>>>>>>>> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>>>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>>>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>>>>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>>>>>>> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
>>>>>>>> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id
>>>>>>>> -> StreamTest)
>>>>>>>> scala> val topic = Set("newtopic")
>>>>>>>> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>>>>>>> String)] = 
>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>>>>>>>>
>>>>>>>> This part is tricky
>>>>>>>>
>>>>>>>> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>> <console>:47: error: value contains is not a member of (String, String)
>>>>>>>>          val showlines = messages.filter(_ contains("ASE 15")).filter(_ contains("UPDATE INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>
>>>>>>>>
>>>>>>>> How does one refer to the content of the stream here?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> //
>>>>>>>> // Now want to do some analysis on the same text file
>>>>>>>> //
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> bq. split"\t," splits the filter by carriage return
>>>>>>>>>
>>>>>>>>> Minor correction: "\t" denotes tab character.
>>>>>>>>>
>>>>>>>>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mich,
>>>>>>>>>>
>>>>>>>>>> 1. The first underscore in your filter call refers to a line
>>>>>>>>>> in the file (as textFile() results in a collection of strings).
>>>>>>>>>> 2. You're correct. No need for it.
>>>>>>>>>> 3. Filter expects a Boolean result, so you can merge your
>>>>>>>>>> contains filters into one with an AND (&&) expression.
>>>>>>>>>> 4. Not quite. split() takes a regex, so "\t," matches a tab followed
>>>>>>>>>> by a comma. To treat each character as a divider, use the character
>>>>>>>>>> class "[\t,]".
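
Points 3 and 4 can be sketched on plain Scala strings. Note that split() hands its argument to the regex engine, so "\t," means a tab followed by a comma, while the character class "[\t,]" means a tab or a comma. The object and method names are hypothetical and the sample strings are made up.

```scala
object FilterAndSplit {
  // One predicate with && replaces two chained filter calls.
  def matching(lines: Seq[String]): Seq[String] =
    lines.filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))

  // The regex "\t," matches only a tab immediately followed by a comma.
  def splitOnSequence(line: String): Seq[String] = line.split("\t,").toSeq

  // The character class "[\t,]" matches either a tab or a comma.
  def splitOnEither(line: String): Seq[String] = line.split("[\t,]").toSeq

  def main(args: Array[String]): Unit = {
    println(splitOnSequence("a\tb,c").length) // prints 1: no tab-comma pair to split on
    println(splitOnEither("a\tb,c").length)   // prints 3
  }
}
```

The same predicate and regex can be passed to filter and flatMap on an RDD or a DStream.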
>>>>>>>>>>
>>>>>>>>>> Eliran Bivas
>>>>>>>>>>
>>>>>>>>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>>>> *Sent:* Apr 3, 2016 15:06
>>>>>>>>>> *To:* Eliran Bivas
>>>>>>>>>> *Cc:* user @spark
>>>>>>>>>> *Subject:* Re: multiple splits fails
>>>>>>>>>>
>>>>>>>>>> Hi Eliran,
>>>>>>>>>>
>>>>>>>>>> Many thanks for your input on this.
>>>>>>>>>>
>>>>>>>>>> I thought about what I was trying to achieve so I rewrote the
>>>>>>>>>> logic as follows:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    1. Read the text file in
>>>>>>>>>>    2. Filter out empty lines (well not really needed here)
>>>>>>>>>>    3. Search for lines that contain "ASE 15" and also contain the
>>>>>>>>>>    phrase "UPDATE INDEX STATISTICS"
>>>>>>>>>>    4. Split the text by "\t" and ","
>>>>>>>>>>    5. Print the outcome
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This was what I did with your suggestions included
>>>>>>>>>>
>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>> f.cache()
>>>>>>>>>> f.filter(_.length > 0).
>>>>>>>>>>   filter(_ contains("ASE 15")).
>>>>>>>>>>   filter(_ contains("UPDATE INDEX STATISTICS")).
>>>>>>>>>>   flatMap(line => line.split("\t,")).
>>>>>>>>>>   map(word => (word, 1)).
>>>>>>>>>>   reduceByKey(_ + _).collect.foreach(println)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Couple of questions if I may
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    1. I take that "_" refers to content of the file read in by
>>>>>>>>>>    default?
>>>>>>>>>>    2. _.length > 0 basically filters out blank lines (not really
>>>>>>>>>>    needed here)
>>>>>>>>>>    3. Multiple filters are needed for each *contains* logic
>>>>>>>>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>
>>>>>>>>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Mich,
>>>>>>>>>>>
>>>>>>>>>>> A few comments:
>>>>>>>>>>>
>>>>>>>>>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>>>>>>>>>> comparison. It happens to drop empty lines, but _.nonEmpty or
>>>>>>>>>>> _.length > 0 states the intent directly.
>>>>>>>>>>> I think that filtering with _.contains should be sufficient and
>>>>>>>>>>> the first filter can be omitted.
>>>>>>>>>>>
>>>>>>>>>>> As for line => line.split("\t").split(","):
>>>>>>>>>>> You have to do a second split step or (since split() takes a regex
>>>>>>>>>>> as input) use a character class, .split("[\t,]").
>>>>>>>>>>> The problem is that your first split() call will generate an
>>>>>>>>>>> Array and then your second call will result in an error.
>>>>>>>>>>> e.g.
>>>>>>>>>>>
>>>>>>>>>>> val lines: Array[String] = line.split("\t")
>>>>>>>>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>>>>>>>>
>>>>>>>>>>> So either go with flatMap(_.split("\t")).flatMap(_.split(",")) or
>>>>>>>>>>> flatMap(_.split("[\t,]"))
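
A minimal sketch of the two-step route on a plain Scala collection: split() returns an Array[String], which has no split() of its own, so each step is flattened before the next one runs. The object and method names are hypothetical.

```scala
object TwoStepSplit {
  // Splits every line on tabs, flattens the pieces, then splits each piece on commas.
  def tokens(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split("\t")).flatMap(_.split(","))

  def main(args: Array[String]): Unit = {
    // Made-up sample input with a tab and a comma in the first line.
    tokens(Seq("a\tb,c", "d")).foreach(println)
  }
}
```

The same chain of flatMap calls should carry over to an RDD[String] such as the one returned by sc.textFile.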
>>>>>>>>>>>
>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>
>>>>>>>>>>> *Eliran Bivas*
>>>>>>>>>>> Data Team | iguaz.io
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <
>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am not sure this is the correct approach
>>>>>>>>>>>
>>>>>>>>>>> Read a text file in
>>>>>>>>>>>
>>>>>>>>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Now I want to get rid of empty lines and filter only the lines
>>>>>>>>>>> that contain "ASE15"
>>>>>>>>>>>
>>>>>>>>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>>>>>>>>
>>>>>>>>>>> The above works but I am not sure whether I need the two filter
>>>>>>>>>>> transformations above. Can it be done in one?
>>>>>>>>>>>
>>>>>>>>>>> Now I want to break the filtered lines on carriage return and
>>>>>>>>>>> split them by ","
>>>>>>>>>>>
>>>>>>>>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>> (line.split("\t")))
>>>>>>>>>>> res88: org.apache.spark.rdd.RDD[Array[String]] =
>>>>>>>>>>> MapPartitionsRDD[131] at map at <console>:30
>>>>>>>>>>>
>>>>>>>>>>> Now I want to split the output by ","
>>>>>>>>>>>
>>>>>>>>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>>>>>>>> (line.split("\t").split(",")))
>>>>>>>>>>> <console>:30: error: value split is not a member of Array[String]
>>>>>>>>>>>               f.filter(_ > "").filter(_
>>>>>>>>>>> contains("ASE15")).map(line => (line.split("\t").split(",")))
>>>>>>>>>>>
>>>>>>>>>>> ^
>>>>>>>>>>> Any advice will be appreciated
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh


-- 

Thanks & Regards

Sachin Aggarwal
7760502772
