Hi Frank

Thanks for this question. I have been comparing logistic regression in
sklearn with spark mllib as well. Your example code gave me a perfect way
to compare what is going on in both the packages.

I looked at the source code of both packages. There are quite a few
differences in how the model fitting is done. I have a solution for the
logistic regression problem. I do not have a solution for the linear
regression problem yet.

Here are the key differences:
1. In Spark, the L2 regularization term is divided by the feature standard
deviation. In sklearn, it is not.
2. In Spark, the X's are standardized. This changes the solution because of
regularization. In sklearn, no standardization is done.
3. In Spark, the average log loss is used for training. The log loss is
divided by the sum of weights, which here is the number of training
instances. Sklearn uses the sum of the log loss instead. So, for the same
parameter value, the Spark regularization is much heavier. You should scale
down the regularization parameter by the number of instances.
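
To make point 3 concrete, here is a minimal Python sketch of the two
objectives as I understand them, ignoring the feature-scaling effects in
points 1 and 2. The function names are my own and are not part of either
library. With reg_param = 1 / (C * n), the sklearn-style objective is
C * n times the Spark-style one, so both have the same minimizer.

```python
import math


def log_loss_sum(w, b, X, y):
    """Sum of per-instance logistic losses for labels in {0, 1}."""
    total = 0.0
    for xi, yi in zip(X, y):
        z = sum(wj * xj for wj, xj in zip(w, xi)) + b
        # log(1 + exp(-z)) for positives, log(1 + exp(z)) for negatives
        s = z if yi == 1 else -z
        total += math.log1p(math.exp(-s))
    return total


def sklearn_objective(w, b, X, y, C):
    # sklearn-style: C * sum(log loss) + 0.5 * ||w||^2
    return C * log_loss_sum(w, b, X, y) + 0.5 * sum(wj * wj for wj in w)


def spark_objective(w, b, X, y, reg_param):
    # Spark-style (pure L2): mean(log loss) + reg_param * 0.5 * ||w||^2
    n = len(y)
    penalty = 0.5 * sum(wj * wj for wj in w)
    return log_loss_sum(w, b, X, y) / n + reg_param * penalty


def spark_reg_param_from_sklearn_C(C, n):
    # Hypothetical helper: the regParam that matches a given sklearn C
    return 1.0 / (C * n)


# C = 1 / 0.3 with 10 training instances gives roughly 0.03
print(spark_reg_param_from_sklearn_C(1 / 0.3, 10))
```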


So, if you do the following, you should be able to match the outputs of
logistic regression:
1. Standardize the Spark and sklearn inputs in the same way. Note: the two
StandardScalers ensure unit variance slightly differently. Spark divides by
the sample standard deviation (sqrt(n-1) in the denominator, the unbiased
estimator when the mean is not known), while sklearn's StandardScaler
divides by the population standard deviation (sqrt(n) in the denominator).
2. Scale down the regularization in Spark by the number of instances. Use
0.03 in your example instead of 0.3, given you have 10 training instances.
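
The two standard deviation formulas in step 1 differ only by a constant
factor of sqrt((n-1)/n), which is where the 3/np.sqrt(10) factor in the
sklearn code comes from for n = 10. A small sketch using only the Python
standard library, on the first feature column of the example data (the
variable names are mine):

```python
import math
import statistics

# First feature column of the 10 training instances
col = [
    -0.7306653538519616, 0.6750417712898752, 0.1863463229359709,
    -0.6719842051493347, 0.9699938346531928, 0.22759406190283604,
    0.9688721028330911, 0.5993795346650845, 0.9219423508390701,
    0.7006904841584055,
]
n = len(col)
mean = statistics.mean(col)

pop_std = statistics.pstdev(col)    # divides the squared deviations by n
sample_std = statistics.stdev(col)  # divides the squared deviations by n - 1

scaled_pop = [(x - mean) / pop_std for x in col]
scaled_sample = [(x - mean) / sample_std for x in col]

# Converting from one scaling to the other is one constant factor
factor = math.sqrt((n - 1) / n)  # equals 3 / sqrt(10) when n = 10
converted = [v * factor for v in scaled_pop]

max_gap = max(abs(a - b) for a, b in zip(converted, scaled_sample))
print(factor, max_gap)
```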

Hope this helps
-Dhanesh

Spark ml code (I changed it to work with Spark 2.1):
----------------------------------------------------------------

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler

val sparkTrainingData_orig = new SQLContext(sc).
  createDataFrame(Seq(
    (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
    (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
    (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
    (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
    (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
    (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
    (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
    (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
    (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
    (0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
  toDF("label", "features_orig")

val sparkTrainingData = new StandardScaler().
  setWithMean(true).
  setInputCol("features_orig").
  setOutputCol("features").
  fit(sparkTrainingData_orig).
  transform(sparkTrainingData_orig)

val logisticModel = new LogisticRegression().
  setRegParam(0.03).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark logistic model coefficients: ${logisticModel.coefficients} " +
  s"Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727]
// Intercept: -0.011815325216668142


Sklearn Code:
-----------------

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape

# Scale and add intercept term to simulate inputs to GameEstimator

from sklearn.preprocessing import StandardScaler

# Adjust by factor sqrt(n-1)/sqrt(n), where n is the number of instances
# (m = 10 in this script), to take care of standard deviation formula
# differences
Xsc = StandardScaler().fit_transform(X) * 3 / np.sqrt(10)
# Not used below: the model fits its own intercept (fit_intercept=True)
Xsc_with_intercept = np.hstack((Xsc, np.ones(m)[:, np.newaxis]))

l = 0.3
e = LogisticRegression(
    fit_intercept=True,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11,
    solver='lbfgs',
    verbose=1)

e.fit(Xsc, y)

print(e.coef_, e.intercept_)
# => [[ 0.82122437 0.32615256]] [-0.01181534]
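
As a quick sanity check on the two reported results, the snippet below
compares the printed numbers directly. The values are copied from the two
outputs above, in the order coefficient 1, coefficient 2, intercept. The
sklearn numbers are only printed to 8 decimals, so this is a rough check
rather than a precise one.

```python
# Values as printed by Spark (full precision) and sklearn (8 decimals)
spark_result = [0.8212244419577079, 0.32615245441495727, -0.011815325216668142]
sklearn_result = [0.82122437, 0.32615256, -0.01181534]

# Largest absolute difference across the three fitted parameters
max_abs_diff = max(abs(a - b) for a, b in zip(spark_result, sklearn_result))
print(max_abs_diff)
```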




On Mon, Mar 13, 2017 at 7:50 AM, Frank Astier <fast...@linkedin.com.invalid>
wrote:

> (this was also posted to stackoverflow on 03/10)
>
> I am setting up a very simple logistic regression problem in scikit-learn
> and in spark.ml, and the results diverge: the models they learn are
> different, but I can't figure out why (data is the same, model type is the
> same, regularization is the same...).
>
> No doubt I am missing some setting on one side or the other. Which
> setting? How should I set up either scikit or spark.ml to find the same
> model as its counterpart?
>
> I give the sklearn code and spark.ml code below. Both should be ready to
> cut-and-paste and run.
>
> scikit-learn code:
> ----------------------
>
>     import numpy as np
>     from sklearn.linear_model import LogisticRegression, Ridge
>
>     X = np.array([
>         [-0.7306653538519616, 0.0],
>         [0.6750417712898752, -0.4232874171873786],
>         [0.1863463229359709, -0.8163423997075965],
>         [-0.6719842051493347, 0.0],
>         [0.9699938346531928, 0.0],
>         [0.22759406190283604, 0.0],
>         [0.9688721028330911, 0.0],
>         [0.5993795346650845, 0.0],
>         [0.9219423508390701, -0.8972778242305388],
>         [0.7006904841584055, -0.5607635619919824]
>     ])
>
>     y = np.array([
>         0.0,
>         1.0,
>         1.0,
>         0.0,
>         1.0,
>         1.0,
>         1.0,
>         0.0,
>         0.0,
>         0.0
>     ])
>
>     m, n = X.shape
>
>     # Add intercept term to simulate inputs to GameEstimator
>     X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))
>
>     l = 0.3
>     e = LogisticRegression(
>         fit_intercept=False,
>         penalty='l2',
>         C=1/l,
>         max_iter=100,
>         tol=1e-11)
>
>     e.fit(X_with_intercept, y)
>
>     print e.coef_
>     # => [[ 0.98662189  0.45571052 -0.23467255]]
>
>     # Linear regression is called Ridge in sklearn
>     e = Ridge(
>         fit_intercept=False,
>         alpha=l,
>         max_iter=100,
>         tol=1e-11)
>
>     e.fit(X_with_intercept, y)
>
>     print e.coef_
>     # =>[ 0.32155545  0.17904355  0.41222418]
>
> spark.ml code:
> -------------------
>
>     import org.apache.spark.{SparkConf, SparkContext}
>     import org.apache.spark.ml.classification.LogisticRegression
>     import org.apache.spark.ml.regression.LinearRegression
>     import org.apache.spark.mllib.linalg.Vectors
>     import org.apache.spark.mllib.regression.LabeledPoint
>     import org.apache.spark.sql.SQLContext
>
>     object TestSparkRegression {
>       def main(args: Array[String]): Unit = {
>         import org.apache.log4j.{Level, Logger}
>
>         Logger.getLogger("org").setLevel(Level.OFF)
>         Logger.getLogger("akka").setLevel(Level.OFF)
>
>         val conf = new SparkConf().setAppName("test").setMaster("local")
>         val sc = new SparkContext(conf)
>
>         val sparkTrainingData = new SQLContext(sc)
>           .createDataFrame(Seq(
>             LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
>             LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
>             LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
>             LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
>             LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
>             LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
>             LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
>             LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
>             LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
>             LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
>           .toDF("label", "features")
>
>         val logisticModel = new LogisticRegression()
>           .setRegParam(0.3)
>           .setLabelCol("label")
>           .setFeaturesCol("features")
>           .fit(sparkTrainingData)
>
>         println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
>         // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987
>
>         val linearModel = new LinearRegression()
>           .setRegParam(0.3)
>           .setLabelCol("label")
>           .setFeaturesCol("features")
>           .setSolver("l-bfgs")
>           .fit(sparkTrainingData)
>
>         println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
>         // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323
>
>         sc.stop()
>       }
>     }
>
> Thanks,
>
> Frank
>
