[1/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol.
Repository: spark Updated Branches: refs/heads/branch-2.3 eab10f994 -> 16cd9ac52 [SPARKR] Match pyspark features in SparkR communication protocol. (cherry picked from commit 628c7b517969c4a7ccb26ea67ab3dd61266073ca) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16cd9ac5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16cd9ac5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16cd9ac5 Branch: refs/heads/branch-2.3 Commit: 16cd9ac5264831e061c033b26fe1173ebc88e5d1 Parents: 323dc3a Author: Marcelo Vanzin Authored: Tue Apr 17 13:29:43 2018 -0700 Committer: Marcelo Vanzin Committed: Thu May 10 10:47:37 2018 -0700 -- R/pkg/R/client.R| 4 +- R/pkg/R/deserialize.R | 10 ++-- R/pkg/R/sparkR.R| 39 -- R/pkg/inst/worker/daemon.R | 4 +- R/pkg/inst/worker/worker.R | 5 +- .../org/apache/spark/api/r/RAuthHelper.scala| 38 ++ .../scala/org/apache/spark/api/r/RBackend.scala | 43 --- .../spark/api/r/RBackendAuthHandler.scala | 55 .../scala/org/apache/spark/api/r/RRunner.scala | 35 + .../scala/org/apache/spark/deploy/RRunner.scala | 6 ++- 10 files changed, 210 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/client.R -- diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R index 9d82814..7244cc9 100644 --- a/R/pkg/R/client.R +++ b/R/pkg/R/client.R @@ -19,7 +19,7 @@ # Creates a SparkR client connection object # if one doesn't already exist -connectBackend <- function(hostname, port, timeout) { +connectBackend <- function(hostname, port, timeout, authSecret) { if (exists(".sparkRcon", envir = .sparkREnv)) { if (isOpen(.sparkREnv[[".sparkRCon"]])) { cat("SparkRBackend client connection already exists\n") @@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) { con <- socketConnection(host = hostname, port = port, server = FALSE, blocking = TRUE, open = "wb", timeout = timeout) - + doServerAuth(con, authSecret) 
assign(".sparkRCon", con, envir = .sparkREnv) con } http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/deserialize.R -- diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index a90f7d3..cb03f16 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -60,14 +60,18 @@ readTypedObject <- function(con, type) { stop(paste("Unsupported type for deserialization", type))) } -readString <- function(con) { - stringLen <- readInt(con) - raw <- readBin(con, raw(), stringLen, endian = "big") +readStringData <- function(con, len) { + raw <- readBin(con, raw(), len, endian = "big") string <- rawToChar(raw) Encoding(string) <- "UTF-8" string } +readString <- function(con) { + stringLen <- readInt(con) + readStringData(con, stringLen) +} + readInt <- function(con) { readBin(con, integer(), n = 1, endian = "big") } http://git-wip-us.apache.org/repos/asf/spark/blob/16cd9ac5/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 965471f..7430d84 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -161,6 +161,10 @@ sparkR.sparkContext <- function( " please use the --packages commandline instead", sep = ",")) } backendPort <- existingPort +authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET") +if (nchar(authSecret) == 0) { + stop("Auth secret not provided in environment.") +} } else { path <- tempfile(pattern = "backend_port") submitOps <- getClientModeSparkSubmitOpts( @@ -189,16 +193,27 @@ sparkR.sparkContext <- function( monitorPort <- readInt(f) rLibPath <- readString(f) connectionTimeout <- readInt(f) + +# Don't use readString() so that we can provide a useful +# error message if the R and Java versions are mismatched. +authSecretLen = readInt(f) +if (length(authSecretLen) == 0 || authSecretLen == 0) { + stop("Unexpected EOF in JVM connection data. 
Mismatched versions?") +} +authSecret <- readStringData(f, authSecretLen) close(f) file.remove(path) if (length(backendPort) == 0 || backendPort == 0 || length(monitorPort) == 0 || monitorPort == 0 || -length(rLibPath) != 1) { +length(rLibPath) != 1 || length(authSecret) == 0) { stop("JVM failed
[1/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol.
Repository: spark Updated Branches: refs/heads/master 94155d039 -> 628c7b517 [SPARKR] Match pyspark features in SparkR communication protocol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/628c7b51 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/628c7b51 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/628c7b51 Branch: refs/heads/master Commit: 628c7b517969c4a7ccb26ea67ab3dd61266073ca Parents: cc613b5 Author: Marcelo Vanzin Authored: Tue Apr 17 13:29:43 2018 -0700 Committer: Marcelo Vanzin Committed: Wed May 9 10:47:35 2018 -0700 -- R/pkg/R/client.R| 4 +- R/pkg/R/deserialize.R | 10 ++-- R/pkg/R/sparkR.R| 39 -- R/pkg/inst/worker/daemon.R | 4 +- R/pkg/inst/worker/worker.R | 5 +- .../org/apache/spark/api/r/RAuthHelper.scala| 38 ++ .../scala/org/apache/spark/api/r/RBackend.scala | 43 --- .../spark/api/r/RBackendAuthHandler.scala | 55 .../scala/org/apache/spark/api/r/RRunner.scala | 35 + .../scala/org/apache/spark/deploy/RRunner.scala | 6 ++- 10 files changed, 210 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/client.R -- diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R index 9d82814..7244cc9 100644 --- a/R/pkg/R/client.R +++ b/R/pkg/R/client.R @@ -19,7 +19,7 @@ # Creates a SparkR client connection object # if one doesn't already exist -connectBackend <- function(hostname, port, timeout) { +connectBackend <- function(hostname, port, timeout, authSecret) { if (exists(".sparkRcon", envir = .sparkREnv)) { if (isOpen(.sparkREnv[[".sparkRCon"]])) { cat("SparkRBackend client connection already exists\n") @@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) { con <- socketConnection(host = hostname, port = port, server = FALSE, blocking = TRUE, open = "wb", timeout = timeout) - + doServerAuth(con, authSecret) assign(".sparkRCon", con, envir = .sparkREnv) con } 
http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/deserialize.R -- diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index a90f7d3..cb03f16 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -60,14 +60,18 @@ readTypedObject <- function(con, type) { stop(paste("Unsupported type for deserialization", type))) } -readString <- function(con) { - stringLen <- readInt(con) - raw <- readBin(con, raw(), stringLen, endian = "big") +readStringData <- function(con, len) { + raw <- readBin(con, raw(), len, endian = "big") string <- rawToChar(raw) Encoding(string) <- "UTF-8" string } +readString <- function(con) { + stringLen <- readInt(con) + readStringData(con, stringLen) +} + readInt <- function(con) { readBin(con, integer(), n = 1, endian = "big") } http://git-wip-us.apache.org/repos/asf/spark/blob/628c7b51/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index a480ac6..38ee794 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -158,6 +158,10 @@ sparkR.sparkContext <- function( " please use the --packages commandline instead", sep = ",")) } backendPort <- existingPort +authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET") +if (nchar(authSecret) == 0) { + stop("Auth secret not provided in environment.") +} } else { path <- tempfile(pattern = "backend_port") submitOps <- getClientModeSparkSubmitOpts( @@ -186,16 +190,27 @@ sparkR.sparkContext <- function( monitorPort <- readInt(f) rLibPath <- readString(f) connectionTimeout <- readInt(f) + +# Don't use readString() so that we can provide a useful +# error message if the R and Java versions are mismatched. +authSecretLen = readInt(f) +if (length(authSecretLen) == 0 || authSecretLen == 0) { + stop("Unexpected EOF in JVM connection data. 
Mismatched versions?") +} +authSecret <- readStringData(f, authSecretLen) close(f) file.remove(path) if (length(backendPort) == 0 || backendPort == 0 || length(monitorPort) == 0 || monitorPort == 0 || -length(rLibPath) != 1) { +length(rLibPath) != 1 || length(authSecret) == 0) { stop("JVM failed to launch") } -assign(".monitorConn", - socketConnection(port = monitorPort, timeout = connectionTimeout), -