Hi nguyen,

Thanks a lot for your time; I really appreciate the good suggestions.
Please find my concerns inline below:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)
<<< this will return two separate lists as a tuple, but I want a one-to-one mapping (pred, text), not (predlist, textlist)

def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...)
<<< I want to avoid saving to an external system (and definitely not to a global variable). As I said, it could be an overhead considering streaming.

stream.foreachRDD(process_rdd)
<<< As you can see here, there is no variable to store the output from foreachRDD. My target is to get (pred, text) pairs and then use them. Whatever it is, the output from "extract_feature" is not what I want.

I will be more than happy if you correct my mistakes here.

-Obaid

On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

> I'm not sure what you mean by saying "does not return any value". How
> do you use this method?
> I would use this method as follows:
>
> def extract_feature(rf_model, x):
>     text = getFeatures(x).split(',')
>     fea = [float(i) for i in text]
>     prediction = rf_model.predict(fea)
>     return (prediction, x)
>
> def process_rdd(rdd):
>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>     # do something as you want (saving, ...)
>
> stream.foreachRDD(process_rdd)
>
> 2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>
>> foreachRDD does not return any value. It can be used just to send results
>> to another place/context, like a db, a file, etc.
>> I could use that, but it seems like the overhead of having another hop.
>> I wanted to keep it simple and light.
>>
>> On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>
>>> How about using foreachRDD? I think this is much better than your trick.
>>>
>>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>
>>>> Hi Guys,
>>>>
>>>> In the end, I am using the code below.
>>>> The trick is using a native Python map along with Spark Streaming's
>>>> transform.
>>>> It may not be an elegant way, but it works :).
>>>>
>>>> def predictScore(texts, modelRF):
>>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
>>>>         map(lambda (txt, features): (txt, features.split(','))).\
>>>>         map(lambda (txt, features): (txt, [float(i) for i in features])).\
>>>>         transform(lambda rdd: sc.parallelize(
>>>>             map(lambda x, y: (x, y),
>>>>                 modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
>>>>                 rdd.map(lambda (x, y): x).collect())))
>>>>     # in the transform operation: x = text and y = features
>>>>     # the return value will be tuples of (score, 'original text')
>>>>     return predictions
>>>>
>>>> Hope it will help somebody who is facing the same problem.
>>>> If anybody has a better idea, please post it here.
>>>>
>>>> -Obaid
>>>>
>>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>>
>>>>> DStream has a method foreachRDD, so you can walk through all the RDDs
>>>>> inside a DStream as you want.
>>>>>
>>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>>
>>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>
>>>>>> Hi nguyen,
>>>>>>
>>>>>> If I am not mistaken, we cannot call "predict" on a "dstream" as you
>>>>>> have suggested.
>>>>>> We have to use "transform" to be able to perform normal RDD
>>>>>> operations on DStreams, and here I am trapped.
>>>>>>
>>>>>> -Obaid
>>>>>>
>>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>>>>
>>>>>>> How about this?
>>>>>>>
>>>>>>> def extract_feature(rf_model, x):
>>>>>>>     text = getFeatures(x).split(',')
>>>>>>>     fea = [float(i) for i in text]
>>>>>>>     prediction = rf_model.predict(fea)
>>>>>>>     return (prediction, x)
>>>>>>>
>>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>>
>>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Anybody has any idea on the below?
>>>>>>>>
>>>>>>>> -Obaid
>>>>>>>>
>>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Guys,
>>>>>>>>>
>>>>>>>>> This is my first mail to the Spark users mailing list.
>>>>>>>>>
>>>>>>>>> I need help with a DStream operation.
>>>>>>>>>
>>>>>>>>> In fact, I am using an MLlib random forest model to predict using
>>>>>>>>> Spark Streaming. In the end, I want to combine the feature DStream and
>>>>>>>>> the prediction DStream together for further downstream processing.
>>>>>>>>>
>>>>>>>>> I am predicting using the below piece of code:
>>>>>>>>>
>>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)).\
>>>>>>>>>     map(lambda x: x.split(',')).\
>>>>>>>>>     map(lambda parts: [float(i) for i in parts]).\
>>>>>>>>>     transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>>
>>>>>>>>> Here texts is a DStream having a single line of text as each record,
>>>>>>>>> and getFeatures generates comma-separated features extracted from
>>>>>>>>> each record.
>>>>>>>>>
>>>>>>>>> I want the output as the below tuple:
>>>>>>>>> ("predicted value", "original text")
>>>>>>>>>
>>>>>>>>> How can I achieve that?
>>>>>>>>> Or, at least, can I perform .zip like a normal RDD operation on two
>>>>>>>>> DStreams, like below:
>>>>>>>>> output = texts.zip(predictions)
>>>>>>>>>
>>>>>>>>> Thanks in advance.
>>>>>>>>>
>>>>>>>>> -Obaid
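For anyone reading this thread later: the one-to-one (prediction, text) pairing being discussed can be illustrated in plain Python, independent of Spark. FakeModel and get_features below are hypothetical stand-ins for the thread's rf_model and getFeatures; this is only a minimal sketch showing why applying extract_feature per record yields one (prediction, text) tuple per record, rather than a tuple of two parallel lists.

```python
# Minimal Spark-free sketch of the per-record (prediction, text) pairing.
# FakeModel and get_features are hypothetical stand-ins for illustration.

class FakeModel:
    def predict(self, features):
        # pretend scoring: sum of the feature values
        return sum(features)

def get_features(text):
    # pretend extractor: comma-separated floats (word lengths here)
    return ",".join(str(float(len(word))) for word in text.split())

def extract_feature(model, text):
    features = [float(i) for i in get_features(text).split(",")]
    # one tuple per record: (prediction, original text)
    return (model.predict(features), text)

model = FakeModel()
texts = ["spark streaming", "random forest model"]
pairs = [extract_feature(model, t) for t in texts]
print(pairs)  # one (prediction, original_text) tuple per input record
```

On an actual DStream, the same pairing could plausibly be obtained inside transform by zipping the prediction RDD with the text RDD via RDD.zip, avoiding the collect()/parallelize() round-trip in predictScore above; note that RDD.zip assumes both RDDs have the same number of partitions and the same number of elements per partition.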