Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c0ec20a51 -> 4b91e18d9


[SPARK-6799] [SPARKR] Remove SparkR RDD examples, add dataframe examples

This PR also makes some of the DataFrame to RDD methods private as the RDD 
class is private in 1.4

cc rxin pwendell

Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu>

Closes #5949 from shivaram/sparkr-examples and squashes the following commits:

6c42fdc [Shivaram Venkataraman] Remove SparkR RDD examples, add dataframe examples

(cherry picked from commit 4e930420c19ae7773b138dfc7db8fc03b4660251)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b91e18d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b91e18d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b91e18d

Branch: refs/heads/branch-1.4
Commit: 4b91e18d9b7803dbfe1e1cf20b46163d8cb8716c
Parents: c0ec20a
Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Authored: Wed May 6 17:28:11 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 6 17:32:00 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   4 -
 R/pkg/R/DataFrame.R                       |   2 +-
 examples/src/main/r/dataframe.R           |  54 +++++++++++++
 examples/src/main/r/kmeans.R              |  93 ---------------------
 examples/src/main/r/linear_solver_mnist.R | 107 -------------------------
 examples/src/main/r/logistic_regression.R |  62 --------------
 examples/src/main/r/pi.R                  |  46 -----------
 examples/src/main/r/wordcount.R           |  42 ----------
 8 files changed, 55 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 528e660..3fb92be 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -45,8 +45,6 @@ exportMethods("cache",
               "showDF",
               "sortDF",
               "take",
-              "toJSON",
-              "toRDD",
               "unionAll",
               "unpersist",
               "where",
@@ -95,14 +93,12 @@ export("cacheTable",
        "createExternalTable",
        "dropTempTable",
        "jsonFile",
-       "jsonRDD",
        "loadDF",
        "parquetFile",
        "sql",
        "table",
        "tableNames",
        "tables",
-       "toDF",
        "uncacheTable")
 
 export("sparkRSQL.init",

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 56c305d..47d92f1 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -272,7 +272,7 @@ setMethod("names",
 setMethod("registerTempTable",
           signature(x = "DataFrame", tableName = "character"),
           function(x, tableName) {
-              callJMethod(x@sdf, "registerTempTable", tableName)
+              invisible(callJMethod(x@sdf, "registerTempTable", tableName))
           })
 
 #' insertInto

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/dataframe.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R
new file mode 100644
index 0000000..53b8171
--- /dev/null
+++ b/examples/src/main/r/dataframe.R
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# Initialize SparkContext and SQLContext
+sc <- sparkR.init(appName="SparkR-DataFrame-example")
+sqlContext <- sparkRSQL.init(sc)
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkR DataFrame
+df <- createDataFrame(sqlContext, localDF)
+
+# Print its schema
+printSchema(df)
+# root
+#  |-- name: string (nullable = true)
+#  |-- age: double (nullable = true)
+
+# Create a DataFrame from a JSON file
+path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
+peopleDF <- jsonFile(sqlContext, path)
+printSchema(peopleDF)
+
+# Register this DataFrame as a table.
+registerTempTable(peopleDF, "people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext
+teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Call collect to get a local data.frame
+teenagersLocalDF <- collect(teenagers)
+
+# Print the teenagers in our dataset 
+print(teenagersLocalDF)
+
+# Stop the SparkContext now
+sparkR.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/kmeans.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
deleted file mode 100644
index 6e6b5cb..0000000
--- a/examples/src/main/r/kmeans.R
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-# Logistic regression in Spark.
-# Note: unlike the example in Scala, a point here is represented as a vector of
-# doubles.
-
-parseVectors <-  function(lines) {
-  lines <- strsplit(as.character(lines) , " ", fixed = TRUE)
-  list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
-}
-
-dist.fun <- function(P, C) {
-  apply(
-    C,
-    1, 
-    function(x) { 
-      colSums((t(P) - x)^2)
-    }
-  )
-}
-
-closestPoint <-  function(P, C) {
-  max.col(-dist.fun(P, C))
-}
-# Main program
-
-args <- commandArgs(trailing = TRUE) 
-
-if (length(args) != 3) {
-  print("Usage: kmeans <file> <K> <convergeDist>")
-  q("no")
-}
-
-sc <- sparkR.init(appName = "RKMeans")
-K <- as.integer(args[[2]])
-convergeDist <- as.double(args[[3]])
-
-lines <- textFile(sc, args[[1]])
-points <- cache(lapplyPartition(lines, parseVectors))
-# kPoints <- take(points, K)
-kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
-tempDist <- 1.0
-
-while (tempDist > convergeDist) {
-  closest <- lapplyPartition(
-    lapply(points,
-           function(p) {
-             cp <- closestPoint(p, kPoints); 
-             mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE)
-           }),
-    function(x) {do.call(c, x)
-    })
-  
-  pointStats <- reduceByKey(closest,
-                            function(p1, p2) {
-                              t(colSums(rbind(p1, p2)))
-                            },
-                            2L)
-  
-  newPoints <- do.call(
-    rbind,
-    collect(lapply(pointStats,
-                   function(tup) {
-                     point.sum <- tup[[2]][, -1]
-                     point.count <- tup[[2]][, 1]
-                     point.sum/point.count
-                   })))
-  
-  D <- dist.fun(kPoints, newPoints)
-  tempDist <- sum(D[cbind(1:3, max.col(-D))])
-  kPoints <- newPoints
-  cat("Finished iteration (delta = ", tempDist, ")\n")
-}
-
-cat("Final centers:\n")
-writeLines(unlist(lapply(kPoints, paste, collapse = " ")))

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/linear_solver_mnist.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R
deleted file mode 100644
index c864a42..0000000
--- a/examples/src/main/r/linear_solver_mnist.R
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
-
-library(SparkR)
-library(Matrix)
-
-args <- commandArgs(trailing = TRUE)
-
-# number of random features; default to 1100
-D <- ifelse(length(args) > 0, as.integer(args[[1]]), 1100)
-# number of partitions for training dataset
-trainParts <- 12
-# dimension of digits
-d <- 784
-# number of test examples
-NTrain <- 60000
-# number of training examples
-NTest <- 10000
-# scale of features
-gamma <- 4e-4
-
-sc <- sparkR.init(appName = "SparkR-LinearSolver")
-
-# You can also use HDFS path to speed things up:
-# hdfs://<master>/train-mnist-dense-with-labels.data
-file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
-
-W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d))
-b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D))
-broadcastW <- broadcast(sc, W)
-broadcastB <- broadcast(sc, b)
-
-includePackage(sc, Matrix)
-numericLines <- lapplyPartitionsWithIndex(file,
-                       function(split, part) {
-                         matList <- sapply(part, function(line) {
-                           as.numeric(strsplit(line, ",", fixed=TRUE)[[1]])
-                         }, simplify=FALSE)
-                         mat <- Matrix(ncol=d+1, data=unlist(matList, F, F),
-                                       sparse=T, byrow=T)
-                         mat
-                       })
-
-featureLabels <- cache(lapplyPartition(
-    numericLines,
-    function(part) {
-      label <- part[,1]
-      mat <- part[,-1]
-      ones <- rep(1, nrow(mat))
-      features <- cos(
-        mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% t(value(broadcastB))))
-      onesMat <- Matrix(ones)
-      featuresPlus <- cBind(features, onesMat)
-      labels <- matrix(nrow=nrow(mat), ncol=10, data=-1)
-      for (i in 1:nrow(mat)) {
-        labels[i, label[i]] <- 1
-      }
-      list(label=labels, features=featuresPlus)
-  }))
-
-FTF <- Reduce("+", collect(lapplyPartition(featureLabels,
-    function(part) {
-      t(part$features) %*% part$features
-    }), flatten=F))
-
-FTY <- Reduce("+", collect(lapplyPartition(featureLabels,
-    function(part) {
-      t(part$features) %*% part$label
-    }), flatten=F))
-
-# solve for the coefficient matrix
-C <- solve(FTF, FTY)
-
-test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
-                         header=F), sparse=T))
-testData <- test[,-1]
-testLabels <- matrix(ncol=1, test[,1])
-
-err <- 0
-
-# contstruct the feature maps for all examples from this digit
-featuresTest <- cos(testData %*% t(value(broadcastW)) +
-    (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB))))
-featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest)))
-
-# extract the one vs. all assignment
-results <- featuresTest %*% C
-labelsGot <- apply(results, 1, which.max)
-err <- sum(testLabels != labelsGot) / nrow(testLabels)
-
-cat("\nFinished running. The error rate is: ", err, ".\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/logistic_regression.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R
deleted file mode 100644
index 2a86aa9..0000000
--- a/examples/src/main/r/logistic_regression.R
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 3) {
-  print("Usage: logistic_regression <file> <iters> <dimension>")
-  q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "LogisticRegressionR")
-iterations <- as.integer(args[[2]])
-D <- as.integer(args[[3]])
-
-readPartition <- function(part){
-  part = strsplit(part, " ", fixed = T)
-  list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
-}
-
-# Read data points and convert each partition to a matrix
-points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition))
-
-# Initialize w to a random value
-w <- runif(n=D, min = -1, max = 1)
-cat("Initial w: ", w, "\n")
-
-# Compute logistic regression gradient for a matrix of data points
-gradient <- function(partition) {
-  partition = partition[[1]]
-  Y <- partition[, 1]  # point labels (first column of input file)
-  X <- partition[, -1] # point coordinates
-
-  # For each point (x, y), compute gradient function
-  dot <- X %*% w
-  logit <- 1 / (1 + exp(-Y * dot))
-  grad <- t(X) %*% ((logit - 1) * Y)
-  list(grad)
-}
-
-for (i in 1:iterations) {
-  cat("On iteration ", i, "\n")
-  w <- w - reduce(lapplyPartition(points, gradient), "+")
-}
-
-cat("Final w: ", w, "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/pi.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
deleted file mode 100644
index aa7a833..0000000
--- a/examples/src/main/r/pi.R
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-sc <- sparkR.init(appName = "PiR")
-
-slices <- ifelse(length(args) > 1, as.integer(args[[2]]), 2)
-
-n <- 100000 * slices
-
-piFunc <- function(elem) {
-  rands <- runif(n = 2, min = -1, max = 1)
-  val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
-  val
-}
-
-
-piFuncVec <- function(elems) {
-  message(length(elems))
-  rands1 <- runif(n = length(elems), min = -1, max = 1)
-  rands2 <- runif(n = length(elems), min = -1, max = 1)
-  val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
-  sum(val)
-}
-
-rdd <- parallelize(sc, 1:n, slices)
-count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
-cat("Pi is roughly", 4.0 * count / n, "\n")
-cat("Num elements in RDD ", count(rdd), "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/4b91e18d/examples/src/main/r/wordcount.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
deleted file mode 100644
index b734cb0..0000000
--- a/examples/src/main/r/wordcount.R
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(SparkR)
-
-args <- commandArgs(trailing = TRUE)
-
-if (length(args) != 1) {
-  print("Usage: wordcount <file>")
-  q("no")
-}
-
-# Initialize Spark context
-sc <- sparkR.init(appName = "RwordCount")
-lines <- textFile(sc, args[[1]])
-
-words <- flatMap(lines,
-                 function(line) {
-                   strsplit(line, " ")[[1]]
-                 })
-wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-counts <- reduceByKey(wordCount, "+", 2L)
-output <- collect(counts)
-
-for (wordcount in output) {
-  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to