I'm not sure what you mean by "does not return any value". How would you use
this method? I would use it as follows:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)
def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...)

stream.foreachRDD(process_rdd)

2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

> foreachRDD does not return any value. It can be used just to send results
> to another place/context, like a db, file etc.
> I could use that, but it seems like the overhead of having another hop.
> I wanted to keep it simple and light.
>
>
> On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
>
>> How about using foreachRDD? I think this is much better than your trick.
>>
>>
>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>
>>> Hi Guys,
>>>
>>> In the end, I am using the code below.
>>> The trick is using a "native Python map" along with a "Spark Streaming
>>> transform".
>>> It may not be an elegant way, but it works :).
>>>
>>> def predictScore(texts, modelRF):
>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
>>>         map(lambda (txt, features): (txt, features.split(','))).\
>>>         map(lambda (txt, features): (txt, [float(i) for i in features])).\
>>>         transform(lambda rdd: sc.parallelize(
>>>             map(lambda x, y: (x, y),
>>>                 modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
>>>                 rdd.map(lambda (x, y): x).collect())))
>>>     # in the transform operation: x = text and y = features
>>>     # Return value will be a tuple of (score, 'original text')
>>>     return predictions
>>>
>>>
>>> Hope it will help somebody who is facing the same problem.
>>> If anybody has a better idea, please post it here.
>>>
>>> -Obaid
>>>
>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com>
>>> wrote:
>>>
>>>> DStream has a method foreachRDD, so you can walk through all RDDs
>>>> inside the DStream as you want.
>>>>
>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>
>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>
>>>>> Hi nguyen,
>>>>>
>>>>> If I am not mistaken, we cannot call "predict" on a "dstream" as you
>>>>> have suggested.
>>>>> We have to use "transform" to be able to perform normal RDD operations
>>>>> on DStreams, and here I am trapped.
>>>>>
>>>>> -Obaid
>>>>>
>>>>>
>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> How about this?
>>>>>>
>>>>>> def extract_feature(rf_model, x):
>>>>>>     text = getFeatures(x).split(',')
>>>>>>     fea = [float(i) for i in text]
>>>>>>     prediction = rf_model.predict(fea)
>>>>>>     return (prediction, x)
>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>
>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Anybody has any idea on the below?
>>>>>>>
>>>>>>> -Obaid
>>>>>>>
>>>>>>>
>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> This is my first mail to the Spark users mailing list.
>>>>>>>>
>>>>>>>> I need help with a DStream operation.
>>>>>>>>
>>>>>>>> In fact, I am using an MLlib random forest model to predict using
>>>>>>>> Spark Streaming. In the end, I want to combine the feature DStream and
>>>>>>>> prediction DStream together for further downstream processing.
>>>>>>>>
>>>>>>>> I am predicting using the below piece of code:
>>>>>>>>
>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)).\
>>>>>>>>     map(lambda x: x.split(',')).\
>>>>>>>>     map(lambda parts: [float(i) for i in parts]).\
>>>>>>>>     transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>
>>>>>>>> Here, texts is a DStream having a single line of text as each record,
>>>>>>>> and getFeatures generates comma-separated features extracted from
>>>>>>>> each record.
>>>>>>>>
>>>>>>>> I want the output as the below tuple:
>>>>>>>> ("predicted value", "original text")
>>>>>>>>
>>>>>>>> How can I achieve that?
>>>>>>>> Or, at the least, can I perform .zip like a normal RDD operation on
>>>>>>>> two DStreams, like below:
>>>>>>>> output = texts.zip(predictions)
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> -Obaid
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
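For readers landing on this thread later: the core idea in the suggested fix is to keep each original record paired with its features through every step, so the prediction can be emitted alongside the text as a (prediction, text) tuple. Below is a minimal, Spark-free sketch of that pairing logic; getFeatures and StubModel are hypothetical stand-ins for the poster's feature extractor and MLlib random forest model, not their real code.

```python
# Spark-free sketch of the (prediction, original_text) pairing discussed above.
# getFeatures and StubModel are hypothetical stand-ins.

def getFeatures(text):
    # Stand-in feature extractor: use each word's length as a feature,
    # returned as a comma-separated string (matching the thread's format).
    return ','.join(str(len(w)) for w in text.split())

class StubModel:
    def predict(self, features):
        # Stand-in for a trained model's predict: score = sum of features.
        return sum(features)

def extract_feature(model, text):
    # Parse the comma-separated feature string into floats, predict,
    # and keep the original record alongside the prediction.
    features = [float(f) for f in getFeatures(text).split(',')]
    return (model.predict(features), text)

if __name__ == '__main__':
    model = StubModel()
    records = ["spark streaming rocks", "hello world"]
    output = [extract_feature(model, r) for r in records]
    print(output)  # [(19.0, 'spark streaming rocks'), (10.0, 'hello world')]
```

In a streaming job this per-record function would be applied inside foreachRDD (or transform) via rdd.map(lambda x: extract_feature(rf_model, x)). Worth noting: in Spark 1.x PySpark, MLlib model predict calls went through the JVM gateway and so could not run inside worker-side closures, which is likely why the poster fell back to the transform-plus-collect trick instead of this simpler per-record map.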