Repository: spark
Updated Branches:
  refs/heads/master 0114c89d0 -> fb0562f34


[SPARK-22810][ML][PYSPARK] Expose Python API for LinearRegression with huber loss.

## What changes were proposed in this pull request?
Expose Python API for _LinearRegression_ with _huber_ loss.

## How was this patch tested?
Unit test.
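
For illustration, a minimal usage sketch of the new API (not part of the
patch; it assumes an active SparkSession bound to `spark`, and the tiny
DataFrame is made up):

    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression

    df = spark.createDataFrame(
        [(1.0, Vectors.dense(1.0)), (3.0, Vectors.dense(2.0)),
         (100.0, Vectors.dense(3.0))],  # outlier that huber should damp
        ["label", "features"])
    lr = LinearRegression(loss="huber", epsilon=1.35)  # epsilon only applies to huber
    model = lr.fit(df)
    print(model.coefficients, model.intercept, model.scale)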

Author: Yanbo Liang <yblia...@gmail.com>

Closes #19994 from yanboliang/spark-22810.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb0562f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb0562f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb0562f3

Branch: refs/heads/master
Commit: fb0562f34605cd27fd39d09e6664a46e55eac327
Parents: 0114c89
Author: Yanbo Liang <yblia...@gmail.com>
Authored: Wed Dec 20 17:51:42 2017 -0800
Committer: Yanbo Liang <yblia...@gmail.com>
Committed: Wed Dec 20 17:51:42 2017 -0800

----------------------------------------------------------------------
 .../pyspark/ml/param/_shared_params_code_gen.py |  3 +-
 python/pyspark/ml/param/shared.py               | 23 +++++++
 python/pyspark/ml/regression.py                 | 64 +++++++++++++++-----
 python/pyspark/ml/tests.py                      | 21 +++++++
 4 files changed, 96 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb0562f3/python/pyspark/ml/param/_shared_params_code_gen.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index 130d1a0..d55d209 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -154,7 +154,8 @@ if __name__ == "__main__":
         ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
          "TypeConverters.toInt"),
         ("parallelism", "the number of threads to use when running parallel 
algorithms (>= 1).",
-         "1", "TypeConverters.toInt")]
+         "1", "TypeConverters.toInt"),
+        ("loss", "the loss function to be optimized.", None, 
"TypeConverters.toString")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0562f3/python/pyspark/ml/param/shared.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 4041d9c..e5c5ddf 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -632,6 +632,29 @@ class HasParallelism(Params):
         return self.getOrDefault(self.parallelism)
 
 
+class HasLoss(Params):
+    """
+    Mixin for param loss: the loss function to be optimized.
+    """
+
+    loss = Param(Params._dummy(), "loss", "the loss function to be optimized.", typeConverter=TypeConverters.toString)
+
+    def __init__(self):
+        super(HasLoss, self).__init__()
+
+    def setLoss(self, value):
+        """
+        Sets the value of :py:attr:`loss`.
+        """
+        return self._set(loss=value)
+
+    def getLoss(self):
+        """
+        Gets the value of loss or its default value.
+        """
+        return self.getOrDefault(self.loss)
+
+
 class DecisionTreeParams(Params):
     """
     Mixin for Decision Tree parameters.
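
Once an estimator mixes in the generated HasLoss (as LinearRegression does
in the next file), it picks up the setter/getter pair above; a brief
sketch, reusing the setup from the example in the description:

    lr = LinearRegression()
    lr.setLoss("huber")             # setter inherited from HasLoss
    assert lr.getLoss() == "huber"  # getter falls back to the default otherwise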

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0562f3/python/pyspark/ml/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index 9d5b768..f0812bd 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -39,23 +39,26 @@ __all__ = ['AFTSurvivalRegression', 'AFTSurvivalRegressionModel',
 @inherit_doc
 class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,
                        HasRegParam, HasTol, HasElasticNetParam, HasFitIntercept,
-                       HasStandardization, HasSolver, HasWeightCol, HasAggregationDepth,
+                       HasStandardization, HasSolver, HasWeightCol, HasAggregationDepth, HasLoss,
                        JavaMLWritable, JavaMLReadable):
     """
     Linear regression.
 
-    The learning objective is to minimize the squared error, with regularization.
-    The specific squared error loss function used is: L = 1/2n ||A coefficients - y||^2^
+    The learning objective is to minimize the specified loss function, with regularization.
+    This supports two kinds of loss:
 
-    This supports multiple types of regularization:
-
-     * none (a.k.a. ordinary least squares)
+    * squaredError (a.k.a squared loss)
+    * huber (a hybrid of squared error for relatively small errors and absolute error for \
+    relatively large ones, and we estimate the scale parameter from training data)
 
-     * L2 (ridge regression)
+    This supports multiple types of regularization:
 
-     * L1 (Lasso)
+    * none (a.k.a. ordinary least squares)
+    * L2 (ridge regression)
+    * L1 (Lasso)
+    * L2 + L1 (elastic net)
 
-     * L2 + L1 (elastic net)
+    Note: Fitting with huber loss only supports none and L2 regularization.
 
     >>> from pyspark.ml.linalg import Vectors
     >>> df = spark.createDataFrame([
@@ -98,19 +101,28 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: auto, normal, l-bfgs.", 
typeConverter=TypeConverters.toString)
 
+    loss = Param(Params._dummy(), "loss", "The loss function to be optimized. Supported " +
+                 "options: squaredError, huber.", typeConverter=TypeConverters.toString)
+
+    epsilon = Param(Params._dummy(), "epsilon", "The shape parameter to control the amount of " +
+                    "robustness. Must be > 1.0. Only valid when loss is huber",
+                    typeConverter=TypeConverters.toFloat)
+
     @keyword_only
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
-                 standardization=True, solver="auto", weightCol=None, aggregationDepth=2):
+                 standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
+                 loss="squaredError", epsilon=1.35):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
-                 standardization=True, solver="auto", weightCol=None, aggregationDepth=2)
+                 standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
+                 loss="squaredError", epsilon=1.35)
         """
         super(LinearRegression, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.regression.LinearRegression", self.uid)
-        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6)
+        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -118,11 +130,13 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction
     @since("1.4.0")
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
-                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2):
+                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
+                  loss="squaredError", epsilon=1.35):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
-                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2)
+                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
+                  loss="squaredError", epsilon=1.35)
         Sets params for linear regression.
         """
         kwargs = self._input_kwargs
@@ -131,6 +145,20 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction
     def _create_model(self, java_model):
         return LinearRegressionModel(java_model)
 
+    @since("2.3.0")
+    def setEpsilon(self, value):
+        """
+        Sets the value of :py:attr:`epsilon`.
+        """
+        return self._set(epsilon=value)
+
+    @since("2.3.0")
+    def getEpsilon(self):
+        """
+        Gets the value of epsilon or its default value.
+        """
+        return self.getOrDefault(self.epsilon)
+
 
 class LinearRegressionModel(JavaModel, JavaPredictionModel, JavaMLWritable, JavaMLReadable):
     """
@@ -156,6 +184,14 @@ class LinearRegressionModel(JavaModel, JavaPredictionModel, JavaMLWritable, Java
         return self._call_java("intercept")
 
     @property
+    @since("2.3.0")
+    def scale(self):
+        """
+        The value by which \|y - X'w\| is scaled down when loss is "huber", otherwise 1.0.
+        """
+        return self._call_java("scale")
+
+    @property
     @since("2.0.0")
     def summary(self):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0562f3/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index be15211..afcb088 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1726,6 +1726,27 @@ class GeneralizedLinearRegressionTest(SparkSessionTestCase):
         self.assertTrue(np.isclose(model.intercept, -1.561613, atol=1E-4))
 
 
+class LinearRegressionTest(SparkSessionTestCase):
+
+    def test_linear_regression_with_huber_loss(self):
+
+        data_path = "data/mllib/sample_linear_regression_data.txt"
+        df = self.spark.read.format("libsvm").load(data_path)
+
+        lir = LinearRegression(loss="huber", epsilon=2.0)
+        model = lir.fit(df)
+
+        expectedCoefficients = [0.136, 0.7648, -0.7761, 2.4236, 0.537,
+                                1.2612, -0.333, -0.5694, -0.6311, 0.6053]
+        expectedIntercept = 0.1607
+        expectedScale = 9.758
+
+        self.assertTrue(
+            np.allclose(model.coefficients.toArray(), expectedCoefficients, atol=1E-3))
+        self.assertTrue(np.isclose(model.intercept, expectedIntercept, atol=1E-3))
+        self.assertTrue(np.isclose(model.scale, expectedScale, atol=1E-3))
+
+
 class LogisticRegressionTest(SparkSessionTestCase):
 
     def test_binomial_logistic_regression_with_bound(self):

