This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new e7f23e0 [LIVY-547][SERVER] Livy kills session after livy.server.session.timeout even if the session is active e7f23e0 is described below commit e7f23e06606ff79aec1c22e79a96f959cb89a8be Author: Shanyu Zhao <shz...@microsoft.com> AuthorDate: Fri Aug 9 14:51:08 2019 +0800 [LIVY-547][SERVER] Livy kills session after livy.server.session.timeout even if the session is active ## What changes were proposed in this pull request? Add a new configuration: livy.server.session.timeout-check.skip-busy To indicate whether or not to skip timeout check for a busy session. It defaults to false for backward compatibility. https://issues.apache.org/jira/browse/LIVY-547 ## How was this patch tested? Manually tested the configuration. Author: Shanyu Zhao <shz...@microsoft.com> Closes #190 from shanyu/shanyu-547. --- conf/livy.conf.template | 6 +++++- server/src/main/scala/org/apache/livy/LivyConf.scala | 3 +++ .../org/apache/livy/sessions/SessionManager.scala | 4 ++++ .../scala/org/apache/livy/sessions/MockSession.scala | 3 ++- .../apache/livy/sessions/SessionManagerSpec.scala | 20 ++++++++++++++++++-- 5 files changed, 32 insertions(+), 4 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 2590e87..de7c248 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -50,8 +50,12 @@ # Enabled to check whether timeout Livy sessions should be stopped. # livy.server.session.timeout-check = true +# +# Whether or not to skip timeout check for a busy session +# livy.server.session.timeout-check.skip-busy = false -# Time in milliseconds on how long Livy will wait before timing out an idle session. +# Time in milliseconds on how long Livy will wait before timing out an inactive session. +# Note that the inactive session could be busy running jobs. # livy.server.session.timeout = 1h # # How long a finished session state should be kept in LivyServer for query. diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 32b3522..dec8e4a 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -216,6 +216,9 @@ object LivyConf { // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) + // Whether session timeout check should skip busy sessions, if set to true, then busy sessions + // that have jobs running will never timeout. + val SESSION_TIMEOUT_CHECK_SKIP_BUSY = Entry("livy.server.session.timeout-check.skip-busy", false) // How long will an inactive session be gc-ed. val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h") // How long a finished session state will be kept in memory diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index a63cab3..f8f98a2 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -77,6 +77,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK) + private[this] final val sessionTimeoutCheckSkipBusy = + livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY) private[this] final val sessionTimeout = TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT)) private[this] final val sessionStateRetainedInSec = @@ -153,6 +155,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( case _ => if (!sessionTimeoutCheck) { false + } else if (session.state == SessionState.Busy && sessionTimeoutCheckSkipBusy) { + false } else if (session.isInstanceOf[BatchSession]) { false } else { diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala index 3d0cc26..ddcbd4b 100644 --- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala +++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala @@ -31,7 +31,8 @@ class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = override def logLines(): IndexedSeq[String] = IndexedSeq() - override def state: SessionState = SessionState.Idle + var serverState: SessionState = SessionState.Idle + override def state: SessionState = serverState override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0) } diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 193d95b..a5e9ffa 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -36,8 +36,8 @@ import org.apache.livy.sessions.Session.RecoveryMetadata class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { implicit def executor: ExecutionContext = ExecutionContext.global - private def createSessionManager(): (LivyConf, SessionManager[MockSession, RecoveryMetadata]) = { - val livyConf = new LivyConf() + private def createSessionManager(livyConf: LivyConf = new LivyConf()) + : (LivyConf, SessionManager[MockSession, RecoveryMetadata]) = { livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms") val manager = new SessionManager[MockSession, RecoveryMetadata]( livyConf, @@ -59,6 +59,22 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit } } + it("should not garbage collect busy sessions if skip-busy configured") { + val lc = new LivyConf() + lc.set(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY, true) + val (livyConf, manager) = createSessionManager(lc) + val session1 = manager.register(new MockSession(manager.nextId(), null, livyConf)) + val session2 = manager.register(new MockSession(manager.nextId(), null, livyConf)) + manager.get(session1.id).isDefined should be(true) + manager.get(session2.id).isDefined should be(true) + session2.serverState = SessionState.Busy + eventually(timeout(5 seconds), interval(100 millis)) { + Await.result(manager.collectGarbage(), Duration.Inf) + (manager.get(session1.id).isDefined, manager.get(session2.id).isDefined) should + be (false, true) + } + } + it("should create sessions with names") { val (livyConf, manager) = createSessionManager() val name = "Mock-session"