[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122518#comment-15122518 ]

Seth Hendrickson commented on SPARK-12944:
------------------------------------------

Wanted to post an update:
There are two parts to this problem. First, the current {{Params.copy}} method 
does not call {{_set}}, so it bypasses the {{expectedType}} mechanism that was 
added for parameter type safety in Python. Second, not all parameters 
currently provide an {{expectedType}}.

The current plan is to add expected types for model-specific params 
[here|https://issues.apache.org/jira/browse/SPARK-13066]. Then in this JIRA we 
can update the {{copy}} method to set the parameter values properly, which 
makes the above problem go away. Please let me know if there are concerns with 
this plan.
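
For reference, here is a rough sketch of what the {{copy}} change could look like. This is illustrative only and assumes the 1.6-era Python {{Params}} internals ({{_paramMap}}, {{extractParamMap}}, and a {{_set}} that applies the {{expectedType}} conversion); the actual change may differ.

{code}
from copy import copy as _shallow_copy

def copy(self, extra=None):
    """Sketch of Params.copy that routes values through _set so each
    Param's expectedType (e.g. float for NaiveBayes.smoothing) is applied."""
    if extra is None:
        extra = dict()
    that = _shallow_copy(self)
    that._paramMap = {}
    for param, value in self.extractParamMap(extra).items():
        # _set performs the expectedType conversion (e.g. int 0 -> 0.0);
        # assigning directly into _paramMap, as copy does today, skips it.
        that._set(**{param.name: value})
    return that
{code}

With this, once {{smoothing}} declares an expected type of float, a grid value of {{0}} would reach the JVM as {{0.0}} instead of an integer.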

> CrossValidator doesn't accept a Pipeline as an estimator
> --------------------------------------------------------
>
>                 Key: SPARK-12944
>                 URL: https://issues.apache.org/jira/browse/SPARK-12944
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 1.6.0
>         Environment: spark-1.6.0-bin-hadoop2.6
> Python 3.4.4 :: Anaconda 2.4.1
>            Reporter: John Hogue
>            Priority: Minor
>
> Pipeline is supposed to act as an Estimator, but CrossValidator currently 
> throws an error when a Pipeline is used as its estimator.
> {code}
> from pyspark.ml import Pipeline
> from pyspark.ml.feature import Tokenizer, HashingTF
> from pyspark.ml.classification import NaiveBayes
> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
> from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
> # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
> tokenizer = Tokenizer(inputCol="text", outputCol="words")
> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
> nb = NaiveBayes()
> pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])
> paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()
> cv = CrossValidator(estimator=pipeline, 
>                     estimatorParamMaps=paramGrid, 
>                     evaluator=MulticlassClassificationEvaluator(), 
>                     numFolds=4)
> cvModel = cv.fit(training_df)
> {code}
> Sample dataset can be found here:
> https://github.com/dreyco676/nlp_spark/blob/master/data.zip
> The file can be converted to a DataFrame with:
> {code}
> # Load precleaned training set
> training_rdd = sc.textFile("data/clean_training.txt")
> parts_rdd = training_rdd.map(lambda l: l.split("\t"))
> # Filter bad rows out
> garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
> typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
> # Create DataFrame
> training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
> {code}
> Running the pipeline throws the following stack trace:
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-3-34e9e27acada> in <module>()
>      17                     numFolds=4)
>      18 
> ---> 19 cvModel = cv.fit(training_df)
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in _fit(self, dataset)
>     237             train = df.filter(~condition)
>     238             for j in range(numModels):
> --> 239                 model = est.fit(train, epm[j])
>     240                 # TODO: duplicate evaluator to take extra params from input
>     241                 metric = eva.evaluate(model.transform(validation, epm[j]))
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      65         elif isinstance(params, dict):
>      66             if params:
> ---> 67                 return self.copy(params)._fit(dataset)
>      68             else:
>      69                 return self._fit(dataset)
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in _fit(self, dataset)
>     211                     dataset = stage.transform(dataset)
>     212                 else:  # must be an Estimator
> --> 213                     model = stage.fit(dataset)
>     214                     transformers.append(model)
>     215                     if i < indexOfLastEstimator:
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit(self, dataset)
>     130 
>     131     def _fit(self, dataset):
> --> 132         java_model = self._fit_java(dataset)
>     133         return self._create_model(java_model)
>     134 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
>     126         :return: fitted Java model
>     127         """
> --> 128         self._transfer_params_to_java()
>     129         return self._java_obj.fit(dataset._jdf)
>     130 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _transfer_params_to_java(self)
>      80         for param in self.params:
>      81             if param in paramMap:
> ---> 82                 pair = self._make_java_param_pair(param, paramMap[param])
>      83                 self._java_obj.set(pair)
>      84 
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _make_java_param_pair(self, param, value)
>      71         java_param = self._java_obj.getParam(param.name)
>      72         java_value = _py2java(sc, value)
> ---> 73         return java_param.w(java_value)
>      74 
>      75     def _transfer_params_to_java(self):
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     811         answer = self.gateway_client.send_command(command)
>     812         return_value = get_return_value(
> --> 813             answer, self.gateway_client, self.target_id, self.name)
>     814 
>     815         for temp_arg in temp_args:
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
>      43     def deco(*a, **kw):
>      44         try:
> ---> 45             return f(*a, **kw)
>      46         except py4j.protocol.Py4JJavaError as e:
>      47             s = e.java_exception.toString()
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     306                 raise Py4JJavaError(
>     307                     "An error occurred while calling {0}{1}{2}.\n".
> --> 308                     format(target_id, ".", name), value)
>     309             else:
>     310                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o113.w.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
>       at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
>       at org.apache.spark.ml.param.DoubleParam.w(params.scala:223)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>       at py4j.Gateway.invoke(Gateway.java:259)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:209)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The workaround is to run the Transformers outside of the Pipeline, which 
> defeats the purpose of Pipelines.
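> A lighter interim workaround (my assumption, not a fix for the underlying 
> {{copy}} issue) may be to pass the grid values as explicit Python floats so 
> the JVM {{DoubleParam}} receives doubles rather than integers:
> {code}
> # 0.0 and 1.0 instead of 0 and 1 should avoid the
> # java.lang.Integer -> java.lang.Double ClassCastException.
> paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()
> {code}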


