This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 01da43d [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client 01da43d is described below commit 01da43dba07aee1e4d13a2a19f233a38546ddec0 Author: Gustavo Martin Morcuende <gu.mart...@gmail.com> AuthorDate: Thu Aug 15 11:53:20 2019 +0800 [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client ## What changes were proposed in this pull request? A batch session should end in state dead when its spark-submit process exits with a non-zero return code. https://issues.apache.org/jira/browse/LIVY-620 ## How was this patch tested? 1. Unit test (included in this PR): submit a batch session that runs forever, wait 2 seconds, kill that batch session, and expect state dead. 2. Also currently used in a production environment. Author: Gustavo Martin Morcuende <gu.mart...@gmail.com> Closes #192 from gumartinm/master. --- .../apache/livy/server/batch/BatchSession.scala | 11 ++++++- .../livy/server/batch/BatchSessionSpec.scala | 38 +++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 4b27058..2a55c04 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.apache.livy.{LivyConf, Logging, Utils} import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore -import org.apache.livy.sessions.{Session, SessionState} +import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState} import org.apache.livy.sessions.Session._ import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder} @@
-101,6 +101,7 @@ object BatchSession extends Logging { case 0 => case exitCode => warn(s"spark-submit exited with code $exitCode") + s.stateChanged(SparkApp.State.FAILED) } } finally { childProcesses.decrementAndGet() @@ -182,6 +183,14 @@ class BatchSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this state changed from $oldState to $newState") + if (!_state.isInstanceOf[FinishedSessionState]) { + stateChanged(newState) + } + } + } + + private def stateChanged(newState: SparkApp.State): Unit = { + synchronized { newState match { case SparkApp.State.RUNNING => _state = SessionState.Running diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 417b627..20b8a81 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -33,7 +33,7 @@ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.SessionState -import org.apache.livy.utils.{AppInfo, SparkApp} +import org.apache.livy.utils.{AppInfo, Clock, SparkApp} class BatchSessionSpec extends FunSpec @@ -56,6 +56,23 @@ class BatchSessionSpec script } + val runForeverScript: Path = { + val script = Files.createTempFile("livy-test-run-forever-script", ".py") + script.toFile.deleteOnExit() + val writer = new FileWriter(script.toFile) + try { + writer.write( + """ + |import time + |while True: + | time.sleep(1) + """.stripMargin) + } finally { + writer.close() + } + script + } + describe("A Batch process") { var sessionStore: SessionStore = null @@ -102,6 +119,25 @@ class BatchSessionSpec batch.appInfo shouldEqual expectedAppInfo } + it("should end with status dead when batch session exits with no 0 
return code") { + val req = new CreateBatchRequest() + req.file = runForeverScript.toString + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + val accessManager = new AccessManager(conf) + val batch = BatchSession.create(0, None, req, conf, accessManager, null, None, sessionStore) + batch.start() + Clock.sleep(2) + batch.stopSession() + + Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS)) + (batch.state match { + case SessionState.Dead(_) => true + case _ => false + }) should be (true) + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest()