1. The RandomForest 'predict' method accepts either an RDD or a single Vector as input (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel). So, in this case, the function extract_feature should return a tuple (prediction, rawtext). If each input text can produce a list of vectors, try using "flatMap" instead of "map".

2, 3: From the Spark documentation: "*Discretized Stream* or *DStream* is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, it is represented by a continuous sequence of RDDs, which is Spark’s abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain interval, as shown in the following figure." (https://spark.apache.org/docs/0.9.1/streaming-programming-guide.html)

So, in order to handle a stream, you handle each RDD in that stream. This means that everything you want to do with your new data goes into the 'process_rdd' function. And of course, 'foreachRDD' does not return anything.
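The per-record pattern described in point 1 can be sketched in plain Python; here a StubModel and a toy getFeatures stand in for the MLlib RandomForestModel and the thread's (unshown) feature extractor — both are assumptions for illustration only:

```python
def getFeatures(text):
    # Hypothetical extractor: assume comma-separated floats follow a '|'.
    return text.split('|')[1]

class StubModel:
    # Toy stand-in for RandomForestModel: predict() takes one feature
    # vector and returns one score (here, the sum of the features).
    def predict(self, fea):
        return sum(fea)

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)  # one (prediction, original_text) pair per record

records = ["hello|1.0,2.0", "world|0.5,0.5"]
model = StubModel()
pairs = [extract_feature(model, x) for x in records]  # rdd.map equivalent
```

Because extract_feature is applied per record, each output element is a single (prediction, text) pair rather than two parallel lists.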
2016-05-31 14:39 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
> Hi nguyen,
>
> Thanks a lot for your time; I really appreciate the good suggestions.
>
> Please find my concerns inline below:
>
> def extract_feature(rf_model, x):
>     text = getFeatures(x).split(',')
>     fea = [float(i) for i in text]
>     prediction = rf_model.predict(fea)
>     return (prediction, x)  <<< this will return two separate lists as a tuple,
> but I want a one-to-one mapping (pred, text), not (predlist, textlist)
>
> def process_rdd(rdd):
>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>     # do something as you want (saving, ...)  <<< I want to avoid saving to an
> external system (definitely not in a global variable). As I said, it could be
> an overhead considering streaming.
>
> stream.foreachRDD(process_rdd)  <<< As you can see here, there is no variable to
> store the output from foreachRDD. My target is to get the (pred, text) pair and
> then use it.
>
> Whatever it is, the output from "extract_feature" is not what I want.
> I will be more than happy if you correct my mistakes here.
>
> -Obaid
>
> On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>
>> I'm not sure what you mean by saying "does not return any value". How
>> do you use this method?
>> I would use it as follows:
>>
>> def extract_feature(rf_model, x):
>>     text = getFeatures(x).split(',')
>>     fea = [float(i) for i in text]
>>     prediction = rf_model.predict(fea)
>>     return (prediction, x)
>>
>> def process_rdd(rdd):
>>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>>     # do something as you want (saving, ...)
>>
>> stream.foreachRDD(process_rdd)
>>
>> 2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>
>>> foreachRDD does not return any value. It can be used just to send results
>>> to another place/context, like a db, file, etc.
>>> I could use that, but it seems like the overhead of having another hop.
>>> I wanted to make it simple and light.
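The (pred, text) vs. (predlist, textlist) concern above hinges on where predict is called. A plain-Python sketch of both shapes (the two predict functions are toy stand-ins, not the MLlib API):

```python
def predict_one(fea):
    # Stand-in for per-record prediction: one vector in, one score out.
    return sum(fea)

def predict_batch(feas):
    # Stand-in for batch prediction: many vectors in, a list of scores out.
    return [sum(f) for f in feas]

features = [[1.0, 2.0], [0.5, 0.5]]
texts = ["a", "b"]

# Per-record (as inside rdd.map): each element is one (score, text) pair.
per_record = [(predict_one(f), t) for f, t in zip(features, texts)]

# Batch (predict on the whole collection): a list of scores that must be
# zipped back against the texts to restore the one-to-one pairing.
batch = list(zip(predict_batch(features), texts))
```

Both routes end at the same one-to-one mapping; the batch route just needs the explicit zip step afterwards.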
>>>
>>> On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>
>>>> How about using foreachRDD? I think this is much better than your trick.
>>>>
>>>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> In the end, I am using the code below.
>>>>> The trick is using the "native python map" along with the "spark
>>>>> streaming transform".
>>>>> It may not be an elegant way; however, it works :).
>>>>>
>>>>> def predictScore(texts, modelRF):
>>>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
>>>>>         map(lambda (txt, features): (txt, features.split(','))).\
>>>>>         map(lambda (txt, features): (txt, [float(i) for i in features])).\
>>>>>         transform(lambda rdd: sc.parallelize(\
>>>>>             map(lambda x, y: (x, y),
>>>>>                 modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
>>>>>                 rdd.map(lambda (x, y): x).collect())\
>>>>>         )\
>>>>>     )
>>>>>     # in the transform operation: x=text and y=features
>>>>>     # Return will be a tuple of (score, 'original text')
>>>>>     return predictions
>>>>>
>>>>> Hope it will help somebody who is facing the same problem.
>>>>> If anybody has a better idea, please post it here.
>>>>>
>>>>> -Obaid
>>>>>
>>>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>>>
>>>>>> DStream has a method foreachRDD, so you can walk through all the RDDs
>>>>>> inside a DStream as you want.
>>>>>>
>>>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>>>
>>>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>
>>>>>>> Hi nguyen,
>>>>>>>
>>>>>>> If I am not mistaken, we cannot call "predict" on a "dstream" as you
>>>>>>> have suggested.
>>>>>>> We have to use "transform" to be able to perform normal RDD
>>>>>>> operations on dstreams, and here I am trapped.
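The core of the two-collect trick above is pairing two parallel sequences: `map(lambda x, y: (x, y), scores, texts)` is the multi-iterable form of map, which is equivalent to a plain `zip`. A minimal plain-Python sketch of that pairing step, with a toy predict in place of modelRF.predict and lists in place of RDDs:

```python
def predict(feature_vectors):
    # Toy stand-in for a batch modelRF.predict call.
    return [sum(v) for v in feature_vectors]

pairs = [("txt1", [1.0, 2.0]), ("txt2", [3.0, 4.0])]  # (text, features)

texts = [t for t, _ in pairs]   # rdd.map(lambda (x, y): x) analogue
feats = [f for _, f in pairs]   # rdd.map(lambda (x, y): y) analogue

# zip restores the (score, 'original text') pairing in one step,
# exactly what the map(lambda x, y: (x, y), ..., ...) call does.
scored = list(zip(predict(feats), texts))
```

Note that ordering is what keeps the pairing correct here: both collect() calls in the original must return elements in the same order for the zip to line up.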
>>>>>>>
>>>>>>> -Obaid
>>>>>>>
>>>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How about this?
>>>>>>>>
>>>>>>>> def extract_feature(rf_model, x):
>>>>>>>>     text = getFeatures(x).split(',')
>>>>>>>>     fea = [float(i) for i in text]
>>>>>>>>     prediction = rf_model.predict(fea)
>>>>>>>>     return (prediction, x)
>>>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>>>
>>>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Does anybody have any idea on the below?
>>>>>>>>>
>>>>>>>>> -Obaid
>>>>>>>>>
>>>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> This is my first mail to the spark users mailing list.
>>>>>>>>>>
>>>>>>>>>> I need help with a DStream operation.
>>>>>>>>>>
>>>>>>>>>> In fact, I am using an MLlib random forest model to predict using
>>>>>>>>>> spark streaming. In the end, I want to combine the feature DStream &
>>>>>>>>>> prediction DStream together for further downstream processing.
>>>>>>>>>>
>>>>>>>>>> I am predicting using the below piece of code:
>>>>>>>>>>
>>>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)).\
>>>>>>>>>>     map(lambda x: x.split(',')).\
>>>>>>>>>>     map(lambda parts: [float(i) for i in parts]).\
>>>>>>>>>>     transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>>>
>>>>>>>>>> Here, texts is a dstream having a single line of text per record, and
>>>>>>>>>> getFeatures generates comma-separated features extracted from
>>>>>>>>>> each record.
>>>>>>>>>>
>>>>>>>>>> I want the output as the below tuple:
>>>>>>>>>> ("predicted value", "original text")
>>>>>>>>>>
>>>>>>>>>> How can I achieve that?
>>>>>>>>>> Or, at least, can I perform a .zip like normal RDD operation on two
>>>>>>>>>> DStreams, like below:
>>>>>>>>>> output = texts.zip(predictions)
>>>>>>>>>>
>>>>>>>>>> Thanks in advance.
>>>>>>>>>>
>>>>>>>>>> -Obaid
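The `texts.zip(predictions)` pairing asked for in the original question can also be done as an index-keyed join, pairing two parallel collections through a shared key instead of relying on interleaved collects. A plain-Python sketch (lists stand in for DStreams; the names are illustrative, not Spark API):

```python
texts = ["msgA", "msgB", "msgC"]
predictions = [0.9, 0.1, 0.7]  # assumed model scores, one per text, in order

# Key each collection by position, akin to RDD.zipWithIndex().
keyed_texts = {i: t for i, t in enumerate(texts)}
keyed_preds = {i: p for i, p in enumerate(predictions)}

# Join on the shared index to get ("predicted value", "original text").
output = [(keyed_preds[i], keyed_texts[i]) for i in sorted(keyed_texts)]
```

The join-by-key form survives reordering, which positional zips do not; in Spark terms this corresponds to keying both streams and joining, at the cost of a shuffle.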