[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178709#comment-15178709 ]

John Hogue commented on SPARK-12944:
------------------------------------

Perfect!

> CrossValidator doesn't accept a Pipeline as an estimator
> ---------------------------------------------------------
>
>                 Key: SPARK-12944
>                 URL: https://issues.apache.org/jira/browse/SPARK-12944
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 1.6.0
>        Environment: spark-1.6.0-bin-hadoop2.6
>                     Python 3.4.4 :: Anaconda 2.4.1
>           Reporter: John Hogue
>           Priority: Minor
>
> A Pipeline is supposed to act as an Estimator, but CrossValidator currently throws an error when given one:
> {code}
> from pyspark.ml import Pipeline
> from pyspark.ml.feature import Tokenizer, HashingTF
> from pyspark.ml.classification import NaiveBayes
> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
> from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
>
> # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
> tokenizer = Tokenizer(inputCol="text", outputCol="words")
> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
> nb = NaiveBayes()
> pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])
> paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()
> cv = CrossValidator(estimator=pipeline,
>                     estimatorParamMaps=paramGrid,
>                     evaluator=MulticlassClassificationEvaluator(),
>                     numFolds=4)
> cvModel = cv.fit(training_df)
> {code}
> A sample dataset can be found here:
> https://github.com/dreyco676/nlp_spark/blob/master/data.zip
> The file can be converted to a DataFrame with:
> {code}
> # Load the precleaned training set
> training_rdd = sc.textFile("data/clean_training.txt")
> parts_rdd = training_rdd.map(lambda l: l.split("\t"))
> # Filter out bad rows
> guaranteed_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
> typed_rdd = guaranteed_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
> # Create the DataFrame
> training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
> {code}
> Running the pipeline throws the following stack trace:
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input> in <module>()
>      17                     numFolds=4)
>      18
> ---> 19 cvModel = cv.fit(training_df)
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in _fit(self, dataset)
>     237             train = df.filter(~condition)
>     238             for j in range(numModels):
> --> 239                 model = est.fit(train, epm[j])
>     240                 # TODO: duplicate evaluator to take extra params from input
>     241                 metric = eva.evaluate(model.transform(validation, epm[j]))
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      65         elif isinstance(params, dict):
>      66             if params:
> ---> 67                 return self.copy(params)._fit(dataset)
>      68             else:
>      69                 return self._fit(dataset)
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in _fit(self, dataset)
>     211                     dataset = stage.transform(dataset)
>     212                 else:  # must be an Estimator
> --> 213                     model = stage.fit(dataset)
>     214                     transformers.append(model)
>     215                     if i < indexOfLastEstimator:
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
>      67                 return self.copy(params)._fit(dataset)
>      68             else:
> ---> 69                 return self._fit(dataset)
>      70         else:
>      71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit(self, dataset)
>     130
>     131     def _fit(self, dataset):
> --> 132         java_model = self._fit_java(dataset)
>     133         return self._create_model(java_model)
>     134
>
> /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
>     126         :return: fitted Java model
>     127         """
> --> 128         self._transfer_params_to_java()
>     129         return
> {code}
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178705#comment-15178705 ]

Joseph K. Bradley commented on SPARK-12944:
--------------------------------------------

We'll continue discussing on the linked JIRA. Thanks [~dreyco676] for bringing this up!
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178704#comment-15178704 ]

Joseph K. Bradley commented on SPARK-12944:
--------------------------------------------

I'm going to close this JIRA since it's less a bug and more of a needed feature (automatically converting Param types when needed).
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122518#comment-15122518 ]

Seth Hendrickson commented on SPARK-12944:
-------------------------------------------

Wanted to give an update. There are two parts to this problem. First, the current {{Params.copy}} method does not call {{_set}} and so does not use the {{expectedType}} mechanism that was added for parameter type safety in Python. Second, not all parameters currently provide an {{expectedType}}. The current plan is to add expected types for model-specific params [here|https://issues.apache.org/jira/browse/SPARK-13066]. Then, in this JIRA, we can update the {{copy}} method to set the parameter values properly, which makes the above problem go away. Please let me know if there are concerns with this plan.
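For context, {{expectedType}} is the optional type hint a Python {{Param}} can carry (the mechanism referenced above from PR 9581). A declaration carrying the hint would look roughly like the sketch below; this is illustrative only, not the actual SPARK-13066 patch:

{code}
from pyspark.ml.param import Param, Params

# Illustrative: a param declared with an expectedType hint, so an int
# supplied in a param grid can be coerced to float before being
# transferred to the JVM. SPARK-13066 proposes adding such hints to
# model-specific params that currently lack them.
smoothing = Param(Params._dummy(), "smoothing",
                  "The smoothing parameter.", expectedType=float)
{code}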
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119976#comment-15119976 ]

Seth Hendrickson commented on SPARK-12944:
-------------------------------------------

Based on what I have seen, I think the fix is more involved... could you elaborate on your proposed solution? We could "fix" it in {{addGrid}} by checking that all elements of {{values}} are instances of {{param.expectedType}}, but I don't think this really gets at the root of the problem. First, this would still throw an error for ints that are expected to be floats, which is unsatisfactory since [PR 9581|https://github.com/apache/spark/pull/9581] adds a type conversion mechanism specifically for this purpose. Second, the real problem is that params passed through the {{Estimator.fit}} and {{Transformer.transform}} methods use the {{Params.copy}} method to circumvent the set methods and type checking entirely! By modifying the {{_paramMap}} dictionary directly, the {{copy}} method allows ML pipeline elements to contain params that have no context (like a HashingTF carrying a Naive Bayes smoothing parameter in its param map, as in the example above).

I think the correct way to fix this is to change the {{copy}} method to call the {{_set}} function instead of directly modifying {{_paramMap}}. That way we ensure that {{_paramMap}} only contains parameters that belong to that class, and that type checking is performed. I am working on a PR with the method I described, but I am open to feedback. Appreciate any thoughts on this.
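As a rough illustration of the fix direction described above (a toy sketch, not the actual PySpark implementation; the real {{Params}}, {{Param}}, and {{_set}} are considerably more involved), routing {{copy}} through {{_set}} lets the {{expectedType}} coercion and ownership checks run:

{code}
import copy as _copy

class Param(object):
    # Toy stand-in for pyspark.ml.param.Param (illustrative only).
    def __init__(self, name, expectedType=None):
        self.name = name
        self.expectedType = expectedType

class Params(object):
    # Toy params holder demonstrating the proposed copy-through-_set flow.
    smoothing = Param("smoothing", expectedType=float)

    def __init__(self):
        self._paramMap = {}

    def _set(self, **kwargs):
        # Coerce each value to the param's expectedType (the mechanism
        # PR 9581 added for int -> float conversion), and resolve the
        # param on this class, so foreign params cannot sneak in.
        for name, value in kwargs.items():
            param = getattr(type(self), name)  # AttributeError if not ours
            if param.expectedType is not None and not isinstance(value, param.expectedType):
                value = param.expectedType(value)
            self._paramMap[param] = value

    def copy(self, extra=None):
        that = _copy.copy(self)
        that._paramMap = dict(self._paramMap)
        if extra:
            # Proposed fix: route extras through _set instead of doing
            # that._paramMap.update(extra), so checks are not bypassed.
            that._set(**{p.name: v for p, v in extra.items()})
        return that

nb = Params()
copied = nb.copy({Params.smoothing: 1})          # int supplied, as in the grid
print(type(copied._paramMap[Params.smoothing]))  # <class 'float'> after coercion
{code}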
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117905#comment-15117905 ]

Joseph K. Bradley commented on SPARK-12944:
--------------------------------------------

Please do; it looks like a simple fix in {{addGrid}}. Thanks!
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116286#comment-15116286 ]

Seth Hendrickson commented on SPARK-12944:
-------------------------------------------

This issue was partly addressed by [SPARK-7675|https://issues.apache.org/jira/browse/SPARK-7675], but it persists because {{_set}} is bypassed when passing parameter maps to each cross-validation model. I'd like to work on this if no one else is.
[jira] [Commented] (SPARK-12944) CrossValidator doesn't accept a Pipeline as an estimator
[ https://issues.apache.org/jira/browse/SPARK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109825#comment-15109825 ]

John Hogue commented on SPARK-12944:
------------------------------------

The issue stems from:
{code}
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()
{code}
Java will not cast Integers to Doubles, so this fails. Either the documentation should be updated or the casting of the values should be done by Python.
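Given that diagnosis, a minimal workaround is to hand {{addGrid}} Python floats so the JVM receives Doubles. This is a sketch, not part of the original report; it reuses {{nb}} and {{paramGrid}} from the reproduction above:

{code}
from pyspark.ml.tuning import ParamGridBuilder

# Workaround sketch: supply floats explicitly (or coerce ints to floats)
# so that nb.smoothing crosses into the JVM as java.lang.Double.
values = [0, 1]
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [float(v) for v in values])
             .build())
{code}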