[1/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol.

2018-05-10 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 eab10f994 -> 16cd9ac52


[SPARKR] Match pyspark features in SparkR communication protocol.

(cherry picked from commit 628c7b517969c4a7ccb26ea67ab3dd61266073ca)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16cd9ac5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16cd9ac5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16cd9ac5

Branch: refs/heads/branch-2.3
Commit: 16cd9ac5264831e061c033b26fe1173ebc88e5d1
Parents: 323dc3a
Author: Marcelo Vanzin 
Authored: Tue Apr 17 13:29:43 2018 -0700
Committer: Marcelo Vanzin 
Committed: Thu May 10 10:47:37 2018 -0700

--
 R/pkg/R/client.R|  4 +-
 R/pkg/R/deserialize.R   | 10 ++--
 R/pkg/R/sparkR.R| 39 --
 R/pkg/inst/worker/daemon.R  |  4 +-
 R/pkg/inst/worker/worker.R  |  5 +-
 .../org/apache/spark/api/r/RAuthHelper.scala| 38 ++
 .../scala/org/apache/spark/api/r/RBackend.scala | 43 ---
 .../spark/api/r/RBackendAuthHandler.scala   | 55 
 .../scala/org/apache/spark/api/r/RRunner.scala  | 35 +
 .../scala/org/apache/spark/deploy/RRunner.scala |  6 ++-
 10 files changed, 210 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/client.R
--
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 9d82814..7244cc9 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -19,7 +19,7 @@
 
 # Creates a SparkR client connection object
 # if one doesn't already exist
-connectBackend <- function(hostname, port, timeout) {
+connectBackend <- function(hostname, port, timeout, authSecret) {
   if (exists(".sparkRcon", envir = .sparkREnv)) {
 if (isOpen(.sparkREnv[[".sparkRCon"]])) {
   cat("SparkRBackend client connection already exists\n")
@@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {
 
   con <- socketConnection(host = hostname, port = port, server = FALSE,
   blocking = TRUE, open = "wb", timeout = timeout)
-
+  doServerAuth(con, authSecret)
   assign(".sparkRCon", con, envir = .sparkREnv)
   con
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/deserialize.R
--
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index a90f7d3..cb03f16 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
 stop(paste("Unsupported type for deserialization", type)))
 }
 
-readString <- function(con) {
-  stringLen <- readInt(con)
-  raw <- readBin(con, raw(), stringLen, endian = "big")
+readStringData <- function(con, len) {
+  raw <- readBin(con, raw(), len, endian = "big")
   string <- rawToChar(raw)
   Encoding(string) <- "UTF-8"
   string
 }
 
+readString <- function(con) {
+  stringLen <- readInt(con)
+  readStringData(con, stringLen)
+}
+
 readInt <- function(con) {
   readBin(con, integer(), n = 1, endian = "big")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 965471f..7430d84 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -161,6 +161,10 @@ sparkR.sparkContext <- function(
 " please use the --packages commandline instead", sep = ","))
 }
 backendPort <- existingPort
+authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
+if (nchar(authSecret) == 0) {
+  stop("Auth secret not provided in environment.")
+}
   } else {
 path <- tempfile(pattern = "backend_port")
 submitOps <- getClientModeSparkSubmitOpts(
@@ -189,16 +193,27 @@ sparkR.sparkContext <- function(
 monitorPort <- readInt(f)
 rLibPath <- readString(f)
 connectionTimeout <- readInt(f)
+
+# Don't use readString() so that we can provide a useful
+# error message if the R and Java versions are mismatched.
+authSecretLen = readInt(f)
+if (length(authSecretLen) == 0 || authSecretLen == 0) {
+  stop("Unexpected EOF in JVM connection data. Mismatched versions?")
+}
+authSecret <- readStringData(f, authSecretLen)
 close(f)
 file.remove(path)
 if (length(backendPort) == 0 || backendPort == 0 ||
 length(monitorPort) == 0 || monitorPort == 0 ||
-length(rLibPath) != 1) {
+length(rLibPath) != 1 || length(authSecret) == 0) {
   stop("JVM failed 

[1/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol.

2018-05-09 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 94155d039 -> 628c7b517


[SPARKR] Match pyspark features in SparkR communication protocol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/628c7b51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/628c7b51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/628c7b51

Branch: refs/heads/master
Commit: 628c7b517969c4a7ccb26ea67ab3dd61266073ca
Parents: cc613b5
Author: Marcelo Vanzin 
Authored: Tue Apr 17 13:29:43 2018 -0700
Committer: Marcelo Vanzin 
Committed: Wed May 9 10:47:35 2018 -0700

--
 R/pkg/R/client.R|  4 +-
 R/pkg/R/deserialize.R   | 10 ++--
 R/pkg/R/sparkR.R| 39 --
 R/pkg/inst/worker/daemon.R  |  4 +-
 R/pkg/inst/worker/worker.R  |  5 +-
 .../org/apache/spark/api/r/RAuthHelper.scala| 38 ++
 .../scala/org/apache/spark/api/r/RBackend.scala | 43 ---
 .../spark/api/r/RBackendAuthHandler.scala   | 55 
 .../scala/org/apache/spark/api/r/RRunner.scala  | 35 +
 .../scala/org/apache/spark/deploy/RRunner.scala |  6 ++-
 10 files changed, 210 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/client.R
--
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 9d82814..7244cc9 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -19,7 +19,7 @@
 
 # Creates a SparkR client connection object
 # if one doesn't already exist
-connectBackend <- function(hostname, port, timeout) {
+connectBackend <- function(hostname, port, timeout, authSecret) {
   if (exists(".sparkRcon", envir = .sparkREnv)) {
 if (isOpen(.sparkREnv[[".sparkRCon"]])) {
   cat("SparkRBackend client connection already exists\n")
@@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {
 
   con <- socketConnection(host = hostname, port = port, server = FALSE,
   blocking = TRUE, open = "wb", timeout = timeout)
-
+  doServerAuth(con, authSecret)
   assign(".sparkRCon", con, envir = .sparkREnv)
   con
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/deserialize.R
--
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index a90f7d3..cb03f16 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
 stop(paste("Unsupported type for deserialization", type)))
 }
 
-readString <- function(con) {
-  stringLen <- readInt(con)
-  raw <- readBin(con, raw(), stringLen, endian = "big")
+readStringData <- function(con, len) {
+  raw <- readBin(con, raw(), len, endian = "big")
   string <- rawToChar(raw)
   Encoding(string) <- "UTF-8"
   string
 }
 
+readString <- function(con) {
+  stringLen <- readInt(con)
+  readStringData(con, stringLen)
+}
+
 readInt <- function(con) {
   readBin(con, integer(), n = 1, endian = "big")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index a480ac6..38ee794 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -158,6 +158,10 @@ sparkR.sparkContext <- function(
 " please use the --packages commandline instead", sep = ","))
 }
 backendPort <- existingPort
+authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
+if (nchar(authSecret) == 0) {
+  stop("Auth secret not provided in environment.")
+}
   } else {
 path <- tempfile(pattern = "backend_port")
 submitOps <- getClientModeSparkSubmitOpts(
@@ -186,16 +190,27 @@ sparkR.sparkContext <- function(
 monitorPort <- readInt(f)
 rLibPath <- readString(f)
 connectionTimeout <- readInt(f)
+
+# Don't use readString() so that we can provide a useful
+# error message if the R and Java versions are mismatched.
+authSecretLen = readInt(f)
+if (length(authSecretLen) == 0 || authSecretLen == 0) {
+  stop("Unexpected EOF in JVM connection data. Mismatched versions?")
+}
+authSecret <- readStringData(f, authSecretLen)
 close(f)
 file.remove(path)
 if (length(backendPort) == 0 || backendPort == 0 ||
 length(monitorPort) == 0 || monitorPort == 0 ||
-length(rLibPath) != 1) {
+length(rLibPath) != 1 || length(authSecret) == 0) {
   stop("JVM failed to launch")
 }
-assign(".monitorConn",
-   socketConnection(port = monitorPort, timeout = connectionTimeout),
-