Hi Bryan,
Thanks for the reply. I am indexing 5 columns, then using these indexed columns to generate the "feature" column through a VectorAssembler. This essentially means that I cannot call *fit()* directly on the "completeDataset" dataframe, since it will have neither the "feature" column nor the 5 indexed columns. Of course there is a dirty way of doing this, but I am wondering if there is some optimized/intelligent approach for this.
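[For the archive, one way to avoid the dirty workaround. This is a minimal sketch, not tested against this exact setup; it assumes the names used elsewhere in this thread (col1..col5, completeDataset, trainingSet, cvSet, and the decisionTree estimator defined below). Fit the feature-generating stages as a sub-pipeline on the complete dataset, so the "feature" column is produced internally during that fit; the resulting PipelineModel is a Transformer and can be placed in front of the classifier, where it will only transform.]

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}

// Feature-generating stages, as in the original pipeline.
val indexers = Array("col1", "col2", "col3", "col4", "col5").map { c =>
  new StringIndexer().setInputCol(c).setOutputCol("indexed_" + c): PipelineStage
}
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("indexed_col1", "indexed_col2", "indexed_col3",
    "indexed_col4", "indexed_col5"))
  .setOutputCol("feature")
val featureVectorIndexer = new VectorIndexer()
  .setInputCol("feature")
  .setOutputCol("indexedfeature")
  .setMaxCategories(180)

// Fitting this sub-pipeline on the COMPLETE dataset generates the
// "feature" column internally, so the VectorIndexer sees every category.
val featureModel = new Pipeline()
  .setStages(indexers ++ Array[PipelineStage](vectorAssembler, featureVectorIndexer))
  .fit(completeDataset)

// A fitted PipelineModel is itself a Transformer, so it can be used as a
// stage: only the decision tree is actually trained on the training split.
val pipeline = new Pipeline()
  .setStages(Array[PipelineStage](featureModel, decisionTree))
val model = pipeline.fit(trainingSet)
val output = model.transform(cvSet)
```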
Thanks,
Baahu

On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cutl...@gmail.com> wrote:

> You need to first fit just the VectorIndexer, which returns the model, then
> add the model to the pipeline, where it will only transform.
>
> val featureVectorIndexer = new VectorIndexer()
>   .setInputCol("feature")
>   .setOutputCol("indexedfeature")
>   .setMaxCategories(180)
>   .fit(completeDataset)
>
> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com> wrote:
>
>> Hi,
>> I had run into a similar exception: "java.util.NoSuchElementException: key
>> not found:". After further investigation I realized it is happening because
>> the VectorIndexer is fit on the training dataset and not on the entire dataset.
>>
>> In the dataframe I have 5 categorical columns; each of these has to go
>> through a StringIndexer, and the indexed columns are then put through a
>> VectorIndexer to generate the feature vector.
>> What is the right way to do this, so that the VectorIndexer can be fit on
>> the entire data and not just on the training data?
>>
>> Below is the current approach; as is evident, the VectorIndexer model is
>> generated based on the training set only.
>>
>> Please note: fit() on the VectorIndexer cannot be called on the entire-set
>> dataframe, since it doesn't have the required column (the *feature* column
>> is generated dynamically during pipeline execution).
>> How can the VectorIndexer be *fit()* on the entire set?
>>
>> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
>> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
>> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
>> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
>> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>>
>> val featureArray = Array("indexed_col1","indexed_col2","indexed_col3","indexed_col4","indexed_col5")
>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("*feature*")
>> val featureVectorIndexer = new VectorIndexer()
>>   .setInputCol("feature")
>>   .setOutputCol("indexedfeature")
>>   .setMaxCategories(180)
>>
>> val decisionTree = new DecisionTreeClassifier().setMaxBins(300).setMaxDepth(1)
>>   .setImpurity("entropy").setLabelCol("indexed_user_action")
>>   .setFeaturesCol("indexedfeature").setPredictionCol("prediction")
>>
>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>>   col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>>   featureVectorIndexer, decisionTree))
>> val model = pipeline.*fit(trainingSet)*
>> val output = model.transform(cvSet)
>>
>> Thanks,
>> Baahu
>>
>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>>
>>> Hi Rich,
>>>
>>> I looked at the notebook, and it seems like you are fitting the
>>> StringIndexer and VectorIndexer to only the training data, when it should
>>> be the entire data set. So if the training data does not include all of
>>> the labels and an unknown label appears in the test data during evaluation,
>>> then it will not know how to index it.
So your code should be like this, fit with 'digits' instead of 'training':
>>>
>>> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
>>> // Automatically identify categorical features, and index them.
>>> // Set maxCategories so features with > 4 distinct values are treated as continuous.
>>> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>>
>>> Hope that helps!
>>>
>>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>>
>>>> Hi Bryan.
>>>>
>>>> Thanks for your continued help.
>>>>
>>>> Here is the code shown in a Jupyter notebook. I figured this was easier
>>>> than cutting and pasting the code into an email. If you would like me to
>>>> send you the code in a different format, let me know. The necessary data
>>>> is all downloaded within the notebook itself.
>>>>
>>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>>
>>>> A few additional pieces of information.
>>>>
>>>> 1. The training dataset is cached before training the model. If you do
>>>> not cache the training dataset, the model will not train; the code
>>>> model.transform(test) fails with a similar error, with no other changes
>>>> besides caching or not caching. Again, with the training dataset cached,
>>>> the model can be successfully trained, as seen in the notebook.
>>>>
>>>> 2. I have another version of the notebook where I download the same
>>>> data in libsvm format rather than csv. That notebook works fine. All the
>>>> code is essentially the same, accounting for the difference in file formats.
>>>>
>>>> 3. I tested this same code on another Spark cloud platform, and it
>>>> displays the same symptoms when run there.
>>>>
>>>> Thanks.
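[Putting Bryan's fix together, for the archive: a sketch under assumptions, namely a digits DataFrame with "label" and "features" columns as in the notebook, a RandomForestClassifier configured as in the linked example, and an illustrative split ratio and seed. The models fitted on the full data go into the pipeline as stages, where they only transform; only the classifier is trained on the training split.]

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorIndexer}

// Fit both indexers on the FULL dataset so every label and every
// categorical feature value is known at transform time.
val labelIndexer = new StringIndexer()
  .setInputCol("label").setOutputCol("indexedLabel")
  .fit(digits)
val featureIndexer = new VectorIndexer()
  .setInputCol("features").setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(digits)

val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// The fitted indexer models only transform inside the pipeline;
// pipeline.fit trains just the random forest on the training split.
val Array(training, test) = digits.randomSplit(Array(0.7, 0.3), seed = 42)
val model = new Pipeline()
  .setStages(Array[PipelineStage](labelIndexer, featureIndexer, rf))
  .fit(training)
val predictions = model.transform(test)
```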
>>>> Rich
>>>>
>>>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>>
>>>>> Are you fitting the VectorIndexer to the entire data set and not just
>>>>> the training or test data? If you are able to post your code and some
>>>>> data to reproduce, that would help in troubleshooting.
>>>>>
>>>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response, but in my case I reversed the meaning of
>>>>>> "prediction" and "predictedLabel". It seemed to make more sense to me
>>>>>> that way, but in retrospect, it probably only causes confusion to anyone
>>>>>> else looking at this. I reran the code with all the pipeline stage inputs
>>>>>> and outputs named exactly as in the Random Forest Classifier example to
>>>>>> make sure I hadn't messed anything up when I renamed things. Same error.
>>>>>>
>>>>>> I'm still at the point where I can train the model and make
>>>>>> predictions, but am not able to get the MulticlassClassificationEvaluator
>>>>>> to work on the DataFrame of predictions.
>>>>>>
>>>>>> Any other suggestions? Thanks.
>>>>>>
>>>>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>>>>
>>>>>>> I created an ML pipeline using the Random Forest Classifier - similar
>>>>>>> to what is described here, except in my case the source data is in csv
>>>>>>> format rather than libsvm:
>>>>>>>
>>>>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>>>>
>>>>>>> I am able to successfully train the model and make predictions (on
>>>>>>> test data not used to train the model) as shown here.
>>>>>>>
>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>> only showing top 5 rows
>>>>>>>
>>>>>>> However, when I attempt to calculate the error between the indexedLabel
>>>>>>> and the predictedLabel using the MulticlassClassificationEvaluator, I
>>>>>>> get the NoSuchElementException error attached below.
>>>>>>>
>>>>>>> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>>>>>>> val accuracy = evaluator.evaluate(predictions)
>>>>>>> println("Test Error = " + (1.0 - accuracy))
>>>>>>>
>>>>>>> What could be the issue?
>>>>>>>
>>>>>>> Name: org.apache.spark.SparkException
>>>>>>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 132.0
>>>>>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>>> at scala.collection.AbstractMap.default(Map.scala:58)
>>>>>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>>>>> at scala.collection.AbstractMap.apply(Map.scala:58)
>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>>>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>>>>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>> at java.lang.Thread.run(Thread.java:785)
>>>>>>>
>>>>>>> Driver stacktrace:
>>>>>>> StackTrace:
>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>> scala.Option.foreach(Option.scala:236)
>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>> java.lang.Thread.getStackTrace(Thread.java:1117)
>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>> org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741)
>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740)
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740)
>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:49)
>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:45)
>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision$lzycompute(MulticlassMetrics.scala:142)
>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)
>>>>>>> org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>>>>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>>>>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>>>>>> $line110.$read$$iwC.<init>(<console>:80)
>>>>>>> $line110.$read.<init>(<console>:82)
>>>>>>> $line110.$read$.<init>(<console>:86)
>>>>>>> $line110.$read$.<clinit>(<console>)
>>>>>>> $line110.$eval$.<init>(<console>:7)
>>>>>>> $line110.$eval$.<clinit>(<console>)
>>>>>>> $line110.$eval.$print(<console>)
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>>>>>> java.lang.reflect.Method.invoke(Method.java:507)
>>>>>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>>>>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>>>>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>>>>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>> java.lang.Thread.run(Thread.java:785)
>>
>> --
>> Twitter:http://twitter.com/Baahu
>>

--
Twitter:http://twitter.com/Baahu