[Edit] I got a few details wrong in my eagerness to reply:

1. Spark uses the corrected standard deviation with sqrt(n-1) in the
denominator, and scikit uses the one with sqrt(n).
2. You should scale down the regularization by the sum of weights, in case
you have a column of weights. When there are no weights, that is equivalent
to the number of instances.
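A quick NumPy sketch of both points (this is my own illustration, not
either library's source code; the weight column is hypothetical):

----------------------------------------------------------------

import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
n = len(x)

# Spark's StandardScaler uses the corrected sample standard deviation,
# with sqrt(n-1) in the denominator; numpy exposes it via ddof=1.
std_spark = x.std(ddof=1)

# scikit-learn's StandardScaler uses the population standard deviation,
# with sqrt(n) in the denominator; numpy's default is ddof=0.
std_sklearn = x.std(ddof=0)

# The two standardized columns therefore differ by the constant factor
# sqrt((n-1)/n).
print(std_sklearn / std_spark, np.sqrt((n - 1.0) / n))

# Point 2: scale the regularization down by the sum of weights. With a
# hypothetical weight column of all ones this is just the instance count.
weights = np.ones(n)
l = 0.3                          # sklearn-style strength, i.e. 1/C
reg_param = l / weights.sum()    # the Spark regParam that matches it
print(reg_param)                 # 0.06 here; 0.03 for the 10-row example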
Dhanesh
+91-9741125245

On Mon, Mar 13, 2017 at 5:31 PM, Dhanesh Padmanabhan <dhanesh12...@gmail.com> wrote:

> Hi Frank
>
> Thanks for this question. I have been comparing logistic regression in
> sklearn with Spark MLlib as well. Your example code gave me a perfect way
> to compare what is going on in the two packages.
>
> I looked at both source codes. There are quite a few differences in how
> the model fitting is done. I have a solution for the logistic regression
> problem; I do not have a solution for the linear regression problem yet.
>
> Here are the key differences:
> 1. In Spark, the L2 regularization of each coefficient is divided by that
> feature's standard deviation. In sklearn, it is not.
> 2. In Spark, the X's are standardized. This changes the solution because
> of the regularization. In sklearn, no standardization is done.
> 3. In Spark, the average log loss is used for training: the log loss is
> divided by the sum of weights, which here is the number of training
> instances. Sklearn uses the sum of the log losses instead, so the Spark
> regularization is comparatively very heavy. You should scale down the
> regularization parameter by the number of instances (see the numeric
> sketch below).
>
> So, if you do the following, you should be able to match the outputs of
> logistic regression:
> 1. Standardize the Spark and pandas dataframes in the same fashion. Note:
> the standardization in Spark works a little differently for ensuring unit
> variance - Spark uses sqrt(n) as the denominator, and sklearn's
> StandardScaler uses sqrt(n-1) (the unbiased estimator when the mean is
> not known).
> 2. Scale down the regularization in Spark by the number of instances: use
> 0.03 in your example instead of 0.3, given you have 10 training instances.
>
> Hope this helps
> -Dhanesh
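To see difference 3 numerically, here is a minimal sketch; the per-instance
losses and the penalty value are made up purely for illustration, since only
the scaling relationship matters:

----------------------------------------------------------------

import numpy as np

# Hypothetical per-instance log losses and an L2 penalty value ||w||^2 / 2
losses = np.array([0.3, 0.7, 0.2, 0.9])
penalty = 0.5
n = len(losses)

l = 0.3  # sklearn-style regularization strength, i.e. 1/C

# sklearn-style objective: sum of the losses plus l times the penalty
obj_sklearn = losses.sum() + l * penalty

# Spark-style objective: average loss plus regParam times the penalty.
# Choosing regParam = l / n makes the two objectives equal up to the
# constant factor n, so they share the same minimizer.
reg_param = l / n
obj_spark = losses.mean() + reg_param * penalty

print(obj_sklearn, n * obj_spark)  # identical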
> Spark ml code (I changed it to work with Spark 2.1):
> ----------------------------------------------------------------
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.regression.LinearRegression
> import org.apache.spark.ml.linalg.Vectors
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.ml.feature.StandardScaler
>
> val sparkTrainingData_orig = new SQLContext(sc).createDataFrame(Seq(
>   (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
>   (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
>   (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
>   (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
>   (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
>   (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
>   (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
>   (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
>   (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
>   (0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
>   toDF("label", "features_orig")
>
> // Standardize with mean centering, mirroring the sklearn code below
> val sparkTrainingData = new StandardScaler().
>   setWithMean(true).
>   setInputCol("features_orig").
>   setOutputCol("features").
>   fit(sparkTrainingData_orig).
>   transform(sparkTrainingData_orig)
>
> // regParam = 0.3 / 10 training instances = 0.03
> val logisticModel = new LogisticRegression().
>   setRegParam(0.03).
>   setLabelCol("label").
>   setFeaturesCol("features").
>   setTol(1e-12).
>   setMaxIter(100).
>   fit(sparkTrainingData)
>
> println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
> // Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142
>
>
> Sklearn Code:
> -----------------
>
> import numpy as np
> from sklearn.linear_model import LogisticRegression
> from sklearn.preprocessing import StandardScaler
>
> X = np.array([
>     [-0.7306653538519616, 0.0],
>     [0.6750417712898752, -0.4232874171873786],
>     [0.1863463229359709, -0.8163423997075965],
>     [-0.6719842051493347, 0.0],
>     [0.9699938346531928, 0.0],
>     [0.22759406190283604, 0.0],
>     [0.9688721028330911, 0.0],
>     [0.5993795346650845, 0.0],
>     [0.9219423508390701, -0.8972778242305388],
>     [0.7006904841584055, -0.5607635619919824]
> ])
>
> y = np.array([0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0])
>
> m, n = X.shape
>
> # Scale the features to match Spark's standardization; the extra factor
> # sqrt(n-1)/sqrt(n) = 3/sqrt(10) takes care of the standard deviation
> # formula differences between the two libraries
> Xsc = StandardScaler().fit_transform(X) * 3 / np.sqrt(10)
>
> l = 0.3
> e = LogisticRegression(
>     fit_intercept=True,
>     penalty='l2',
>     C=1/l,
>     max_iter=100,
>     tol=1e-11,
>     solver='lbfgs',
>     verbose=1)
>
> e.fit(Xsc, y)
>
> print e.coef_, e.intercept_
> # => [[ 0.82122437  0.32615256]] [-0.01181534]
>
>
> Dhanesh
> +91-9741125245
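Since both fits above are done in standardized feature space, the printed
coefficients live on that scale. If you want them on the original feature
scale, the usual change of variables applies; here is a minimal sketch of
that back-transformation (the variable names are my own, and I assume the
Spark-style sqrt(n-1) standardization used above):

----------------------------------------------------------------

import numpy as np

# X exactly as in the sklearn snippet above
X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]])

# Coefficients and intercept from the standardized-space fit printed above
coef_z = np.array([0.8212244419577079, 0.32615245441495727])
intercept_z = -0.011815325216668142

# Spark-style standardization parameters: mean and corrected (ddof=1) std
mu = X.mean(axis=0)
sigma = X.std(axis=0, ddof=1)

# The model was fit on z = (x - mu) / sigma, so
#   w.z + b  =  (w / sigma).x + (b - sum(w * mu / sigma))
coef_orig = coef_z / sigma
intercept_orig = intercept_z - np.sum(coef_z * mu / sigma)
print(coef_orig, intercept_orig)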
On Mon, Mar 13, 2017 at 7:50 AM, Frank Astier <fast...@linkedin.com.invalid> wrote:

>> (this was also posted to stackoverflow on 03/10)
>>
>> I am setting up a very simple logistic regression problem in scikit-learn
>> and in spark.ml, and the results diverge: the models they learn are
>> different, but I can't figure out why (the data is the same, the model
>> type is the same, the regularization is the same...).
>>
>> No doubt I am missing some setting on one side or the other. Which
>> setting? How should I set up either scikit or spark.ml to find the same
>> model as its counterpart?
>>
>> I give the sklearn code and spark.ml code below. Both should be ready to
>> cut-and-paste and run.
>>
>> scikit-learn code:
>> ----------------------
>>
>> import numpy as np
>> from sklearn.linear_model import LogisticRegression, Ridge
>>
>> X = np.array([
>>     [-0.7306653538519616, 0.0],
>>     [0.6750417712898752, -0.4232874171873786],
>>     [0.1863463229359709, -0.8163423997075965],
>>     [-0.6719842051493347, 0.0],
>>     [0.9699938346531928, 0.0],
>>     [0.22759406190283604, 0.0],
>>     [0.9688721028330911, 0.0],
>>     [0.5993795346650845, 0.0],
>>     [0.9219423508390701, -0.8972778242305388],
>>     [0.7006904841584055, -0.5607635619919824]
>> ])
>>
>> y = np.array([0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0])
>>
>> m, n = X.shape
>>
>> # Add an intercept term to simulate inputs to GameEstimator
>> X_with_intercept = np.hstack((X, np.ones(m)[:, np.newaxis]))
>>
>> l = 0.3
>> e = LogisticRegression(
>>     fit_intercept=False,
>>     penalty='l2',
>>     C=1/l,
>>     max_iter=100,
>>     tol=1e-11)
>>
>> e.fit(X_with_intercept, y)
>>
>> print e.coef_
>> # => [[ 0.98662189  0.45571052 -0.23467255]]
>>
>> # Linear regression with L2 regularization is called Ridge in sklearn
>> e = Ridge(
>>     fit_intercept=False,
>>     alpha=l,
>>     max_iter=100,
>>     tol=1e-11)
>>
>> e.fit(X_with_intercept, y)
>>
>> print e.coef_
>> # => [ 0.32155545  0.17904355  0.41222418]
>>
>> spark.ml code:
>> -------------------
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.ml.classification.LogisticRegression
>> import org.apache.spark.ml.regression.LinearRegression
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.sql.SQLContext
>>
>> object TestSparkRegression {
>>   def main(args: Array[String]): Unit = {
>>     import org.apache.log4j.{Level, Logger}
>>
>>     Logger.getLogger("org").setLevel(Level.OFF)
>>     Logger.getLogger("akka").setLevel(Level.OFF)
>>
>>     val conf = new SparkConf().setAppName("test").setMaster("local")
>>     val sc = new SparkContext(conf)
>>
>>     val sparkTrainingData = new SQLContext(sc)
>>       .createDataFrame(Seq(
>>         LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
>>         LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
>>         LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
>>         LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
>>         LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
>>         LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
>>         LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
>>         LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
>>         LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
>>         LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
>>       .toDF("label", "features")
>>
>>     val logisticModel = new LogisticRegression()
>>       .setRegParam(0.3)
>>       .setLabelCol("label")
>>       .setFeaturesCol("features")
>>       .fit(sparkTrainingData)
>>
>>     println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
>>     // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987
>>
>>     val linearModel = new LinearRegression()
>>       .setRegParam(0.3)
>>       .setLabelCol("label")
>>       .setFeaturesCol("features")
>>       .setSolver("l-bfgs")
>>       .fit(sparkTrainingData)
>>
>>     println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
>>     // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323
>>
>>     sc.stop()
>>   }
>> }
>>
>> Thanks,
>>
>> Frank