[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-11-09 Thread John Bauer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681895#comment-16681895
 ] 

John Bauer commented on SPARK-21542:


This is (a) much more minimal, (b) genuinely useful, and (c) it actually works with
save and load, for example:
{code}
impute.write().save("impute")
imp = ImputeNormal.load("impute")
imp.explainParams()

impute_model.write().save("impute_model")
impm = ImputeNormalModel.load("impute_model")
impm.getInputCol()
impm.getOutputCol()
impm.getMean()
impm.getStddev()
{code}

> Helper functions for custom Python Persistence
> --
>
> Key: SPARK-21542
> URL: https://issues.apache.org/jira/browse/SPARK-21542
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, PySpark
>Affects Versions: 2.2.0
>Reporter: Ajay Saini
>Assignee: Ajay Saini
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, there is no way to easily persist Json-serializable parameters in 
> Python only. All parameters in Python are persisted by converting them to 
> Java objects and using the Java persistence implementation. In order to 
> facilitate the creation of custom Python-only pipeline stages, it would be 
> good to have a Python-only persistence framework so that these stages do not 
> need to be implemented in Scala for persistence. 
> This task involves:
> - Adding implementations for DefaultParamsReadable, DefaultParamsWriteable, 
> DefaultParamsReader, and DefaultParamsWriter in pyspark.
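
In practice this means a Python-only stage mixes in DefaultParamsReadable and
DefaultParamsWritable and declares its state as Params, which are then written and
read as JSON with no Scala counterpart. A minimal sketch of that usage (the class
name, columns, and path below are illustrative only, not taken from the issue):
{code}
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col


class CopyColumn(Transformer, HasInputCol, HasOutputCol,
                 DefaultParamsReadable, DefaultParamsWritable):
    """Python-only stage: its Params round-trip as JSON, no Scala class needed."""

    def _transform(self, dataset):
        return dataset.withColumn(self.getOutputCol(), col(self.getInputCol()))


# _set is the low-level Params setter; a real stage would expose a
# keyword_only __init__/setParams pair instead.
stage = CopyColumn()._set(inputCol="x", outputCol="y")
stage.write().overwrite().save("/tmp/copy_column_stage")
same_stage = CopyColumn.load("/tmp/copy_column_stage")
assert same_stage.getOutputCol() == "y"
{code}
The comments below give a fuller worked example with an Estimator/Model pair.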



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-11-09 Thread John Bauer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681891#comment-16681891
 ] 

John Bauer commented on SPARK-21542:


{code}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, randn

from pyspark import keyword_only
from pyspark.ml import Estimator, Model
#from pyspark.ml.feature import SQLTransformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

spark = SparkSession\
    .builder\
    .appName("ImputeNormal")\
    .getOrCreate()


class ImputeNormal(Estimator,
                   HasInputCol,
                   HasOutputCol,
                   DefaultParamsReadable,
                   DefaultParamsWritable,
                   ):
    """Estimator: learns the mean and stddev of inputCol from the data."""

    @keyword_only
    def __init__(self, inputCol="inputCol", outputCol="outputCol"):
        super(ImputeNormal, self).__init__()
        self._setDefault(inputCol="inputCol", outputCol="outputCol")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol="inputCol", outputCol="outputCol"):
        """
        setParams(self, inputCol="inputCol", outputCol="outputCol")
        """
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def _fit(self, data):
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        # describe() returns summary rows keyed by the "summary" column
        stats = data.select(inputCol).describe()
        mean = stats.where(col("summary") == "mean").take(1)[0][inputCol]
        stddev = stats.where(col("summary") == "stddev").take(1)[0][inputCol]

        return ImputeNormalModel(mean=float(mean),
                                 stddev=float(stddev),
                                 inputCol=inputCol,
                                 outputCol=outputCol,
                                 )
        # FOR A TRULY MINIMAL BUT LESS DIDACTICALLY EFFECTIVE DEMO, DO INSTEAD:
        #sql_text = ("SELECT *, IF({inputCol} IS NULL, {stddev} * randn() + {mean}, "
        #            "{inputCol}) AS {outputCol} FROM __THIS__")
        #return SQLTransformer(statement=sql_text.format(stddev=stddev, mean=mean,
        #                                                inputCol=inputCol,
        #                                                outputCol=outputCol))


class ImputeNormalModel(Model,
                        HasInputCol,
                        HasOutputCol,
                        DefaultParamsReadable,
                        DefaultParamsWritable,
                        ):
    """Model: fills nulls in inputCol with draws from N(mean, stddev)."""

    mean = Param(Params._dummy(), "mean",
                 "Mean value of imputations. Calculated by fit method.",
                 typeConverter=TypeConverters.toFloat)

    stddev = Param(Params._dummy(), "stddev",
                   "Standard deviation of imputations. Calculated by fit method.",
                   typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, mean=0.0, stddev=1.0, inputCol="inputCol",
                 outputCol="outputCol"):
        super(ImputeNormalModel, self).__init__()
        self._setDefault(mean=0.0, stddev=1.0, inputCol="inputCol",
                         outputCol="outputCol")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, mean=0.0, stddev=1.0, inputCol="inputCol",
                  outputCol="outputCol"):
        """
        setParams(self, mean=0.0, stddev=1.0, inputCol="inputCol",
                  outputCol="outputCol")
        """
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def getMean(self):
        return self.getOrDefault(self.mean)

    def setMean(self, mean):
        self._set(mean=mean)

    def getStddev(self):
        return self.getOrDefault(self.stddev)

    def setStddev(self, stddev):
        self._set(stddev=stddev)

    def _transform(self, data):
        mean = self.getMean()
        stddev = self.getStddev()
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        # Keep non-null values; replace nulls with a draw from N(mean, stddev)
        df = data.withColumn(outputCol,
                             when(col(inputCol).isNull(),
                                  stddev * randn() + mean)
                             .otherwise(col(inputCol)))
        return df


if __name__ == "__main__":

    train = spark.createDataFrame([[0], [1], [2]] + [[None]] * 100, ['input'])
    impute = ImputeNormal(inputCol='input', outputCol='output')
    impute_model = impute.fit(train)
    print("Input column: {}".format(impute_model.getInputCol()))
    print("Output column: {}".format(impute_model.getOutputCol()))
    print("Mean: {}".format(impute_model.getMean()))
    print("Standard Deviation: {}".format(impute_model.getStddev()))
    test = impute_model.transform(train)
    test.show(10)
    test.describe().show()
    print("mean and stddev for outputCol should be close to those of inputCol")
{code}
 


[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-10-01 Thread John Bauer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634679#comment-16634679
 ] 

John Bauer commented on SPARK-21542:


The above is not as minimal as I would have liked. It is based on the unit tests
associated with the fix referenced for DefaultParamsReadable and
DefaultParamsWritable, which I thought would exercise the desired behavior, i.e.
saving and loading a pipeline after calling fit(). Unfortunately that case was not
actually tested, so I flailed at the code for a while until I got something that
worked. A lot of the scaffolding left over from the unit tests could probably be
removed, but at least this version works.
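
The behavior being exercised here, saving and reloading a fitted pipeline, comes
down to roughly the sketch below. It assumes the ImputeNormal stage from the code
block above and an illustrative path; any stage built on
DefaultParamsReadable/DefaultParamsWritable should behave the same way.
{code}
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel

spark = SparkSession.builder.appName("PipelinePersistDemo").getOrCreate()
train = spark.createDataFrame([[0], [1], [2]] + [[None]] * 100, ['input'])

# Fit a pipeline containing the Python-only ImputeNormal Estimator.
pipeline = Pipeline(stages=[ImputeNormal(inputCol='input', outputCol='output')])
pipeline_model = pipeline.fit(train)

# Persist the fitted PipelineModel and read it back; the path is illustrative.
pipeline_model.write().overwrite().save("/tmp/impute_pipeline_model")
restored = PipelineModel.load("/tmp/impute_pipeline_model")
restored.transform(train).show(5)
{code}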




[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-10-01 Thread John Bauer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634677#comment-16634677
 ] 

John Bauer commented on SPARK-21542:



{code:python}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Sep 27 10:25:10 2018

@author: JohnBauer
"""
from pyspark.sql import DataFrame, Row
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

from pyspark import keyword_only, SparkContext
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer, \
    UnaryTransformer

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
#from pyspark.ml.util import *
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.types import FloatType, DoubleType
#, LongType, ArrayType, StringType, StructType, StructField

spark = SparkSession\
    .builder\
    .appName("Minimal_1")\
    .getOrCreate()

data_path = "/Users/JohnBauer/spark/data/mllib"
# Load training data
data = spark.read.format("libsvm").load(
    "{}/sample_libsvm_data.txt".format(data_path))
train, test = data.randomSplit([0.7, 0.3])
train.show(5)


class MockDataset(DataFrame):

    def __init__(self):
        self.index = 0


class HasFake(Params):

    def __init__(self):
        super(HasFake, self).__init__()
        self.fake = Param(self, "fake", "fake param")

    def getFake(self):
        return self.getOrDefault(self.fake)


class MockTransformer(Transformer, DefaultParamsReadable,
                      DefaultParamsWritable, HasFake):

    def __init__(self):
        super(MockTransformer, self).__init__()
        self.dataset_index = None

    def _transform(self, dataset):
        self.dataset_index = dataset.index
        dataset.index += 1
        return dataset


class MockUnaryTransformer(UnaryTransformer,
                           DefaultParamsReadable,
                           DefaultParamsWritable,):
                           #HasInputCol):

    shift = Param(Params._dummy(), "shift", "The amount by which to shift " +
                  "data in a DataFrame",
                  typeConverter=TypeConverters.toFloat)

    inputCol = Param(Params._dummy(), "inputCol",
                     "column of DataFrame to transform",
                     typeConverter=TypeConverters.toString)

    outputCol = Param(Params._dummy(), "outputCol",
                      "name of transformed column " +
                      "to be added to DataFrame",
                      typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, shiftVal=1, inputCol="features", outputCol="outputCol"):  #, inputCol='features'):
        super(MockUnaryTransformer, self).__init__()
        self._setDefault(shift=1)
        self._set(shift=shiftVal)
        self._setDefault(inputCol=inputCol)
        self._setDefault(outputCol=outputCol)

    def getShift(self):
        return self.getOrDefault(self.shift)

    def setShift(self, shift):
        self._set(shift=shift)

    def createTransformFunc(self):
        shiftVal = self.getShift()
        return lambda x: x + shiftVal

    def outputDataType(self):
        return DoubleType()

    def validateInputType(self, inputType):
        if inputType != DoubleType():
            print("input type: {}".format(inputType))
            return
            #raise TypeError("Bad input type: {}. ".format(inputType) +
            #                "Requires Double.")

    def _transform(self, dataset):
        shift = self.getOrDefault("shift")

        def f(v):
            return v + shift

        t = FloatType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))


class MockEstimator(Estimator, DefaultParamsReadable, DefaultParamsWritable,
                    HasFake):

    def __init__(self):
        super(MockEstimator, self).__init__()
        self.dataset_index = None

    def _fit(self, dataset):
        self.dataset_index = dataset.index
        model = MockModel()
        self._copyValues(model)
        return model


class MockModel(MockTransformer, Model, HasFake):
    pass


#class PipelineTests(PySparkTestCase):
class PipelineTests(object):

    def test_pipeline(self, data=None):
        #dataset = MockDataset()
        dataset = MockDataset() if data is None else data
        estimator0 = MockEstimator()
        transformer1 = MockTransformer()
        estimator2 = MockEstimator()
        transformer3 = MockTransformer()
        transformer4 = MockUnaryTransformer(inputCol="label",
                                            outputCol="shifted_label")
        pipeline = Pipeline(stages=[estimator0, transformer1, estimator2,
                                    transformer3, transformer4])
        pipeline_model = pipeline.fit(dataset,
{code}

[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-09-12 Thread Peter Knight (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611733#comment-16611733
 ] 

Peter Knight commented on SPARK-21542:
--

Thanks for the reply, [~JohnHBauer]. Yes, I am using the @keyword_only decorator
exactly as in the Stack Overflow example you cite. I'll be interested to see your
code if you get it working. Thanks.




[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-09-11 Thread John Bauer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611085#comment-16611085
 ] 

John Bauer commented on SPARK-21542:


You don't show your code for __init__ or setParams. I recall getting this error
before I used the @keyword_only decorator; for an example, see
https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

I hope to get my custom transformer pipeline to persist sometime next week. If I
succeed, I will try to provide an example if no one else has.
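
For reference, the @keyword_only pattern from that example looks roughly like the
sketch below; the class name and the trivial _transform are placeholders, not code
from this thread.
{code}
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class MyTransformer(Transformer, HasInputCol, HasOutputCol,
                    DefaultParamsReadable, DefaultParamsWritable):
    # @keyword_only captures the arguments actually passed into
    # self._input_kwargs; setParams then copies them into the stage's Params,
    # which is what the persistence helpers read and write.
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def _transform(self, dataset):
        return dataset.withColumnRenamed(self.getInputCol(), self.getOutputCol())
{code}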




[jira] [Commented] (SPARK-21542) Helper functions for custom Python Persistence

2018-09-10 Thread Peter Knight (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16609495#comment-16609495
 ] 

Peter Knight commented on SPARK-21542:
--

It would be really helpful to have some example code on how to use these.

I have tried: 
{code}
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MedianTrend(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    # code here to define Params and transform
    ...

# instantiate it
mt1 = MedianTrend(inputColList=["v1"], outputColList=["v1_trend_no_reset"],
                  sortCol="date")

# then save it
path1 = "test_MedianTrend"
mt1.write().overwrite().save(path1)

# then load it
mt1_loaded = mt1.load(path1)
df2 = mt1_loaded.transform(df)
df2.show()
{code}
This gives the following error:
{noformat}
'module' object has no attribute 'MedianTrend'{noformat}
 
