I see.  You might try this: create a pipeline of just your feature
transformers, then call fit() on the complete dataset to get a model.
Finally, make a second pipeline and add this model and the decision tree as
stages.
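
A rough sketch of what I mean, reusing the column names and parameters from
your earlier message. The names fitTwoStage, featurePipeline and featureModel
are made up for illustration. I'm assuming completeDataset and trainingSet
both contain col1 through col5, and that trainingSet already has the
"indexed_user_action" label column. I have not run this against your data.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}
import org.apache.spark.sql.DataFrame

// Hypothetical helper: fits the feature stages on the complete dataset,
// then fits only the decision tree on the training set.
def fitTwoStage(completeDataset: DataFrame, trainingSet: DataFrame): PipelineModel = {
  val rawCols = Array("col1", "col2", "col3", "col4", "col5")

  // One StringIndexer per categorical column, as in the original pipeline.
  val indexers = rawCols.map { c =>
    val indexer: PipelineStage =
      new StringIndexer().setInputCol(c).setOutputCol(s"indexed_$c")
    indexer
  }

  val vectorAssembler = new VectorAssembler()
    .setInputCols(rawCols.map(c => s"indexed_$c"))
    .setOutputCol("feature")

  val featureVectorIndexer = new VectorIndexer()
    .setInputCol("feature")
    .setOutputCol("indexedfeature")
    .setMaxCategories(180)

  // First pipeline: feature transformers only, fit on the complete dataset
  // so every category value is seen by the indexers.
  val featurePipeline = new Pipeline()
    .setStages(indexers ++ Array[PipelineStage](vectorAssembler, featureVectorIndexer))
  val featureModel = featurePipeline.fit(completeDataset)

  val decisionTree = new DecisionTreeClassifier()
    .setMaxBins(300)
    .setMaxDepth(1)
    .setImpurity("entropy")
    .setLabelCol("indexed_user_action")
    .setFeaturesCol("indexedfeature")
    .setPredictionCol("prediction")

  // Second pipeline: the already-fitted feature model only transforms here,
  // so just the decision tree is trained on trainingSet.
  new Pipeline()
    .setStages(Array[PipelineStage](featureModel, decisionTree))
    .fit(trainingSet)
}
```

With that, fitTwoStage(completeDataset, trainingSet).transform(cvSet) should
index cvSet using the category maps learned from the full data, while the
tree itself only ever sees the training rows.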

On Aug 30, 2016 8:19 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi Bryan,
> Thanks for the reply.
> I am indexing 5 columns, then using these indexed columns to generate the
> "feature" column through VectorAssembler.
> This essentially means that I cannot call *fit()* directly on the
> "completeDataset" dataframe, since it will have neither the "feature" column
> nor the 5 indexed columns.
> Of course there is a dirty way of doing this, but I am wondering if there
> is some optimized/intelligent approach for this.
>
> Thanks,
> Baahu
>
> On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> You need to first fit just the VectorIndexer which returns the model,
>> then add the model to the pipeline where it will only transform.
>>
>> val featureVectorIndexer = new VectorIndexer()
>>     .setInputCol("feature")
>>     .setOutputCol("indexedfeature")
>>     .setMaxCategories(180)
>>     .fit(completeDataset)
>>
>> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I ran into a similar exception, "java.util.NoSuchElementException:
>>> key not found:".
>>> After further investigation I realized it happens because the
>>> VectorIndexer is being fit on the training dataset and not on the entire dataset.
>>>
>>> In the dataframe I have 5 categorical columns. Each of these has to go
>>> through a StringIndexer, and the indexed columns are then assembled into a
>>> feature vector that is passed to a VectorIndexer.
>>> What is the right way to do this, so that the VectorIndexer can be fit on
>>> the entire data and not just on the training data?
>>>
>>> Below is the current approach. As is evident, the VectorIndexer is being
>>> fit on the training set.
>>>
>>> Please note: fit() on the VectorIndexer cannot be called on the entire-set
>>> dataframe, since it doesn't have the required column (the *feature* column
>>> is generated dynamically during pipeline execution).
>>> How can the VectorIndexer be *fit()* on the entire set?
>>>
>>> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
>>> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
>>> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
>>> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
>>> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>>>
>>> val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3", "indexed_col4", "indexed_col5")
>>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("feature")
>>> val featureVectorIndexer = new VectorIndexer()
>>>     .setInputCol("feature")
>>>     .setOutputCol("indexedfeature")
>>>     .setMaxCategories(180)
>>>
>>> val decisionTree = new DecisionTreeClassifier()
>>>     .setMaxBins(300)
>>>     .setMaxDepth(1)
>>>     .setImpurity("entropy")
>>>     .setLabelCol("indexed_user_action")
>>>     .setFeaturesCol("indexedfeature")
>>>     .setPredictionCol("prediction")
>>>
>>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>>>     col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>>>     featureVectorIndexer, decisionTree))
>>> val model = pipeline.fit(trainingSet)
>>> val output = model.transform(cvSet)
>>>
>>>
>>> Thanks,
>>> Baahu
>>>
>>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>
>>>> Hi Rich,
>>>>
>>>> I looked at the notebook and it seems like you are fitting the
>>>> StringIndexer and VectorIndexer to only the training data, and it should
>>>> be the entire data set.  If the training data does not include all of
>>>> the labels and an unknown label appears in the test data during evaluation,
>>>> then the model will not know how to index it.  So your code should look
>>>> like this, fitting with 'digits' instead of 'training':
>>>>
>>>> val labelIndexer = new StringIndexer()
>>>>     .setInputCol("label")
>>>>     .setOutputCol("indexedLabel")
>>>>     .fit(digits)
>>>> // Automatically identify categorical features, and index them.
>>>> // Set maxCategories so features with > 4 distinct values are treated as continuous.
>>>> val featureIndexer = new VectorIndexer()
>>>>     .setInputCol("features")
>>>>     .setOutputCol("indexedFeatures")
>>>>     .setMaxCategories(4)
>>>>     .fit(digits)
>>>>
>>>> Hope that helps!
>>>>
>>>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>>>
>>>>> Hi Bryan.
>>>>>
>>>>> Thanks for your continued help.
>>>>>
>>>>> Here is the code shown in a Jupyter notebook. I figured this was
>>>>> easier than cutting and pasting the code into an email. If you would like
>>>>> me to send you the code in a different format, let me know. The necessary
>>>>> data is all downloaded within the notebook itself.
>>>>>
>>>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>>>
>>>>> A few additional pieces of information.
>>>>>
>>>>> 1. The training dataset is cached before training the model. If you do
>>>>> not cache the training dataset, the model will not train. The code
>>>>> model.transform(test) fails with a similar error. No other changes besides
>>>>> caching or not caching. Again, with the training dataset cached, the model
>>>>> can be successfully trained as seen in the notebook.
>>>>>
>>>>> 2. I have another version of the notebook where I download the same
>>>>> data in libsvm format rather than csv. That notebook works fine. All the
>>>>> code is essentially the same, accounting for the difference in file formats.
>>>>>
>>>>> 3. I tested this same code on another Spark cloud platform and it
>>>>> displays the same symptoms when run there.
>>>>>
>>>>> Thanks.
>>>>> Rich
>>>>>
>>>>>
>>>>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Are you fitting the VectorIndexer to the entire data set and not just
>>>>>> training or test data?  If you are able to post your code and some data to
>>>>>> reproduce, that would help in troubleshooting.
>>>>>>
>>>>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the response, but in my case I reversed the meaning of
>>>>>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>>>>>> way, but in retrospect, it probably only causes confusion to anyone else
>>>>>>> looking at this. I reran the code with all the pipeline stage inputs and
>>>>>>> outputs named exactly as in the Random Forest Classifier example to make
>>>>>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>>>>>
>>>>>>> I'm still at the point where I can train the model and make
>>>>>>> predictions, but not able to get the MulticlassClassificationEvaluator
>>>>>>> to work on the DataFrame of predictions.
>>>>>>>
>>>>>>> Any other suggestions? Thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I created an ML pipeline using the Random Forest Classifier,
>>>>>>>> similar to what is described here, except in my case the source data is in
>>>>>>>> csv format rather than libsvm.
>>>>>>>>
>>>>>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>>>>>
>>>>>>>> I am able to successfully train the model and make predictions (on
>>>>>>>> test data not used to train the model) as shown here.
>>>>>>>>
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>>>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>>>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>>>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>>>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> only showing top 5 rows
>>>>>>>>
>>>>>>>> However, when I attempt to calculate the error between the
>>>>>>>> indexedLabel and the predictedLabel using the
>>>>>>>> MulticlassClassificationEvaluator, I get the NoSuchElementException
>>>>>>>> error attached below.
>>>>>>>>
>>>>>>>> val evaluator = new MulticlassClassificationEvaluator()
>>>>>>>>     .setLabelCol("indexedLabel")
>>>>>>>>     .setPredictionCol("predictedLabel")
>>>>>>>>     .setMetricName("precision")
>>>>>>>> val accuracy = evaluator.evaluate(predictions)
>>>>>>>> println("Test Error = " + (1.0 - accuracy))
>>>>>>>>
>>>>>>>> What could be the issue?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Name: org.apache.spark.SparkException
>>>>>>>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 132.0
>>>>>>>>        at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>>>>        at scala.collection.AbstractMap.default(Map.scala:58)
>>>>>>>>        at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>>>>>>        at scala.collection.AbstractMap.apply(Map.scala:58)
>>>>>>>>        at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>>>>>>>        at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>>>>>>>        at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>>>        at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>>>        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>>>>>>>        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>>>        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>>>        at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>>>>>>>        at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>>>>>>>        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>>>>>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>>>>>>>        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>>>>>>>        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>>>>        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>        at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>>>        at java.lang.Thread.run(Thread.java:785)
>>>>>>>>
>>>>>>>> Driver stacktrace:
>>>>>>>> StackTrace: 
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>> scala.Option.foreach(Option.scala:236)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>> java.lang.Thread.getStackTrace(Thread.java:1117)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>>> org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:49)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:45)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision$lzycompute(MulticlassMetrics.scala:142)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)
>>>>>>>> org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>>>>>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>>>>>>> $line110.$read$$iwC.<init>(<console>:80)
>>>>>>>> $line110.$read.<init>(<console>:82)
>>>>>>>> $line110.$read$.<init>(<console>:86)
>>>>>>>> $line110.$read$.<clinit>(<console>)
>>>>>>>> $line110.$eval$.<init>(<console>:7)
>>>>>>>> $line110.$eval$.<clinit>(<console>)
>>>>>>>> $line110.$eval.$print(<console>)
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>>>>>>> java.lang.reflect.Method.invoke(Method.java:507)
>>>>>>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>>>>>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>>>>>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>>>>>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>>> java.lang.Thread.run(Thread.java:785)
>>>
>>> --
>>> Twitter:http://twitter.com/Baahu
>
>
> --
> Twitter:http://twitter.com/Baahu