[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681895#comment-16681895 ] John Bauer commented on SPARK-21542:

This is a) much more minimal, b) genuinely useful, and c) actually works with save and load, for example:
{code}
impute.write().save("impute")
imp = ImputeNormal.load("impute")
imp.explainParams()

impute_model.write().save("impute_model")
impm = ImputeNormalModel.load("impute_model")
impm.getInputCol()
impm.getOutputCol()
impm.getMean()
impm.getStddev()
{code}

> Helper functions for custom Python Persistence
> ----------------------------------------------
>
>                 Key: SPARK-21542
>                 URL: https://issues.apache.org/jira/browse/SPARK-21542
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML, PySpark
>    Affects Versions: 2.2.0
>            Reporter: Ajay Saini
>            Assignee: Ajay Saini
>            Priority: Major
>             Fix For: 2.3.0
>
> Currently, there is no way to easily persist JSON-serializable parameters in Python only. All parameters in Python are persisted by converting them to Java objects and using the Java persistence implementation. In order to facilitate the creation of custom Python-only pipeline stages, it would be good to have a Python-only persistence framework so that these stages do not need to be implemented in Scala for persistence.
> This task involves:
> - Adding implementations for DefaultParamsReadable, DefaultParamsWritable, DefaultParamsReader, and DefaultParamsWriter in pyspark.
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681891#comment-16681891 ] John Bauer commented on SPARK-21542:

{code}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, randn

from pyspark import keyword_only
from pyspark.ml import Estimator, Model
#from pyspark.ml.feature import SQLTransformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

spark = SparkSession\
    .builder\
    .appName("ImputeNormal")\
    .getOrCreate()


class ImputeNormal(Estimator,
                   HasInputCol,
                   HasOutputCol,
                   DefaultParamsReadable,
                   DefaultParamsWritable,
                   ):
    @keyword_only
    def __init__(self, inputCol="inputCol", outputCol="outputCol"):
        super(ImputeNormal, self).__init__()
        self._setDefault(inputCol="inputCol", outputCol="outputCol")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol="inputCol", outputCol="outputCol"):
        """
        setParams(self, inputCol="inputCol", outputCol="outputCol")
        """
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def _fit(self, data):
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        stats = data.select(inputCol).describe()
        mean = stats.where(col("summary") == "mean").take(1)[0][inputCol]
        stddev = stats.where(col("summary") == "stddev").take(1)[0][inputCol]

        return ImputeNormalModel(mean=float(mean),
                                 stddev=float(stddev),
                                 inputCol=inputCol,
                                 outputCol=outputCol,
                                 )

        # FOR A TRULY MINIMAL BUT LESS DIDACTICALLY EFFECTIVE DEMO, DO INSTEAD:
        #sql_text = "SELECT *, IF({inputCol} IS NULL, {stddev} * randn() + {mean}, {inputCol}) AS {outputCol} FROM __THIS__"
        #
        #return SQLTransformer(statement=sql_text.format(stddev=stddev, mean=mean, inputCol=inputCol, outputCol=outputCol))


class ImputeNormalModel(Model,
                        HasInputCol,
                        HasOutputCol,
                        DefaultParamsReadable,
                        DefaultParamsWritable,
                        ):
    mean = Param(Params._dummy(), "mean",
                 "Mean value of imputations. Calculated by fit method.",
                 typeConverter=TypeConverters.toFloat)

    stddev = Param(Params._dummy(), "stddev",
                   "Standard deviation of imputations. Calculated by fit method.",
                   typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, mean=0.0, stddev=1.0, inputCol="inputCol", outputCol="outputCol"):
        super(ImputeNormalModel, self).__init__()
        self._setDefault(mean=0.0, stddev=1.0, inputCol="inputCol", outputCol="outputCol")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, mean=0.0, stddev=1.0, inputCol="inputCol", outputCol="outputCol"):
        """
        setParams(self, mean=0.0, stddev=1.0, inputCol="inputCol", outputCol="outputCol")
        """
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def getMean(self):
        return self.getOrDefault(self.mean)

    def setMean(self, mean):
        self._set(mean=mean)

    def getStddev(self):
        return self.getOrDefault(self.stddev)

    def setStddev(self, stddev):
        self._set(stddev=stddev)

    def _transform(self, data):
        mean = self.getMean()
        stddev = self.getStddev()
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        df = data.withColumn(outputCol,
                             when(col(inputCol).isNull(),
                                  stddev * randn() + mean)
                             .otherwise(col(inputCol)))
        return df


if __name__ == "__main__":
    train = spark.createDataFrame([[0], [1], [2]] + [[None]] * 100, ['input'])

    impute = ImputeNormal(inputCol='input', outputCol='output')
    impute_model = impute.fit(train)
    print("Input column: {}".format(impute_model.getInputCol()))
    print("Output column: {}".format(impute_model.getOutputCol()))
    print("Mean: {}".format(impute_model.getMean()))
    print("Standard Deviation: {}".format(impute_model.getStddev()))
    test = impute_model.transform(train)
    test.show(10)
    test.describe().show()
    print("mean and stddev for outputCol should be close to those of inputCol")
{code}
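A minimal sketch of the natural next step, saving and loading the fitted stage as part of a Pipeline; this snippet is not from the original comment, it assumes the ImputeNormal class and the train DataFrame defined above, and the save path impute_pipeline_model is arbitrary:
{code}
from pyspark.ml import Pipeline, PipelineModel

# Build and fit a one-stage pipeline around the custom estimator above.
pipeline = Pipeline(stages=[ImputeNormal(inputCol="input", outputCol="output")])
pipeline_model = pipeline.fit(train)

# Persist the fitted pipeline, then reload and reuse it.
pipeline_model.write().overwrite().save("impute_pipeline_model")
reloaded = PipelineModel.load("impute_pipeline_model")
reloaded.transform(train).show(5)
{code}
Loading should work as long as ImputeNormal and ImputeNormalModel are importable in the session that calls PipelineModel.load.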
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634679#comment-16634679 ] John Bauer commented on SPARK-21542:

The above is not as minimal as I would have liked... It is based on the unit tests associated with the fix referenced for DefaultParamsReadable and DefaultParamsWritable, which I thought would test the desired behavior, i.e. saving and loading a pipeline after calling fit(). Unfortunately this was not tested, so I flailed at the code for a while until I got something that worked. A lot of the stuff left over from setting up unit tests could probably be removed, but at least this seems to work.
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634677#comment-16634677 ] John Bauer commented on SPARK-21542:

{code:python}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Sep 27 10:25:10 2018

@author: JohnBauer
"""
from pyspark.sql import DataFrame, Row
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

from pyspark import keyword_only, SparkContext
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer, UnaryTransformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
#from pyspark.ml.util import *
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.types import FloatType, DoubleType #, LongType, ArrayType, StringType, StructType, StructField

spark = SparkSession\
    .builder\
    .appName("Minimal_1")\
    .getOrCreate()

data_path = "/Users/JohnBauer/spark/data/mllib"
# Load training data
data = spark.read.format("libsvm").load("{}/sample_libsvm_data.txt".format(data_path))
train, test = data.randomSplit([0.7, 0.3])
train.show(5)


class MockDataset(DataFrame):
    def __init__(self):
        self.index = 0


class HasFake(Params):
    def __init__(self):
        super(HasFake, self).__init__()
        self.fake = Param(self, "fake", "fake param")

    def getFake(self):
        return self.getOrDefault(self.fake)


class MockTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable, HasFake):
    def __init__(self):
        super(MockTransformer, self).__init__()
        self.dataset_index = None

    def _transform(self, dataset):
        self.dataset_index = dataset.index
        dataset.index += 1
        return dataset


class MockUnaryTransformer(UnaryTransformer, DefaultParamsReadable, DefaultParamsWritable,): #HasInputCol):
    shift = Param(Params._dummy(), "shift", "The amount by which to shift " +
                  "data in a DataFrame",
                  typeConverter=TypeConverters.toFloat)

    inputCol = Param(Params._dummy(), "inputCol", "column of DataFrame to transform",
                     typeConverter=TypeConverters.toString)

    outputCol = Param(Params._dummy(), "outputCol", "name of transformed column " +
                      "to be added to DataFrame",
                      typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, shiftVal=1, inputCol="features", outputCol="outputCol"): #, inputCol='features'):
        super(MockUnaryTransformer, self).__init__()
        self._setDefault(shift=1)
        self._set(shift=shiftVal)
        self._setDefault(inputCol=inputCol)
        self._setDefault(outputCol=outputCol)

    def getShift(self):
        return self.getOrDefault(self.shift)

    def setShift(self, shift):
        self._set(shift=shift)

    def createTransformFunc(self):
        shiftVal = self.getShift()
        return lambda x: x + shiftVal

    def outputDataType(self):
        return DoubleType()

    def validateInputType(self, inputType):
        if inputType != DoubleType():
            print("input type: {}".format(inputType))
            return
            #raise TypeError("Bad input type: {}. ".format(inputType) +
            #                "Requires Double.")

    def _transform(self, dataset):
        shift = self.getOrDefault("shift")

        def f(v):
            return v + shift

        t = FloatType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))


class MockEstimator(Estimator, DefaultParamsReadable, DefaultParamsWritable, HasFake):
    def __init__(self):
        super(MockEstimator, self).__init__()
        self.dataset_index = None

    def _fit(self, dataset):
        self.dataset_index = dataset.index
        model = MockModel()
        self._copyValues(model)
        return model


class MockModel(MockTransformer, Model, HasFake):
    pass


#class PipelineTests(PySparkTestCase):
class PipelineTests(object):
    def test_pipeline(self, data=None):
        #dataset = MockDataset()
        dataset = MockDataset() if data is None else data
        estimator0 = MockEstimator()
        transformer1 = MockTransformer()
        estimator2 = MockEstimator()
        transformer3 = MockTransformer()
        transformer4 = MockUnaryTransformer(inputCol="label", outputCol="shifted_label")
        pipeline = Pipeline(stages=[estimator0, transformer1, estimator2, transformer3, transformer4])
        pipeline_model = pipeline.fit(dataset,
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611733#comment-16611733 ] Peter Knight commented on SPARK-21542:

Thanks for the reply [~JohnHBauer]. Yes, I am using the @keyword_only decorator exactly as in the Stack Overflow example you cite. I'll be interested to see your code if you get it working. Thanks.
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611085#comment-16611085 ] John Bauer commented on SPARK-21542:

You don't show your code for __init__ or setParams. I recall getting this error before using the @keyword_only decorator; see, for example, https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
I will be trying to get my custom transformer pipeline to persist sometime next week, I hope. If I succeed, I will try to provide an example if no one else has.
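For illustration, a minimal sketch of that @keyword_only pattern for __init__ and setParams; the AddConstant class and its value Param are invented for this example and are not from the Stack Overflow post or from the MedianTrend code discussed below:
{code}
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col


class AddConstant(Transformer, HasInputCol, HasOutputCol,
                  DefaultParamsReadable, DefaultParamsWritable):
    """Illustrative transformer: adds a constant to inputCol."""

    value = Param(Params._dummy(), "value", "constant to add",
                  typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, value=1.0):
        super(AddConstant, self).__init__()
        self._setDefault(value=1.0)
        # keyword_only captures the constructor arguments in _input_kwargs,
        # so they can be stored as Params (which is what gets persisted).
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, value=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def getValue(self):
        return self.getOrDefault(self.value)

    def _transform(self, dataset):
        return dataset.withColumn(self.getOutputCol(),
                                  col(self.getInputCol()) + self.getValue())
{code}
With this pattern the constructor arguments end up stored as Params, which is what DefaultParamsWritable serializes, so write().save(path) followed by AddConstant.load(path) should round-trip the configuration.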
[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16609495#comment-16609495 ] Peter Knight commented on SPARK-21542:

It would be really helpful to have some example code on how to use these. I have tried:
{code}
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MedianTrend(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    # code here to define Params and transform
    ...

# instantiate it
mt1 = MedianTrend(inputColList=["v1"], outputColList=["v1_trend_no_reset"], sortCol="date")

# then save it
path1 = "test_MedianTrend"
mt1.write().overwrite().save(path1)

# then load it
mt1_loaded = mt1.load(path1)
df2 = mt1_loaded.transform(df)
df2.show()
{code}
This gives the following error:
{noformat}
'module' object has no attribute 'MedianTrend'
{noformat}
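One possible cause of that error, offered only as an assumption since the thread does not confirm it: DefaultParamsReader looks up the stage's class by the module-qualified name recorded in the saved metadata, so a class defined only in an interactive shell or in __main__ may not be resolvable at load time. A sketch of that workaround, with a hypothetical module name median_trend.py:
{code}
# median_trend.py -- hypothetical module placed on the PYTHONPATH (or shipped via --py-files),
# containing the MedianTrend class definition shown above.

# driver script / notebook:
from median_trend import MedianTrend  # importable under a stable module path

mt1 = MedianTrend(inputColList=["v1"], outputColList=["v1_trend_no_reset"], sortCol="date")
mt1.write().overwrite().save("test_MedianTrend")

# At load time the class is resolved from the saved metadata, so the import above must succeed.
mt1_loaded = MedianTrend.load("test_MedianTrend")
{code}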