Given that you are not specifying any key explicitly [usually people will 
use the Producer API and have a key/value pair inserted] the key will be 
null...so your tuples would look like below

(null, "ID       TIMESTAMP                           PRICE")
(null, "40,20160426-080924,                  67.55738301621814598514")

For values...the positions are 0-indexed...hence (referring to your 
invocations) words1 will return the value for TIMESTAMP and words2 will 
return the value for PRICE
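
For instance, splitting one sample line from your data shows the 0-based 
positions (a minimal sketch):

// One line of the value portion, split on commas
val line = "40,20160426-080924,67.55738301621814598514"
val fields = line.split(',')

fields(0)   // "40"                       -> ID
fields(1)   // "20160426-080924"          -> TIMESTAMP
fields(2)   // "67.55738301621814598514"  -> PRICE

So view(1) and view(2) in your invocations pick up TIMESTAMP and PRICE 
respectively.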

>>I assume this is an array that can be handled as elements of an array as 
well?<<

These are all still DStreams...you will need to invoke an action on the 
DStream to use them...for instance words.foreachRDD(...)

It should be easy for you to just run the streaming program and call print 
on each resulting DStream to understand what data it contains and decide 
how to make use of it.
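
For example, a minimal sketch (assuming the lines, words1 and words2 
DStreams from your snippet, and that ssc is your StreamingContext):

// print() is an output action: it shows the first elements of each batch
lines.print()    // full CSV lines
words1.print()   // TIMESTAMP values
words2.print()   // PRICE values

// foreachRDD lets you run arbitrary code on each batch's RDD
words2.foreachRDD { rdd =>
  rdd.take(5).foreach(println)
}

ssc.start()
ssc.awaitTermination()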

Thanking You
---------------------------------------------------------------------------------
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
---------------------------------------------------------------------------------
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Mich Talebzadeh <mich.talebza...@gmail.com>
To:     Praveen Devarao/India/IBM@IBMIN, "user @spark" 
<user@spark.apache.org>
Date:   26/04/2016 04:03 pm
Subject:        Re: Splitting spark dstream into separate fields



Thanks Praveen.

With regard to the key/value pair: my Kafka producer takes the following rows as input

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list 
rhes564:9092 --topic newtopic

That ${IN_FILE} is the source of prices (1,000 lines per second), as follows

ID       TIMESTAMP                           PRICE
40,     20160426-080924,                  67.55738301621814598514

So tuples would be like below?

(1, "ID")
(2, "TIMESTAMP")
(3, "PRICE")

For values
val words1 = lines.map(_.split(',').view(1))
val words2 = lines.map(_.split(',').view(2))
val words3 = lines.map(_.split(',').view(3))

So words1 will return the value of ID, words2 the value of TIMESTAMP and 
words3 the value of PRICE?

I assume this is an array that can be handled as elements of an array as 
well?

Regards

Dr Mich Talebzadeh
 
LinkedIn  
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 
http://talebzadehmich.wordpress.com
 

On 26 April 2016 at 11:11, Praveen Devarao <praveen...@in.ibm.com> wrote:
Hi Mich,

        >>        
                val lines = dstream.map(_._2)

                This maps the record into components? Is that the correct understanding of it?
        <<

        Not sure what you refer to when you say "record into components". The above function is basically giving you the tuple (key/value pair) that you would have inserted into Kafka. Say my Kafka producer puts data as

        1 => "abc"
        2 => "def"

        Then the above map would give you tuples as below

        (1,"abc")
        (2,"def")
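
        In code, a minimal sketch of separating keys and values (assuming the dstream variable from your createDirectStream call):

        // Each element of the DStream is a (key, value) tuple
        val lines = dstream.map(_._2)   // values: "abc", "def"
        val keys  = dstream.map(_._1)   // keys: 1, 2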

        >> 
                The following splits the line into comma-separated fields.


                val words = lines.map(_.split(',').view(2))
        <<
        Right, basically the value portion of your Kafka data is being handled here

        >>
                val words = lines.map(_.split(',').view(2))

                I am interested in column three, so view(2) returns the value.
                        
                I have also seen other ways like

                val words = lines.map(_.split(',').map(line => (line(0), 
(line(1),line(2) ...
        <<

        The split operation returns an Array of String with the fields. Calling the view method creates an IndexedSeqView over that array (lazy evaluation), whereas in the second way you access the elements directly via their index position [line(0), line(1)], which is evaluated immediately. You would have to decide what is best for your use case based on whether evaluation should be lazy or immediate [see references below].

        References:
        http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqLike
        http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqView
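
        As a minimal sketch of the difference (plain Scala on one sample line, outside the streaming code):

        val arr = "40,20160426-080924,67.55738301621814598514".split(',')

        // Lazy: arr.view wraps the array without copying; view(2) resolves
        // to view.apply(2), i.e. element access through the lazy wrapper
        val price1 = arr.view(2)

        // Immediate: plain index access on the Array
        val price2 = arr(2)

        // Tuple of all fields via direct indexing, as in the second way
        val record = (arr(0), (arr(1), arr(2)))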

        
        
Thanking You
---------------------------------------------------------------------------------
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
---------------------------------------------------------------------------------
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:        Mich Talebzadeh <mich.talebza...@gmail.com>
To:        "user @spark" <user@spark.apache.org>
Date:        26/04/2016 12:58 pm
Subject:        Splitting spark dstream into separate fields



Hi,

Is there an optimal way of splitting a dstream into components?

I am doing Spark streaming and this is the dstream I get

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topics)

That dstream consists of 1,000 price lines per second, like below

ID, TIMESTAMP, PRICE
31,20160426-080924,93.53608929178084896656

The columns are separated by commas.

Now a couple of questions:

val lines = dstream.map(_._2)

This maps the record into components? Is that the correct understanding of 
it?

The following splits the line into comma-separated fields.

val words = lines.map(_.split(',').view(2))

I am interested in column three, so view(2) returns the value.

I have also seen other ways like

val words = lines.map(_.split(',').map(line => (line(0), (line(1),line(2) 
...

Do line(0) and line(1) refer to the positions of the fields?

Which one is the preferred or correct approach?

Thanks


Dr Mich Talebzadeh
 
LinkedIn  
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 
http://talebzadehmich.wordpress.com
 





