Repository: spark
Updated Branches:
  refs/heads/master 415f9f342 -> f6314eab4


[SPARK-19391][SPARKR][ML] Tweedie GLM API for SparkR

## What changes were proposed in this pull request?
Port Tweedie GLM  #16344  to SparkR

felixcheung yanboliang

## How was this patch tested?
new test in SparkR

Author: actuaryzhang <actuaryzhan...@gmail.com>

Closes #16729 from actuaryzhang/sparkRTweedie.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6314eab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6314eab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6314eab

Branch: refs/heads/master
Commit: f6314eab4b494bd5b5e9e41c6f582d4f22c0967a
Parents: 415f9f3
Author: actuaryzhang <actuaryzhan...@gmail.com>
Authored: Tue Mar 14 00:50:38 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Tue Mar 14 00:50:38 2017 -0700

----------------------------------------------------------------------
 R/pkg/R/mllib_regression.R                      | 55 +++++++++++++++++---
 .../inst/tests/testthat/test_mllib_regression.R | 38 +++++++++++++-
 R/pkg/vignettes/sparkr-vignettes.Rmd            | 19 ++++++-
 .../r/GeneralizedLinearRegressionWrapper.scala  | 19 +++++--
 4 files changed, 117 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6314eab/R/pkg/R/mllib_regression.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_regression.R b/R/pkg/R/mllib_regression.R
index 648d363..d59c890 100644
--- a/R/pkg/R/mllib_regression.R
+++ b/R/pkg/R/mllib_regression.R
@@ -53,12 +53,23 @@ setClass("IsotonicRegressionModel", representation(jobj = 
"jobj"))
 #'               the result of a call to a family function. Refer R family at
 #'               
\url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
 #'               Currently these families are supported: \code{binomial}, 
\code{gaussian},
-#'               \code{Gamma}, and \code{poisson}.
+#'               \code{Gamma}, \code{poisson} and \code{tweedie}.
+#'
+#'               Note that there are two ways to specify the tweedie family.
+#'               \itemize{
+#'                \item Set \code{family = "tweedie"} and specify the 
var.power and link.power;
+#'                \item When package \code{statmod} is loaded, the tweedie 
family is specified using the
+#'                family definition therein, i.e., \code{tweedie(var.power, 
link.power)}.
+#'               }
 #' @param tol positive convergence tolerance of iterations.
 #' @param maxIter integer giving the maximal number of IRLS iterations.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, 
we treat all instance
 #'                  weights as 1.0.
 #' @param regParam regularization parameter for L2 regularization.
+#' @param var.power the power in the variance function of the Tweedie 
distribution which provides
+#'                      the relationship between the variance and mean of the 
distribution. Only
+#'                      applicable to the Tweedie family.
+#' @param link.power the index in the power link function. Only applicable to 
the Tweedie family.
 #' @param ... additional arguments passed to the method.
 #' @aliases spark.glm,SparkDataFrame,formula-method
 #' @return \code{spark.glm} returns a fitted generalized linear model.
@@ -84,14 +95,30 @@ setClass("IsotonicRegressionModel", representation(jobj = 
"jobj"))
 #' # can also read back the saved model and print
 #' savedModel <- read.ml(path)
 #' summary(savedModel)
+#'
+#' # fit tweedie model
+#' model <- spark.glm(df, Freq ~ Sex + Age, family = "tweedie",
+#'                    var.power = 1.2, link.power = 0)
+#' summary(model)
+#'
+#' # use the tweedie family from statmod
+#' library(statmod)
+#' model <- spark.glm(df, Freq ~ Sex + Age, family = tweedie(1.2, 0))
+#' summary(model)
 #' }
 #' @note spark.glm since 2.0.0
 #' @seealso \link{glm}, \link{read.ml}
 setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, 
weightCol = NULL,
-                   regParam = 0.0) {
+                   regParam = 0.0, var.power = 0.0, link.power = 1.0 - 
var.power) {
+
             if (is.character(family)) {
-              family <- get(family, mode = "function", envir = parent.frame())
+              # Handle when family = "tweedie"
+              if (tolower(family) == "tweedie") {
+                family <- list(family = "tweedie", link = NULL)
+              } else {
+                family <- get(family, mode = "function", envir = 
parent.frame())
+              }
             }
             if (is.function(family)) {
               family <- family()
@@ -100,6 +127,12 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", 
formula = "formula"),
               print(family)
               stop("'family' not recognized")
             }
+            # Handle when family = statmod::tweedie()
+            if (tolower(family$family) == "tweedie" && 
!is.null(family$variance)) {
+              var.power <- log(family$variance(exp(1)))
+              link.power <- log(family$linkfun(exp(1)))
+              family <- list(family = "tweedie", link = NULL)
+            }
 
             formula <- paste(deparse(formula), collapse = "")
             if (!is.null(weightCol) && weightCol == "") {
@@ -111,7 +144,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", 
formula = "formula"),
             # For known families, Gamma is upper-cased
             jobj <- 
callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                                 "fit", formula, data@sdf, 
tolower(family$family), family$link,
-                                tol, as.integer(maxIter), weightCol, regParam)
+                                tol, as.integer(maxIter), weightCol, regParam,
+                                as.double(var.power), as.double(link.power))
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })
 
@@ -126,11 +160,13 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", 
formula = "formula"),
 #'               the result of a call to a family function. Refer R family at
 #'               
\url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
 #'               Currently these families are supported: \code{binomial}, 
\code{gaussian},
-#'               \code{Gamma}, and \code{poisson}.
+#'               \code{poisson}, \code{Gamma}, and \code{tweedie}.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, 
we treat all instance
 #'                  weights as 1.0.
 #' @param epsilon positive convergence tolerance of iterations.
 #' @param maxit integer giving the maximal number of IRLS iterations.
+#' @param var.power the index of the power variance function in the Tweedie 
family.
+#' @param link.power the index of the power link function in the Tweedie 
family.
 #' @return \code{glm} returns a fitted generalized linear model.
 #' @rdname glm
 #' @export
@@ -145,8 +181,10 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", 
formula = "formula"),
 #' @note glm since 1.5.0
 #' @seealso \link{spark.glm}
 setMethod("glm", signature(formula = "formula", family = "ANY", data = 
"SparkDataFrame"),
-          function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 
25, weightCol = NULL) {
-            spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, 
weightCol = weightCol)
+          function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 
25, weightCol = NULL,
+                   var.power = 0.0, link.power = 1.0 - var.power) {
+            spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, 
weightCol = weightCol,
+                      var.power = var.power, link.power = link.power)
           })
 
 #  Returns the summary of a model produced by glm() or spark.glm(), similarly 
to R's summary().
@@ -172,9 +210,10 @@ setMethod("summary", signature(object = 
"GeneralizedLinearRegressionModel"),
             deviance <- callJMethod(jobj, "rDeviance")
             df.null <- callJMethod(jobj, "rResidualDegreeOfFreedomNull")
             df.residual <- callJMethod(jobj, "rResidualDegreeOfFreedom")
-            aic <- callJMethod(jobj, "rAic")
             iter <- callJMethod(jobj, "rNumIterations")
             family <- callJMethod(jobj, "rFamily")
+            aic <- callJMethod(jobj, "rAic")
+            if (family == "tweedie" && aic == 0) aic <- NA
             deviance.resid <- if (is.loaded) {
               NULL
             } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/f6314eab/R/pkg/inst/tests/testthat/test_mllib_regression.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib_regression.R 
b/R/pkg/inst/tests/testthat/test_mllib_regression.R
index 81a5bdc..3e9ad77 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_regression.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_regression.R
@@ -77,6 +77,24 @@ test_that("spark.glm and predict", {
   out <- capture.output(print(summary(model)))
   expect_true(any(grepl("Dispersion parameter for gamma family", out)))
 
+  # tweedie family
+  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
+                     family = "tweedie", var.power = 1.2, link.power = 0.0)
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
+  vals <- collect(select(prediction, "prediction"))
+
+  # manual calculation of the R predicted values to avoid dependence on statmod
+  #' library(statmod)
+  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+  #'             family = tweedie(var.power = 1.2, link.power = 0.0))
+  #' print(coef(rModel))
+
+  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+  rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+                                       data = iris) %*% rCoef))
+  expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
   # Test stats::predict is working
   x <- rnorm(15)
   y <- x + rnorm(15)
@@ -233,7 +251,7 @@ test_that("glm and predict", {
   training <- suppressWarnings(createDataFrame(iris))
   # gaussian family
   model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-               prediction <- predict(model, training)
+  prediction <- predict(model, training)
   expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
   vals <- collect(select(prediction, "prediction"))
   rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), 
iris)
@@ -249,6 +267,24 @@ test_that("glm and predict", {
                                         data = iris, family = poisson(link = 
identity)), iris))
   expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
 
+  # tweedie family
+  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
+               family = "tweedie", var.power = 1.2, link.power = 0.0)
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
+  vals <- collect(select(prediction, "prediction"))
+
+  # manual calculation of the R predicted values to avoid dependence on statmod
+  #' library(statmod)
+  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+  #'             family = tweedie(var.power = 1.2, link.power = 0.0))
+  #' print(coef(rModel))
+
+  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+  rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+                                   data = iris) %*% rCoef))
+  expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
   # Test stats::predict is working
   x <- rnorm(15)
   y <- x + rnorm(15)

http://git-wip-us.apache.org/repos/asf/spark/blob/f6314eab/R/pkg/vignettes/sparkr-vignettes.Rmd
----------------------------------------------------------------------
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd 
b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 43c255c..a6ff650 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -672,6 +672,7 @@ gaussian | identity, log, inverse
 binomial | logit, probit, cloglog (complementary log-log)
 poisson | log, identity, sqrt
 gamma | inverse, identity, log
+tweedie | power link function
 
 There are three ways to specify the `family` argument.
 
@@ -679,7 +680,11 @@ There are three ways to specify the `family` argument.
 
 * Family function, e.g. `family = binomial`.
 
-* Result returned by a family function, e.g. `family = poisson(link = log)`
+* Result returned by a family function, e.g. `family = poisson(link = log)`.
+
+* Note that there are two ways to specify the tweedie family:
+  a) Set `family = "tweedie"` and specify the `var.power` and `link.power`
+  b) When package `statmod` is loaded, the tweedie family is specified using 
the family definition therein, i.e., `tweedie()`.
 
 For more information regarding the families and their link functions, see the 
Wikipedia page [Generalized Linear 
Model](https://en.wikipedia.org/wiki/Generalized_linear_model).
 
@@ -695,6 +700,18 @@ gaussianFitted <- predict(gaussianGLM, carsDF)
 head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
 ```
 
+The following is the same fit using the tweedie family:
+```{r}
+tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power 
= 0.0)
+summary(tweedieGLM1)
+```
+We can try other distributions in the tweedie family, for example, a compound 
Poisson distribution with a log link:
+```{r}
+tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", 
+                         var.power = 1.2, link.power = 0.0)
+summary(tweedieGLM2)
+```
+
 #### Isotonic Regression
 
 `spark.isoreg` fits an [Isotonic 
Regression](https://en.wikipedia.org/wiki/Isotonic_regression) model against a 
`SparkDataFrame`. It solves a weighted univariate a regression problem under a 
complete order constraint. Specifically, given a set of real observed responses 
$y_1, \ldots, y_n$, corresponding real features $x_1, \ldots, x_n$, and 
optionally positive weights $w_1, \ldots, w_n$, we want to find a monotone 
(piecewise linear) function $f$ to  minimize

http://git-wip-us.apache.org/repos/asf/spark/blob/f6314eab/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index cbd6cd1..c49416b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -71,7 +71,9 @@ private[r] object GeneralizedLinearRegressionWrapper
       tol: Double,
       maxIter: Int,
       weightCol: String,
-      regParam: Double): GeneralizedLinearRegressionWrapper = {
+      regParam: Double,
+      variancePower: Double,
+      linkPower: Double): GeneralizedLinearRegressionWrapper = {
     val rFormula = new RFormula().setFormula(formula)
     checkDataColumns(rFormula, data)
     val rFormulaModel = rFormula.fit(data)
@@ -83,13 +85,17 @@ private[r] object GeneralizedLinearRegressionWrapper
     // assemble and fit the pipeline
     val glr = new GeneralizedLinearRegression()
       .setFamily(family)
-      .setLink(link)
       .setFitIntercept(rFormula.hasIntercept)
       .setTol(tol)
       .setMaxIter(maxIter)
       .setRegParam(regParam)
       .setFeaturesCol(rFormula.getFeaturesCol)
-
+    // set variancePower and linkPower if family is tweedie; otherwise, set 
link function
+    if (family.toLowerCase == "tweedie") {
+      glr.setVariancePower(variancePower).setLinkPower(linkPower)
+    } else {
+      glr.setLink(link)
+    }
     if (weightCol != null) glr.setWeightCol(weightCol)
 
     val pipeline = new Pipeline()
@@ -145,7 +151,12 @@ private[r] object GeneralizedLinearRegressionWrapper
     val rDeviance: Double = summary.deviance
     val rResidualDegreeOfFreedomNull: Long = 
summary.residualDegreeOfFreedomNull
     val rResidualDegreeOfFreedom: Long = summary.residualDegreeOfFreedom
-    val rAic: Double = summary.aic
+    val rAic: Double = if (family.toLowerCase == "tweedie" &&
+      !Array(0.0, 1.0, 2.0).exists(x => math.abs(x - variancePower) < 1e-8)) {
+      0.0
+    } else {
+      summary.aic
+    }
     val rNumIterations: Int = summary.numIterations
 
     new GeneralizedLinearRegressionWrapper(pipeline, rFeatures, rCoefficients, 
rDispersion,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to