1. The RandomForestModel 'predict' method accepts either an RDD or a single
Vector as input (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel
). So, in this case, the function extract_feature should return a
(prediction, rawtext) tuple for each input text. If each input text can
create a list of vectors, try using "flatMap" instead of "map".
2, 3. From the Spark documentation: "*Discretized Stream* or *DStream* is the basic
abstraction provided by Spark Streaming. It represents a continuous stream
of data, either the input data stream received from source, or the
processed data stream generated by transforming the input stream.
Internally, it is represented by a continuous sequence of RDDs, which is
Spark’s abstraction of an immutable, distributed dataset. Each RDD in a
DStream contains data from a certain interval, as shown in the following
figure."(
https://spark.apache.org/docs/0.9.1/streaming-programming-guide.html)

So, in order to handle a stream, you should handle each RDD in that stream.
This means that everything you want to do with your new data should go in
the 'process_rdd' function. The 'foreachRDD' function itself returns nothing,
of course.
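To make the one-to-one (prediction, text) pairing concrete, here is a minimal sketch, not a definitive implementation. It assumes PySpark, where (as far as I know) the MLlib docs say a tree model's predict cannot be called inside an RDD transformation, so predict is called on the whole RDD of each batch and the result is zipped back with the original records. The helper names to_features, pair_predictions and score_stream are made up for illustration; get_features stands in for the getFeatures function from this thread.

```python
def to_features(text, get_features):
    """Turn one raw text record into a list of floats."""
    return [float(v) for v in get_features(text).split(",")]


def pair_predictions(rdd, model, get_features):
    """Return an RDD of (prediction, text) pairs for one batch.

    Assumption: model.predict accepts an RDD of feature vectors and
    returns an RDD of predictions in the same order and partitioning,
    which is what zip needs to line the two RDDs up one to one.
    """
    features = rdd.map(lambda text: to_features(text, get_features))
    return model.predict(features).zip(rdd)


def score_stream(texts, model, get_features):
    """Apply pair_predictions to every RDD of the DStream `texts`.

    transform returns a new DStream, so the (prediction, text) pairs
    stay available for further downstream DStream operations.
    """
    return texts.transform(
        lambda rdd: pair_predictions(rdd, model, get_features))
```

With the names from this thread, usage would look like scored = score_stream(texts, rf_model, getFeatures). After that, scored.foreachRDD(process_rdd) is only needed at the point where the pairs finally leave Spark (print, save, ...).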

2016-05-31 14:39 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

> Hi nguyen,
>
> Thanks a lot for your time and really appreciate good suggestions.
>
> Please find my concerns in line below:
>
> def extract_feature(rf_model, x):
>     text = getFeatures(x).split(',')
>     fea = [float(i) for i in text]
>     prediction = rf_model.predict(fea)
>     return (prediction, x) <<< this will return two separate lists as a tuple,
> but I want a one-to-one mapping (pred, text), not (predlist, textlist)
>
> def process_rdd(rdd):
>      fea = rdd.map(lambda x: extract_feature(rf_model, x))
>      //do something as you want (saving,...) <<< I want to avoid saving to
> external system(definitely not in global variable). As I said, it could be
> an overhead considering streaming.
>
> stream.foreachRDD(process_rdd) <<< As you can see here, no variable to
> store the output from foreachRDD. My target is to get (pred, text) pair and
> then use
>
> Whatever it is, the output from "extract_feature" is not what I want.
> I will be more than happy if you please correct my mistakes here.
>
>
> -Obaid
>
> On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com>
> wrote:
>
>> I'm not sure what you mean by "does not return any value". How
>> do you use this method?
>> I would use this method as follows:
>> def extract_feature(rf_model, x):
>>     text = getFeatures(x).split(',')
>>     fea = [float(i) for i in text]
>>     prediction = rf_model.predict(fea)
>>     return (prediction, x)
>>
>> def process_rdd(rdd):
>>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>>     # do something as you want (saving,...)
>>
>> stream.foreachRDD(process_rdd)
>>
>> 2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>
>>> foreachRDD does not return any value. It can be used just to send the result
>>> to another place/context, like a db, file etc.
>>> I could use that, but it seems like the overhead of having another hop.
>>> I wanted to make it simple and light.
>>>
>>>
>>> On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>
>>>> How about using foreachRDD ? I think this is much better than your
>>>> trick.
>>>>
>>>>
>>>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> In the end, I am using below.
>>>>> The trick is using "native python map" along with "spark streaming
>>>>> transform".
>>>>> It may not be an elegant way, however it works :).
>>>>>
>>>>> def predictScore(texts, modelRF):
>>>>>     predictions = texts.map( lambda txt :  (txt , getFeatures(txt)) ).\
>>>>>      map(lambda (txt, features) : (txt ,(features.split(','))) ).\
>>>>>      map( lambda (txt, features) : (txt, ([float(i) for i in
>>>>> features])) ).\
>>>>>      transform( lambda  rdd: sc.parallelize(\
>>>>>        map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda
>>>>> (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
>>>>>        )\
>>>>>      )
>>>>>     # in the transform operation: x=text and y=features
>>>>>     # Return will be a tuple of (score, 'original text')
>>>>>     return predictions
>>>>>
>>>>>
>>>>> Hope, it will help somebody who is facing same problem.
>>>>> If anybody has better idea, please post it here.
>>>>>
>>>>> -Obaid
>>>>>
>>>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> DStream has a method foreachRDD, so you can walk through all RDDs
>>>>>> inside the DStream as you want.
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>>>
>>>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>
>>>>>>> Hi nguyen,
>>>>>>>
>>>>>>> If I am not mistaken, we cannot call  "predict" on "dstream" as you
>>>>>>> have suggested.
>>>>>>> We have to use "transform" to be able to perform normal RDD
>>>>>>> operations on dstreams and here I am trapped.
>>>>>>>
>>>>>>> -Obaid
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <
>>>>>>> newvalu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How about this ?
>>>>>>>>
>>>>>>>> def extract_feature(rf_model, x):
>>>>>>>>     text = getFeatures(x).split(',')
>>>>>>>>     fea = [float(i) for i in text]
>>>>>>>>     prediction = rf_model.predict(fea)
>>>>>>>>     return (prediction, x)
>>>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>>>
>>>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Anybody has any idea on below?
>>>>>>>>>
>>>>>>>>> -Obaid
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> This is my first mail to spark users mailing list.
>>>>>>>>>>
>>>>>>>>>> I need help on Dstream operation.
>>>>>>>>>>
>>>>>>>>>> In fact, I am using a MLlib randomforest model to predict using
>>>>>>>>>> spark streaming. In the end, I want to combine the feature Dstream &
>>>>>>>>>> prediction Dstream together for further downstream processing.
>>>>>>>>>>
>>>>>>>>>> I am predicting using below piece of code:
>>>>>>>>>>
>>>>>>>>>> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x
>>>>>>>>>> : x.split(',')).map( lambda parts : [float(i) for i in parts]
>>>>>>>>>> ).transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>>>
>>>>>>>>>> Here texts is dstream having single line of text as records
>>>>>>>>>> getFeatures generates a comma separated features extracted from
>>>>>>>>>> each record
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I want the output as below tuple:
>>>>>>>>>> ("predicted value", "original text")
>>>>>>>>>>
>>>>>>>>>> How can I achieve that ?
>>>>>>>>>> or
>>>>>>>>>> at least can I perform .zip like normal RDD operation on two
>>>>>>>>>> Dstreams, like below:
>>>>>>>>>> output = texts.zip(predictions)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks in advance.
>>>>>>>>>>
>>>>>>>>>> -Obaid
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
