LIVY-313. Fixed SparkRInterpreter always returning success. (#307) * LIVY-313. Fixed SparkRInterpreter always returning success.
- Stopped redirecting stderr to stdout. - Continue to read ErrorStream (it was only being read once). - Checking for any errors returned by stderr before returning success. * Fixing scalastyle check error * Changing the way errors are handled in SparkRInterpreter * Fixing scalastyle check error * Updating SparkRSessionSpec Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/0de0e286 Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/0de0e286 Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/0de0e286 Branch: refs/heads/master Commit: 0de0e28658d5490e289a290b3cdf8a9f12e19eb0 Parents: 5e6f9ed Author: Jonathan Alter <jonal...@users.noreply.github.com> Authored: Wed Mar 22 23:56:57 2017 -0700 Committer: Jeff Zhang <zjf...@gmail.com> Committed: Thu Mar 23 14:56:57 2017 +0800 ---------------------------------------------------------------------- .../cloudera/livy/repl/SparkRInterpreter.scala | 61 ++++++++++++++------ .../livy/repl/SparkRInterpreterSpec.scala | 13 ++--- .../cloudera/livy/repl/SparkRSessionSpec.scala | 8 +-- 3 files changed, 53 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala index 8e5f3c0..7318b1e 100644 --- a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala +++ b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala @@ -37,9 +37,12 @@ import org.json4s.JsonDSL._ import com.cloudera.livy.client.common.ClientConf import com.cloudera.livy.rsc.RSCConf +private case class RequestResponse(content: String, error: Boolean) + // scalastyle:off println object SparkRInterpreter { private val LIVY_END_MARKER = "----LIVY_END_OF_COMMAND----" + private val LIVY_ERROR_MARKER = "----LIVY_END_OF_ERROR----" private val PRINT_MARKER = f"""print("$LIVY_END_MARKER")""" private val EXPECTED_OUTPUT = f"""[1] "$LIVY_END_MARKER"""" @@ -188,18 +191,25 @@ class SparkRInterpreter(process: Process, } try { - var content: JObject = TEXT_PLAIN -> sendRequest(code) - - // If we rendered anything, pass along the last image. - tempFile.foreach { case file => - val bytes = Files.readAllBytes(file) - if (bytes.nonEmpty) { - val image = Base64.encodeBase64String(bytes) - content = content ~ (IMAGE_PNG -> image) + val response = sendRequest(code) + + if (response.error) { + Interpreter.ExecuteError("Error", response.content) + } else { + var content: JObject = TEXT_PLAIN -> response.content + + // If we rendered anything, pass along the last image. + tempFile.foreach { case file => + val bytes = Files.readAllBytes(file) + if (bytes.nonEmpty) { + val image = Base64.encodeBase64String(bytes) + content = content ~ (IMAGE_PNG -> image) + } } + + Interpreter.ExecuteSuccess(content) } - Interpreter.ExecuteSuccess(content) } catch { case e: Error => Interpreter.ExecuteError("Error", e.output) @@ -211,14 +221,16 @@ class SparkRInterpreter(process: Process, } - private def sendRequest(code: String): String = { - stdin.println(s"""try(eval(parse(text="${StringEscapeUtils.escapeJava(code)}")))""") + private def sendRequest(code: String): RequestResponse = { + stdin.println(s"""tryCatch(eval(parse(text="${StringEscapeUtils.escapeJava(code)}")) + |,error = function(e) sprintf("%s%s", e, "${LIVY_ERROR_MARKER}")) + """.stripMargin) stdin.flush() stdin.println(PRINT_MARKER) stdin.flush() - readTo(EXPECTED_OUTPUT) + readTo(EXPECTED_OUTPUT, LIVY_ERROR_MARKER) } override protected def sendShutdownRequest() = { @@ -242,7 +254,10 @@ class SparkRInterpreter(process: Process, } @tailrec - private def readTo(marker: String, output: StringBuilder = StringBuilder.newBuilder): String = { + private def readTo( + marker: String, + errorMarker: String, + output: StringBuilder = StringBuilder.newBuilder): RequestResponse = { var char = readChar(output) // Remove any ANSI color codes which match the pattern "\u001b\\[[0-9;]*[mG]". @@ -259,15 +274,25 @@ class SparkRInterpreter(process: Process, } if (output.endsWith(marker)) { - val result = output.toString() - result.substring(0, result.length - marker.length) - .stripPrefix("\n") - .stripSuffix("\n") + var result = stripMarker(output.toString(), marker) + + if (result.endsWith(errorMarker + "\"")) { + result = stripMarker(result, "\\n" + errorMarker) + RequestResponse(result, error = true) + } else { + RequestResponse(result, error = false) + } } else { - readTo(marker, output) + readTo(marker, errorMarker, output) } } + private def stripMarker(result: String, marker: String): String = { + result.replace(marker, "") + .stripPrefix("\n") + .stripSuffix("\n") + } + private def readChar(output: StringBuilder): Char = { val byte = stdout.read() if (byte == -1) { http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala ---------------------------------------------------------------------- diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala index e9db106..f4f709f 100644 --- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala +++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala @@ -84,19 +84,18 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec { it should "report an error if accessing an unknown variable" in withInterpreter { interpreter => val response = interpreter.execute("x") - response should equal(Interpreter.ExecuteSuccess( - TEXT_PLAIN -> "Error in eval(expr, envir, enclos) : object 'x' not found" + response should equal(Interpreter.ExecuteError( + "Error", + """[1] "Error in eval(expr, envir, enclos): object 'x' not found"""" )) } it should "not hang when executing incomplete statements" in withInterpreter { interpreter => val response = interpreter.execute("x[") - response should equal(Interpreter.ExecuteSuccess( - TEXT_PLAIN -> - """Error in parse(text = "x[") : <text>:2:0: unexpected end of input - |1: x[ - | ^""".stripMargin + response should equal(Interpreter.ExecuteError( + "Error", + """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n ^"""" )) } http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala ---------------------------------------------------------------------- diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala index 5592977..4bbf87c 100644 --- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala +++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala @@ -133,11 +133,11 @@ class SparkRSessionSpec extends BaseSessionSpec { val result = parse(statement.output) val expectedResult = Extraction.decompose(Map( - "status" -> "ok", + "status" -> "error", "execution_count" -> 0, - "data" -> Map( - "text/plain" -> "Error in eval(expr, envir, enclos) : object 'x' not found" - ) + "ename" -> "Error", + "evalue" -> "[1] \"Error in eval(expr, envir, enclos): object 'x' not found\"", + "traceback" -> List() )) result should equal (expectedResult)