[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117905#comment-15117905 ]
Joseph K. Bradley commented on SPARK-12944: ------------------------------------------- Please do; it looks like a simple fix in addGrid. thanks! > CrossValidator doesn't accept a Pipeline as an estimator > -------------------------------------------------------- > > Key: SPARK-12944 > URL: https://issues.apache.org/jira/browse/SPARK-12944 > Project: Spark > Issue Type: Bug > Components: ML, PySpark > Affects Versions: 1.6.0 > Environment: spark-1.6.0-bin-hadoop2.6 > Python 3.4.4 :: Anaconda 2.4.1 > Reporter: John Hogue > Priority: Minor > > Pipeline is supposed to act as an estimator, but CrossValidator currently > throws an error. > {code} > from pyspark.ml.evaluation import MulticlassClassificationEvaluator > from pyspark.ml.tuning import ParamGridBuilder > from pyspark.ml.tuning import CrossValidator > # Configure an ML pipeline, which consists of three stages: tokenizer, > hashingTF, and nb. > tokenizer = Tokenizer(inputCol="text", outputCol="words") > hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") > nb = NaiveBayes() > pipeline = Pipeline(stages=[tokenizer, hashingTF, nb]) > paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build() > cv = CrossValidator(estimator=pipeline, > estimatorParamMaps=paramGrid, > evaluator=MulticlassClassificationEvaluator(), > numFolds=4) > cvModel = cv.fit(training_df) > {code} > A sample dataset can be found here: > https://github.com/dreyco676/nlp_spark/blob/master/data.zip > The file can be converted to a DataFrame with: > {code} > # Load precleaned training set > training_rdd = sc.textFile("data/clean_training.txt") > parts_rdd = training_rdd.map(lambda l: l.split("\t")) > # Filter bad rows out > garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3) > typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2]))) > # Create DataFrame > training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"]) > {code} > Running the pipeline throws the following stack trace: > 
{code} > ---------------------------------------------------------------------------Py4JJavaError > Traceback (most recent call > last)<ipython-input-3-34e9e27acada> in <module>() > 17 numFolds=4) > 18 > ---> 19 cvModel = cv.fit(training_df) > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in > fit(self, dataset, params) > 67 return self.copy(params)._fit(dataset) > 68 else: > ---> 69 return self._fit(dataset) > 70 else: > 71 raise ValueError("Params must be either a param map or a > list/tuple of param maps, " > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in > _fit(self, dataset) > 237 train = df.filter(~condition) > 238 for j in range(numModels): > --> 239 model = est.fit(train, epm[j]) > 240 # TODO: duplicate evaluator to take extra params from > input > 241 metric = eva.evaluate(model.transform(validation, > epm[j])) > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in > fit(self, dataset, params) > 65 elif isinstance(params, dict): > 66 if params: > ---> 67 return self.copy(params)._fit(dataset) > 68 else: > 69 return self._fit(dataset) > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in > _fit(self, dataset) > 211 dataset = stage.transform(dataset) > 212 else: # must be an Estimator > --> 213 model = stage.fit(dataset) > 214 transformers.append(model) > 215 if i < indexOfLastEstimator: > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in > fit(self, dataset, params) > 67 return self.copy(params)._fit(dataset) > 68 else: > ---> 69 return self._fit(dataset) > 70 else: > 71 raise ValueError("Params must be either a param map or a > list/tuple of param maps, " > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in > _fit(self, dataset) > 130 > 131 def _fit(self, dataset): > --> 132 java_model = self._fit_java(dataset) > 133 return self._create_model(java_model) > 134 > 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in > _fit_java(self, dataset) > 126 :return: fitted Java model > 127 """ > --> 128 self._transfer_params_to_java() > 129 return self._java_obj.fit(dataset._jdf) > 130 > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in > _transfer_params_to_java(self) > 80 for param in self.params: > 81 if param in paramMap: > ---> 82 pair = self._make_java_param_pair(param, > paramMap[param]) > 83 self._java_obj.set(pair) > 84 > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in > _make_java_param_pair(self, param, value) > 71 java_param = self._java_obj.getParam(param.name) > 72 java_value = _py2java(sc, value) > ---> 73 return java_param.w(java_value) > 74 > 75 def _transfer_params_to_java(self): > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py > in __call__(self, *args) > 811 answer = self.gateway_client.send_command(command) > 812 return_value = get_return_value( > --> 813 answer, self.gateway_client, self.target_id, self.name) > 814 > 815 for temp_arg in temp_args: > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in > deco(*a, **kw) > 43 def deco(*a, **kw): > 44 try: > ---> 45 return f(*a, **kw) > 46 except py4j.protocol.Py4JJavaError as e: > 47 s = e.java_exception.toString() > /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py > in get_return_value(answer, gateway_client, target_id, name) > 306 raise Py4JJavaError( > 307 "An error occurred while calling {0}{1}{2}.\n". > --> 308 format(target_id, ".", name), value) > 309 else: > 310 raise Py4JError( > Py4JJavaError: An error occurred while calling o113.w. 
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Double > at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119) > at org.apache.spark.ml.param.DoubleParam.w(params.scala:223) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) > at py4j.Gateway.invoke(Gateway.java:259) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:209) > at java.lang.Thread.run(Thread.java:745) > {code} > A workaround is to run the Transformers outside of the pipeline. This defeats the purpose > of Pipelines. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org