At a quick glance, I think you're misunderstanding some basic features. http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations
Map is a transformation; it is lazy. You're not calling any action on the result of map. Also, closing over a mutable variable (like `idx` or `featArray` here) won't work: that closure runs on the executors, not on the driver where your main code is running.

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram <rkgur...@gmail.com> wrote:

> Hi,
> I am observing some weird behavior with Spark. It might be my
> misinterpretation of some fundamental concepts, but I have looked at it
> for 3 days and have not been able to solve it.
>
> The source code is pretty long and complex, so instead of posting it, I
> will try to articulate the problem.
> I am building a "Sentiment Analyser" using the Naive Bayes model in Spark.
>
> 1) I have taken text files in RAW format and created an RDD of
> word -> Array(files the word is found in).
>
> 2) From this I have derived the "features" array for each file, which is
> an Array[Double]: 0.0 if the file does not contain the word and 1.0 if
> the word is found in the file.
>
> 3) I have then created an RDD[LabeledPoint].
>
> From this I have created the Naive Bayes model using the following code:
>
>     val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
>     val training = splits(0)
>     // training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
>     val test = splits(1)
>     Logger.info("Training count: " + training.count() +
>       " Testing count:" + test.count())
>     model = NaiveBayes.train(training, lambda = 1.0)
>
>     val predictionAndLabel =
>       test.map(p => (model.predict(p.features), p.label))
>     val accuracy = 1.0 *
>       predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
>     Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy + "]")
>
> 4) The model seems to be fine and the accuracy is about 75% to 82%,
> depending on which set of input files I provide.
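To make those two points concrete, here is a minimal sketch (assuming a SparkContext `sc` is available, e.g. in spark-shell; the variable names are made up for illustration):

```scala
// Minimal sketch of the two pitfalls: assumes a running SparkContext `sc`.
val data = sc.parallelize(1 to 5)

// Pitfall 1: map is lazy. Nothing runs here.
val doubled = data.map(_ * 2)   // no job is triggered by this line
doubled.collect()               // an action: only now does the map execute

// Pitfall 2: closing over a mutable driver-side variable.
var counter = 0
data.foreach(x => counter += x) // each executor mutates its own copy
println(counter)                // still 0 on the driver in cluster mode

// Instead, aggregate with an action (or use an accumulator):
val total = data.reduce(_ + _)  // 15
```

Note that in local mode the `counter` example can appear to work because everything runs in one JVM, which makes this bug easy to miss.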
>
> 5) Now I am using this model to predict(); here I am creating the same
> feature array from the input text file, and I have code as follows:
>
>     /*
>      * Print all the features (words) in the feature array
>      */
>     allFeaturesRDD.foreach((x) => print(x + ", "))
>
>     /*
>      * Build the feature array
>      */
>     val features = buildFeatureArray(reviewID, wordSeqRdd) // <---- Fails here,
>                                                            // have shown this code below
>     logFeatureArray(features)
>
>     val prediction = model.predict(Vectors.dense(features))
>     Logger.info("Prediction:" + prediction)
>
> ==================================
> reviewID ----> filename
> wordReviewSeqRDD -> RDD[(word, Array(filename))]
>
>     def buildFeatureArray(reviewID: String,
>         wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {
>
>       val numWords = allFeaturesRDD.count // <--- number of all words in the feature set
>       val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
>
>       var featArray: Array[Double] = new Array(numWords.toInt) // <--- create an empty features array
>       var idx = 0
>       if (trainingDone) Logger.info("Feature len:" + numWords)
>
>       allFeaturesRDD.map { // <-- This is where it is failing,
>         case (eachword) => { // <-- for some reason the code does not enter here ????
>           val reviewList = wordReviewSeqMap.get(eachword).get
>
>           if (trainingDone == true) {
>             println("1. eachword:" + eachword + " reviewList:" + reviewList)
>             println("2. reviewList.size:" + reviewList.length)
>             println("3. reviewList(0):" + reviewList(0))
>           }
>
>           featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else 0.toDouble
>           idx += 1
>         }
>       }
>       featArray
>     }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-does-not-loop-through-a-RDD-map-tp21102.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
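For what it's worth, here is a sketch of one way buildFeatureArray could be restructured so that no RDD closure mutates driver-side state. This is a sketch, not a drop-in fix; it reuses the names from the quoted code and assumes the vocabulary (`allFeaturesRDD: RDD[String]`) is small enough to collect to the driver:

```scala
import org.apache.spark.rdd.RDD

// Sketch: build the feature array on the driver instead of mutating
// featArray/idx from inside an RDD closure. Assumes allFeaturesRDD is
// in scope, as in the quoted code.
def buildFeatureArray(reviewID: String,
    wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {
  val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

  // Bring the vocabulary to the driver, then build the array locally.
  allFeaturesRDD.collect().map { eachword =>
    val reviewList = wordReviewSeqMap.getOrElse(eachword, Seq.empty)
    if (reviewList.contains(reviewID)) 1.0 else 0.0
  }
}
```

One caveat: this relies on the words coming back from `collect()` in the same order used when the training features were built, so you'd want to fix the vocabulary ordering (e.g. by sorting) in both places.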