Hi nguyen,

Thanks a lot for your time; I really appreciate the good suggestions.

Please find my concerns inline below:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x) <<< This will return two separate lists as a tuple, but
I want a one-to-one mapping (pred, text), not (predlist, textlist).
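
A minimal plain-Python sketch of the two shapes being contrasted here (no Spark involved; `pred_list` and `text_list` are made-up illustrative values, not real model output):

```python
# Hypothetical batch of three records; the values are invented for illustration.
pred_list = [0.0, 1.0, 1.0]
text_list = ["first line", "second line", "third line"]

# Shape that is NOT wanted: a single tuple holding two parallel lists.
as_two_lists = (pred_list, text_list)

# Shape that IS wanted: one (pred, text) tuple per record, matched by position.
pairs = list(zip(pred_list, text_list))

for pred, text in pairs:
    print(pred, text)
```

The same position-based pairing is what a one-to-one (pred, text) output would need at the RDD level.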

def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...) <<< I want to avoid saving to an
external system (and definitely not to a global variable). As I said, it could be
an overhead considering streaming.

stream.foreachRDD(process_rdd) <<< As you can see here, there is no variable to
store the output from foreachRDD. My target is to get a (pred, text) pair and
then use

In any case, the output from "extract_feature" is not what I want.
I would be more than happy if you could correct my mistakes here.
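
One possible way to get (pred, text) pairs without an external store is to zip the predictions with the original records inside a single function that can be passed to transform. This is only a sketch under assumptions: `pair_predictions` and `to_features` are hypothetical names, and it assumes that `predict` called on a whole RDD returns predictions in the same order and partitioning as its input, as in the tree-model examples in the MLlib guide.

```python
def pair_predictions(rdd, model, to_features):
    """Return an RDD of (prediction, original_record) pairs.

    Assumptions (not confirmed anywhere in this thread):
      * `model.predict` accepts an RDD of feature vectors and returns an RDD
        of predictions with the same ordering and partitioning.
      * `to_features` is a hypothetical helper that turns one record into a
        list of floats.
    """
    # Build the feature RDD from the original records.
    features = rdd.map(to_features)
    # Call predict on the whole RDD, not on individual records inside a map.
    predictions = model.predict(features)
    # Pair each prediction with the record it was computed from, by position.
    return predictions.zip(rdd)
```

With the DStream from this thread it might be used as `texts.transform(lambda rdd: pair_predictions(rdd, rf_model, to_features))`, where `to_features` would be something like `lambda x: [float(i) for i in getFeatures(x).split(',')]`. Because transform returns a new DStream, the pairs stay available for downstream processing without an extra hop.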


-Obaid

On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com>
wrote:

> I'm not sure what you mean by saying "does not return any value". How
> do you use this method?
> I would use this method as follows:
> def extract_feature(rf_model, x):
>     text = getFeatures(x).split(',')
>     fea = [float(i) for i in text]
>     prediction = rf_model.predict(fea)
>     return (prediction, x)
>
> def process_rdd(rdd):
>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>     # do something as you want (saving, ...)
>
> stream.foreachRDD(process_rdd)
>
> 2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>
>> foreachRDD does not return any value. It can be used only to send the result
>> to another place/context, like a DB, a file, etc.
>> I could use that, but it seems like the overhead of having another hop.
>> I wanted to make it simple and light.
>>
>>
>> On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>
>>> How about using foreachRDD? I think this is much better than your trick.
>>>
>>>
>>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>
>>>> Hi Guys,
>>>>
>>>> In the end, I am using the code below.
>>>> The trick is using the "native Python map" along with the "Spark Streaming
>>>> transform".
>>>> It may not be an elegant way, but it works :).
>>>>
>>>> def predictScore(texts, modelRF):
>>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
>>>>      map(lambda kv: (kv[0], kv[1].split(','))).\
>>>>      map(lambda kv: (kv[0], [float(i) for i in kv[1]])).\
>>>>      transform(lambda rdd: sc.parallelize(
>>>>        list(map(lambda x, y: (x, y),
>>>>                 modelRF.predict(rdd.map(lambda kv: kv[1])).collect(),
>>>>                 rdd.map(lambda kv: kv[0]).collect()))
>>>>      ))
>>>>     # In each pair kv: kv[0] = text and kv[1] = features
>>>>     # Return will be tuples of (score, 'original text')
>>>>     return predictions
>>>>
>>>>
>>>> I hope it will help somebody who is facing the same problem.
>>>> If anybody has a better idea, please post it here.
>>>>
>>>> -Obaid
>>>>
>>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com>
>>>> wrote:
>>>>
>>>>> DStream has a method foreachRDD, so you can walk through all RDDs
>>>>> inside the DStream as you want.
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>>
>>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>
>>>>>> Hi nguyen,
>>>>>>
>>>>>> If I am not mistaken, we cannot call "predict" on a "dstream" as you
>>>>>> have suggested.
>>>>>> We have to use "transform" to be able to perform normal RDD
>>>>>> operations on dstreams, and this is where I am stuck.
>>>>>>
>>>>>> -Obaid
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <
>>>>>> newvalu...@gmail.com> wrote:
>>>>>>
>>>>>>> How about this ?
>>>>>>>
>>>>>>> def extract_feature(rf_model, x):
>>>>>>>     text = getFeatures(x).split(',')
>>>>>>>     fea = [float(i) for i in text]
>>>>>>>     prediction = rf_model.predict(fea)
>>>>>>>     return (prediction, x)
>>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>>
>>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Does anybody have any idea about the question below?
>>>>>>>>
>>>>>>>> -Obaid
>>>>>>>>
>>>>>>>>
>>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Guys,
>>>>>>>>>
>>>>>>>>> This is my first mail to the Spark users mailing list.
>>>>>>>>>
>>>>>>>>> I need help with a DStream operation.
>>>>>>>>>
>>>>>>>>> I am using an MLlib random forest model to predict with
>>>>>>>>> Spark Streaming. In the end, I want to combine the feature DStream and
>>>>>>>>> the prediction DStream for further downstream processing.
>>>>>>>>>
>>>>>>>>> I am predicting using the piece of code below:
>>>>>>>>>
>>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>>>>>     .map(lambda x: x.split(',')) \
>>>>>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>>
>>>>>>>>> Here, texts is a DStream with a single line of text per record, and
>>>>>>>>> getFeatures generates comma-separated features extracted from
>>>>>>>>> each record.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I want the output as the tuple below:
>>>>>>>>> ("predicted value", "original text")
>>>>>>>>>
>>>>>>>>> How can I achieve that?
>>>>>>>>> Or,
>>>>>>>>> at least, can I perform a .zip, like the normal RDD operation, on two
>>>>>>>>> DStreams, as below:
>>>>>>>>> output = texts.zip(predictions)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks in advance.
>>>>>>>>>
>>>>>>>>> -Obaid
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
