This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 4ec3b9b [LIVY-617] Livy session leak on Yarn when creating session duplicated names 4ec3b9b is described below commit 4ec3b9b47b556390b2f738df62b5b277fa02f6ef Author: Shanyu Zhao <shz...@microsoft.com> AuthorDate: Wed Aug 28 19:04:57 2019 +0800 [LIVY-617] Livy session leak on Yarn when creating session duplicated names ## What changes were proposed in this pull request? When a session is created with a duplicate name, the SessionManager.register() method should stop the session before throwing the exception, instead of only throwing it. Otherwise the session driver process keeps running and ends up as a leaked Yarn application. https://issues.apache.org/jira/browse/LIVY-617 ## How was this patch tested? This is a simple fix, verified with a manual end-to-end test. Author: Shanyu Zhao <shz...@microsoft.com> Closes #187 from shanyu/shanyu. --- .../src/main/scala/org/apache/livy/sessions/SessionManager.scala | 9 ++++++++- server/src/test/scala/org/apache/livy/sessions/MockSession.scala | 5 ++++- .../test/scala/org/apache/livy/sessions/SessionManagerSpec.scala | 2 ++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index f8f98a2..f2548ac 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -98,7 +98,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( synchronized { session.name.foreach { sessionName => if (sessionsByName.contains(sessionName)) { - throw new IllegalArgumentException(s"Duplicate session name: ${session.name}") + val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}" + error(errMsg) + session.stop() + throw new IllegalArgumentException(errMsg) } else { sessionsByName.put(sessionName, session) } @@ 
-106,6 +109,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( sessions.put(session.id, session) session.start() } + info(s"Registered new session ${session.id}") session } @@ -122,6 +126,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( } def delete(session: S): Future[Unit] = { + info(s"Deleting session ${session.id}") session.stop().map { case _ => try { sessionStore.remove(sessionType, session.id) @@ -133,6 +138,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( case NonFatal(e) => error("Exception was thrown during stop session:", e) throw e + } finally { + info(s"Deleted session ${session.id}") } } } diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala index ddcbd4b..f9609b1 100644 --- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala +++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala @@ -27,7 +27,10 @@ class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = override def start(): Unit = () - override protected def stopSession(): Unit = () + var stopped = false + override protected def stopSession(): Unit = { + stopped = true + } override def logLines(): IndexedSeq[String] = IndexedSeq() diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index a5e9ffa..100c756 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -92,6 +92,8 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit an[IllegalArgumentException] should be thrownBy manager.register(session2) manager.get(session1.id).isDefined should be(true) manager.get(session2.id).isDefined should be(false) + assert(!session1.stopped) + 
assert(session2.stopped) manager.shutdown() }