You need to first fit just the VectorIndexer, which returns a VectorIndexerModel, then add that fitted model to the pipeline, where it will only transform:

val featureVectorIndexer = new VectorIndexer()
  .setInputCol("feature")
  .setOutputCol("indexedfeature")
  .setMaxCategories(180)
  .fit(completeDataset)
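The catch is that completeDataset must already contain the "feature" column, which your pipeline generates dynamically, so first run just the StringIndexer and VectorAssembler stages over the full dataset, then do the fit above. Here is a minimal sketch against the code in your mail below (prepPipeline, prepModel, and entireSet, the full DataFrame, are hypothetical names; everything else refers to your snippet):

import org.apache.spark.ml.{Pipeline, PipelineStage}

// Fit and apply only the string-indexing and assembling stages on the
// full dataset to materialize the dynamically generated "feature" column.
val prepPipeline = new Pipeline().setStages(Array[PipelineStage](
  col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer,
  vectorAssembler))
val prepModel = prepPipeline.fit(entireSet)
val completeDataset = prepModel.transform(entireSet)

// ... fit featureVectorIndexer on completeDataset as shown above, then
// build the final pipeline from the already-fitted stages plus the
// classifier, and fit it on the training set only.
val stages: Array[PipelineStage] =
  prepModel.stages ++ Array[PipelineStage](featureVectorIndexer, decisionTree)
val model = new Pipeline().setStages(stages).fit(trainingSet)
val output = model.transform(cvSet)

A fitted model is a Transformer, so Pipeline.fit() will not refit it; it only calls transform() on it, and the indexer keeps the categories it learned from the full dataset. Reusing prepModel.stages the same way also keeps the StringIndexers fit on the full data, which avoids the same unseen-value problem for the individual string columns.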
On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com> wrote:

> Hi,
> I ran into a similar exception, "java.util.NoSuchElementException: key
> not found:". After further investigation I realized it happens because the
> VectorIndexer is fit on the training dataset and not on the entire dataset.
>
> In the dataframe I have 5 categorical columns; each of these has to go
> through a StringIndexer, and the indexed columns are then assembled into a
> feature vector that is put through a VectorIndexer.
> What is the right way to do this, so that the VectorIndexer can be fit on
> the entire data and not just on the training data?
>
> Below is the current approach; as is evident, the VectorIndexer is being
> fit on the training set.
>
> Please note: fit() on the VectorIndexer cannot be called on the entireset
> dataframe, since it doesn't have the required column (the *feature* column
> is generated dynamically during pipeline execution).
> How can the VectorIndexer be *fit()* on the entireset?
>
> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>
> val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3",
>   "indexed_col4", "indexed_col5")
> val vectorAssembler = new VectorAssembler()
>   .setInputCols(featureArray).setOutputCol("feature")
> val featureVectorIndexer = new VectorIndexer()
>   .setInputCol("feature")
>   .setOutputCol("indexedfeature")
>   .setMaxCategories(180)
>
> val decisionTree = new DecisionTreeClassifier()
>   .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
>   .setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature")
>   .setPredictionCol("prediction")
>
> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>   col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>   featureVectorIndexer, decisionTree))
> val model = pipeline.fit(trainingSet)
> val output = model.transform(cvSet)
>
> Thanks,
> Baahu
>
> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi Rich,
>>
>> I looked at the notebook and it seems like you are fitting the
>> StringIndexer and VectorIndexer to only the training data, when they
>> should be fit on the entire data set. If the training data does not
>> include all of the labels and an unknown label appears in the test data
>> during evaluation, then the indexer will not know how to index it. So
>> your code should be like this, fit with 'digits' instead of 'training':
>>
>> val labelIndexer = new StringIndexer().setInputCol("label")
>>   .setOutputCol("indexedLabel").fit(digits)
>> // Automatically identify categorical features, and index them.
>> // Set maxCategories so features with > 4 distinct values are treated
>> // as continuous.
>> val featureIndexer = new VectorIndexer().setInputCol("features")
>>   .setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>
>> Hope that helps!
>>
>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>
>>> Hi Bryan.
>>>
>>> Thanks for your continued help.
>>>
>>> Here is the code shown in a Jupyter notebook.
>>> I figured this was easier
>>> than cutting and pasting the code into an email. If you would like me
>>> to send you the code in a different format, let me know. The necessary
>>> data is all downloaded within the notebook itself.
>>>
>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>
>>> A few additional pieces of information.
>>>
>>> 1. The training dataset is cached before training the model. If I do
>>> not cache the training dataset, the model will not train: the code
>>> model.transform(test) fails with a similar error. No other changes
>>> besides caching or not caching. Again, with the training dataset cached,
>>> the model can be successfully trained, as seen in the notebook.
>>>
>>> 2. I have another version of the notebook where I download the same
>>> data in libsvm format rather than csv. That notebook works fine. All the
>>> code is essentially the same, accounting for the difference in file
>>> formats.
>>>
>>> 3. I tested this same code on another Spark cloud platform and it
>>> displays the same symptoms when run there.
>>>
>>> Thanks.
>>> Rich
>>>
>>>
>>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com>
>>> wrote:
>>>
>>>> Are you fitting the VectorIndexer to the entire data set and not just
>>>> the training or test data? If you are able to post your code and some
>>>> data to reproduce the problem, that would help in troubleshooting.
>>>>
>>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for the response, but in my case I reversed the meaning of
>>>>> "prediction" and "predictedLabel". It seemed to make more sense to me
>>>>> that way, but in retrospect, it probably only causes confusion to
>>>>> anyone else looking at this. I reran the code with all the pipeline
>>>>> stage inputs and outputs named exactly as in the Random Forest
>>>>> Classifier example to make sure I hadn't messed anything up when I
>>>>> renamed things. Same error.
>>>>>
>>>>> I'm still at the point where I can train the model and make
>>>>> predictions, but am not able to get the
>>>>> MulticlassClassificationEvaluator to work on the DataFrame of
>>>>> predictions.
>>>>>
>>>>> Any other suggestions? Thanks.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I created an ML pipeline using the Random Forest Classifier, similar
>>>>>> to what is described here, except that in my case the source data is
>>>>>> in csv format rather than libsvm.
>>>>>>
>>>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>>>
>>>>>> I am able to successfully train the model and make predictions (on
>>>>>> test data not used to train the model) as shown here.
>>>>>>
>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>> only showing top 5 rows
>>>>>>
>>>>>> However, when I attempt to calculate the error between the
>>>>>> indexedLabel and the predictedLabel using the
>>>>>> MulticlassClassificationEvaluator, I get the NoSuchElementException
>>>>>> error attached below.
>>>>>>
>>>>>> val evaluator = new MulticlassClassificationEvaluator()
>>>>>>   .setLabelCol("indexedLabel")
>>>>>>   .setPredictionCol("predictedLabel")
>>>>>>   .setMetricName("precision")
>>>>>> val accuracy = evaluator.evaluate(predictions)
>>>>>> println("Test Error = " + (1.0 - accuracy))
>>>>>>
>>>>>> What could be the issue?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Name: org.apache.spark.SparkException
>>>>>> Message: Job aborted due to stage failure: Task 2 in stage 49.0
>>>>>> failed 10 times, most recent failure: Lost task 2.9 in stage 49.0
>>>>>> (TID 162, yp-spark-dal09-env5-0024):
>>>>>> java.util.NoSuchElementException: key not found: 132.0
>>>>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>> at scala.collection.AbstractMap.default(Map.scala:58)
>>>>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>>>> at scala.collection.AbstractMap.apply(Map.scala:58)
>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>>>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>> at java.lang.Thread.run(Thread.java:785)
>>>>>>
>>>>>> Driver stacktrace:
>>>>>> StackTrace:
>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>> scala.Option.foreach(Option.scala:236)
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>> java.lang.Thread.getStackTrace(Thread.java:1117)
>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>>>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>> org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741)
>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740)
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740)
>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:49)
>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:45)
>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision$lzycompute(MulticlassMetrics.scala:142)
>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)
>>>>>> org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>>>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>>>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>>>>> $line110.$read$$iwC.<init>(<console>:80)
>>>>>> $line110.$read.<init>(<console>:82)
>>>>>> $line110.$read$.<init>(<console>:86)
>>>>>> $line110.$read$.<clinit>(<console>)
>>>>>> $line110.$eval$.<init>(<console>:7)
>>>>>> $line110.$eval$.<clinit>(<console>)
>>>>>> $line110.$eval.$print(<console>)
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>>>>> java.lang.reflect.Method.invoke(Method.java:507)
>>>>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>>>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>>>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>>>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>> java.lang.Thread.run(Thread.java:785)
>>>>>>
>>>>>
>>>>
>>>
>>
>
> --
> Twitter: http://twitter.com/Baahu