This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new def6318 [LIVY-41] Let users access sessions by session name def6318 is described below commit def6318c84f32a09f219065691857f11ca74e7cb Author: Fathi Salmi, Meisam(mfathisalmi) <mfathisa...@paypal.com> AuthorDate: Tue Feb 5 11:25:06 2019 -0800 [LIVY-41] Let users access sessions by session name This commit enables Livy users to access sessions either by names or by auto-generated sessiond id's. It also prevents users from creating sessions that have the same name. This commit keeps API change minimal. These are places that API change is needed: - `Session` and its sub-classes adds a new field, `name`. - `RecoveryMetadata` and its sub-classes adds a new field, `name`. - `SessionManager` adds a new method `getSession(name: String)` which looks sessions up by name. Task-url: https://issues.apache.org/jira/browse/LIVY-41 Author: Fathi Salmi, Meisam(mfathisalmi) <mfathisa...@paypal.com> Author: Meisam Fathi <meisam.fa...@gmail.com> Author: Fathi Salmi, Meisam <meisam.fa...@gmail.com> Author: Fathi, Meisam <meisam.fa...@gmail.com> Closes #48 from meisam/LIVY-41-rebased. --- .../apache/livy/client/common/HttpMessages.java | 8 +- .../apache/livy/client/http/HttpClientSpec.scala | 1 + .../livy/test/framework/LivyRestClient.scala | 4 + .../test/scala/org/apache/livy/test/BatchIT.scala | 4 +- .../scala/org/apache/livy/test/InteractiveIT.scala | 2 +- .../org/apache/livy/server/SessionServlet.scala | 13 ++- .../apache/livy/server/batch/BatchSession.scala | 20 +++- .../livy/server/batch/BatchSessionServlet.scala | 9 +- .../server/interactive/InteractiveSession.scala | 104 +++++++++++---------- .../interactive/InteractiveSessionServlet.scala | 6 +- .../scala/org/apache/livy/sessions/Session.scala | 16 +++- .../org/apache/livy/sessions/SessionManager.scala | 13 +++ .../apache/livy/server/SessionServletSpec.scala | 4 +- .../livy/server/batch/BatchServletSpec.scala | 60 +++++++----- .../livy/server/batch/BatchSessionSpec.scala | 19 +++- .../interactive/BaseInteractiveServletSpec.scala | 1 + .../InteractiveSessionServletSpec.scala | 12 ++- .../interactive/InteractiveSessionSpec.scala | 32 +++++-- .../server/interactive/SessionHeartbeatSpec.scala | 5 +- .../server/recovery/BlackholeStateStoreSpec.scala | 20 ++++ .../org/apache/livy/sessions/MockSession.scala | 5 +- .../apache/livy/sessions/SessionManagerSpec.scala | 46 +++++++-- .../thriftserver/LivyThriftSessionManager.scala | 1 + 23 files changed, 289 insertions(+), 116 deletions(-) diff --git a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java index b1e253f..2245eb9 100644 --- a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java +++ b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java @@ -53,6 +53,7 @@ public class HttpMessages { public static class SessionInfo implements ClientMessage { public final int id; + public final String name; public final String appId; public final String owner; public final String proxyUser; @@ -61,9 +62,10 @@ public class HttpMessages { public final Map<String, String> appInfo; public final List<String> log; - public SessionInfo(int id, String appId, String owner, String proxyUser, String state, - String kind, Map<String, String> appInfo, List<String> log) { + public SessionInfo(int id, String name, String appId, String owner, String proxyUser, + String state, String kind, Map<String, String> appInfo, List<String> log) { this.id = id; + this.name = name; this.appId = appId; this.owner = owner; this.proxyUser = proxyUser; @@ -74,7 +76,7 @@ public class HttpMessages { } private SessionInfo() { - this(-1, null, null, null, null, null, null, null); + this(-1, null, null, null, null, null, null, null, null); } } diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index 837c24c..f53d9f5 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -273,6 +273,7 @@ private class HttpClientTestBootstrap extends LifeCycle { val session = mock(classOf[InteractiveSession]) val id = sessionManager.nextId() when(session.id).thenReturn(id) + when(session.name).thenReturn(None) when(session.appId).thenReturn(None) when(session.appInfo).thenReturn(AppInfo()) when(session.state).thenReturn(SessionState.Idle) diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala index 1087559..cf68f77 100644 --- a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala +++ b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala @@ -249,12 +249,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) } def startBatch( + name: Option[String], file: String, className: Option[String], args: List[String], sparkConf: Map[String, String]): BatchSession = { val r = new CreateBatchRequest() r.file = file + r.name = name r.className = className r.args = args r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf @@ -264,12 +266,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) } def startSession( + name: Option[String], kind: Kind, sparkConf: Map[String, String], heartbeatTimeoutInSecond: Int): InteractiveSession = { val r = new CreateInteractiveRequest() r.kind = kind r.conf = sparkConf + r.name = name r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r)) diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala index 7c433fb..a6f4e73 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala @@ -159,14 +159,14 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll { private def withScript[R] (scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty) (f: (LivyRestClient#BatchSession) => R): R = { - val s = livyClient.startBatch(scriptPath, None, args, sparkConf) + val s = livyClient.startBatch(None, scriptPath, None, args, sparkConf) withSession(s)(f) } private def withTestLib[R] (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty) (f: (LivyRestClient#BatchSession) => R): R = { - val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf) + val s = livyClient.startBatch(None, testLibPath, Some(testClass.getName()), args, sparkConf) withSession(s)(f) } } diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala index 689195c..0613bf3 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala @@ -206,7 +206,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { waitForIdle: Boolean = true, heartbeatTimeoutInSecond: Int = 0) (f: (LivyRestClient#InteractiveSession) => R): R = { - withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s => + withSession(livyClient.startSession(None, kind, sparkConf, heartbeatTimeoutInSecond)) { s => if (waitForIdle) { s.verifySessionIdle() } diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala index d62a96e..0c52a1e 100644 --- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala @@ -184,8 +184,15 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( private def doWithSession(fn: (S => Any), allowAll: Boolean, checkFn: Option[(String, String) => Boolean]): Any = { - val sessionId = params("id").toInt - sessionManager.get(sessionId) match { + val idOrNameParam: String = params("id") + val session = if (idOrNameParam.forall(_.isDigit)) { + val sessionId = idOrNameParam.toInt + sessionManager.get(sessionId) + } else { + val sessionName = idOrNameParam + sessionManager.get(sessionName) + } + session match { case Some(session) => if (allowAll || checkFn.map(_(session.owner, remoteUser(request))).getOrElse(false)) { fn(session) @@ -193,7 +200,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( Forbidden() } case None => - NotFound(ResponseMessage(s"Session '$sessionId' not found.")) + NotFound(ResponseMessage(s"Session '$idOrNameParam' not found.")) } } diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index c15057b..3bbd742 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -35,6 +35,7 @@ import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessB @JsonIgnoreProperties(ignoreUnknown = true) case class BatchRecoveryMetadata( id: Int, + name: Option[String], appId: Option[String], appTag: String, owner: String, @@ -53,6 +54,7 @@ object BatchSession extends Logging { def create( id: Int, + name: Option[String], request: CreateBatchRequest, livyConf: LivyConf, accessManager: AccessManager, @@ -110,6 +112,7 @@ object BatchSession extends Logging { new BatchSession( id, + name, appTag, SessionState.Starting, livyConf, @@ -126,6 +129,7 @@ object BatchSession extends Logging { mockApp: Option[SparkApp] = None): BatchSession = { new BatchSession( m.id, + m.name, m.appTag, SessionState.Recovering, livyConf, @@ -140,6 +144,7 @@ object BatchSession extends Logging { class BatchSession( id: Int, + name: Option[String], appTag: String, initialState: SessionState, livyConf: LivyConf, @@ -147,20 +152,25 @@ class BatchSession( override val proxyUser: Option[String], sessionStore: SessionStore, sparkApp: BatchSession => SparkApp) - extends Session(id, owner, livyConf) with SparkAppListener { + extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global private[this] var _state: SessionState = initialState - private val app = sparkApp(this) + + private var app: Option[SparkApp] = None override def state: SessionState = _state - override def logLines(): IndexedSeq[String] = app.log() + override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(IndexedSeq.empty[String]) + + override def start(): Unit = { + app = Option(sparkApp(this)) + } override def stopSession(): Unit = { - app.kill() + app.foreach(_.kill()) } override def appIdKnown(appId: String): Unit = { @@ -187,5 +197,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) } diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala index a069a50..e48ad8f 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala @@ -27,6 +27,7 @@ import org.apache.livy.utils.AppInfo case class BatchSessionView( id: Long, + name: Option[String], state: String, appId: Option[String], appInfo: AppInfo, @@ -42,8 +43,11 @@ class BatchSessionServlet( override protected def createSession(req: HttpServletRequest): BatchSession = { val createRequest = bodyAs[CreateBatchRequest](req) + val sessionId = sessionManager.nextId() + val sessionName = createRequest.name BatchSession.create( - sessionManager.nextId(), + sessionId, + sessionName, createRequest, livyConf, accessManager, @@ -66,7 +70,8 @@ class BatchSessionServlet( } else { Nil } - BatchSessionView(session.id, session.state.toString, session.appId, session.appInfo, logs) + BatchSessionView(session.id, session.name, session.state.toString, session.appId, + session.appInfo, logs) } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 3b3095f..0c3a3a8 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.concurrent.Future +import scala.concurrent.{Future, Promise} import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.{Random, Try} @@ -49,6 +49,7 @@ import org.apache.livy.utils._ @JsonIgnoreProperties(ignoreUnknown = true) case class InteractiveRecoveryMetadata( id: Int, + name: Option[String], appId: Option[String], appTag: String, kind: Kind, @@ -66,6 +67,7 @@ object InteractiveSession extends Logging { def create( id: Int, + name: Option[String], owner: String, livyConf: LivyConf, accessManager: AccessManager, @@ -111,6 +113,7 @@ object InteractiveSession extends Logging { new InteractiveSession( id, + name, None, appTag, client, @@ -137,6 +140,7 @@ object InteractiveSession extends Logging { new InteractiveSession( metadata.id, + metadata.name, metadata.appId, metadata.appTag, client, @@ -347,6 +351,7 @@ object InteractiveSession extends Logging { class InteractiveSession( id: Int, + name: Option[String], appIdHint: Option[String], appTag: String, val client: Option[RSCClient], @@ -358,7 +363,7 @@ class InteractiveSession( override val proxyUser: Option[String], sessionStore: SessionStore, mockApp: Option[SparkApp]) // For unit test. - extends Session(id, owner, livyConf) + extends Session(id, name, owner, livyConf) with SessionHeartbeat with SparkAppListener { @@ -377,69 +382,74 @@ class InteractiveSession( private val sessionSaveLock = new Object() _appId = appIdHint - sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) - heartbeat() - private val app = mockApp.orElse { - val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } + private var app: Option[SparkApp] = None + + override def start(): Unit = { + sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + heartbeat() + app = mockApp.orElse { + val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) } - } + driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) } + } - if (client.isEmpty) { - transition(Dead()) - val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown." - info(msg) - sessionLog = IndexedSeq(msg) - } else { - val uriFuture = Future { client.get.getServerUri.get() } - - uriFuture onSuccess { case url => - rscDriverUri = Option(url) - sessionSaveLock.synchronized { - sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + if (client.isEmpty) { + transition(Dead()) + val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown." + info(msg) + sessionLog = IndexedSeq(msg) + } else { + val uriFuture = Future { client.get.getServerUri.get() } + + uriFuture.onSuccess { case url => + rscDriverUri = Option(url) + sessionSaveLock.synchronized { + sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + } } - } - uriFuture onFailure { case e => warn("Fail to get rsc uri", e) } + uriFuture.onFailure { case e => warn("Fail to get rsc uri", e) } - // Send a dummy job that will return once the client is ready to be used, and set the - // state to "idle" at that point. - client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() { + // Send a dummy job that will return once the client is ready to be used, and set the + // state to "idle" at that point. + client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() { override def onJobQueued(job: JobHandle[Void]): Unit = { } override def onJobStarted(job: JobHandle[Void]): Unit = { } - override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut() + override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut() - override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut() + override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut() - override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = { - transition(SessionState.Running) - info(s"Interactive session $id created [appid: ${appId.orNull}, owner: $owner, proxyUser:" + - s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " + - s"info: ${appInfo.asJavaMap}]") - } + override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = { + transition(SessionState.Running) + info(s"Interactive session $id created [appid: ${appId.orNull}, " + + s"owner: $owner, proxyUser:" + + s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " + + s"info: ${appInfo.asJavaMap}]") + } - private def errorOut(): Unit = { - // Other code might call stop() to close the RPC channel. When RPC channel is closing, - // this callback might be triggered. Check and don't call stop() to avoid nested called - // if the session is already shutting down. - if (serverSideState != SessionState.ShuttingDown) { - transition(SessionState.Error()) - stop() - app.foreach { a => - info(s"Failed to ping RSC driver for session $id. Killing application.") - a.kill() + private def errorOut(): Unit = { + // Other code might call stop() to close the RPC channel. When RPC channel is closing, + // this callback might be triggered. Check and don't call stop() to avoid nested called + // if the session is already shutting down. + if (serverSideState != SessionState.ShuttingDown) { + transition(SessionState.Error()) + stop() + app.foreach { a => + info(s"Failed to ping RSC driver for session $id. Killing application.") + a.kill() + } } } - } - }) + }) + } } override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog) override def recoveryMetadata: RecoveryMetadata = - InteractiveRecoveryMetadata( - id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri) + InteractiveRecoveryMetadata(id, name, appId, appTag, kind, + heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri) override def state: SessionState = { if (serverSideState == SessionState.Running) { diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala index 7450cd7..dec88a8 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala @@ -54,6 +54,7 @@ class InteractiveSessionServlet( val createRequest = bodyAs[CreateInteractiveRequest](req) InteractiveSession.create( sessionManager.nextId(), + createRequest.name, remoteUser(req), livyConf, accessManager, @@ -79,8 +80,9 @@ class InteractiveSessionServlet( Nil } - new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull, - session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava) + new SessionInfo(session.id, session.name.orNull, session.appId.orNull, session.owner, + session.proxyUser.orNull, session.state.toString, session.kind.toString, + session.appInfo.asJavaMap, logs.asJava) } post("/:id/stop") { diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala index 0b54779..ee14283 100644 --- a/server/src/main/scala/org/apache/livy/sessions/Session.scala +++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala @@ -21,7 +21,6 @@ import java.io.InputStream import java.net.{URI, URISyntaxException} import java.security.PrivilegedExceptionAction import java.util.UUID -import java.util.concurrent.TimeUnit import scala.concurrent.{ExecutionContext, Future} @@ -135,13 +134,24 @@ object Session { } } -abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) +abstract class Session( + val id: Int, + val name: Option[String], + val owner: String, + val livyConf: LivyConf) extends Logging { import Session._ protected implicit val executionContext = ExecutionContext.global + // validate session name. The name should not be a number + name.foreach { sessionName => + if (sessionName.forall(_.isDigit)) { + throw new IllegalArgumentException(s"Invalid session name: $sessionName") + } + } + protected var _appId: Option[String] = None private var _lastActivity = System.nanoTime() @@ -171,6 +181,8 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) def state: SessionState + def start(): Unit + def stop(): Future[Unit] = Future { try { info(s"Stopping $this...") diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index 5926071..a63cab3 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -73,6 +73,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( protected[this] final val idCounter = new AtomicInteger(0) protected[this] final val sessions = mutable.LinkedHashMap[Int, S]() + private[this] final val sessionsByName = mutable.HashMap[String, S]() + private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK) private[this] final val sessionTimeout = @@ -92,13 +94,23 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( def register(session: S): S = { info(s"Registering new session ${session.id}") synchronized { + session.name.foreach { sessionName => + if (sessionsByName.contains(sessionName)) { + throw new IllegalArgumentException(s"Duplicate session name: ${session.name}") + } else { + sessionsByName.put(sessionName, session) + } + } sessions.put(session.id, session) + session.start() } session } def get(id: Int): Option[S] = sessions.get(id) + def get(sessionName: String): Option[S] = sessionsByName.get(sessionName) + def size(): Int = sessions.size def all(): Iterable[S] = sessions.values @@ -113,6 +125,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( sessionStore.remove(sessionType, session.id) synchronized { sessions.remove(session.id) + session.name.foreach(sessionsByName.remove) } } catch { case NonFatal(e) => diff --git a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala index e0ebd9a..38b79ce 100644 --- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala @@ -32,7 +32,7 @@ object SessionServletSpec { val PROXY_USER = "proxyUser" class MockSession(id: Int, owner: String, livyConf: LivyConf) - extends Session(id, owner, livyConf) { + extends Session(id, None, owner, livyConf) { case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata() @@ -42,6 +42,8 @@ object SessionServletSpec { override def state: SessionState = SessionState.Idle + override def start(): Unit = () + override protected def stopSession(): Unit = () override def logLines(): IndexedSeq[String] = IndexedSeq("log") diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala index 2c37c19..ed29800 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala @@ -62,6 +62,34 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover accessManager) } + def testShowSessionProperties(name: Option[String]): Unit = { + val id = 0 + val state = SessionState.Running + val appId = "appid" + val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) + val log = IndexedSeq[String]("log1", "log2") + + val session = mock[BatchSession] + when(session.id).thenReturn(id) + when(session.name).thenReturn(name) + when(session.state).thenReturn(state) + when(session.appId).thenReturn(Some(appId)) + when(session.appInfo).thenReturn(appInfo) + when(session.logLines()).thenReturn(log) + + val req = mock[HttpServletRequest] + + val view = servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req) + .asInstanceOf[BatchSessionView] + + view.id shouldEqual id + view.name shouldEqual name + view.state shouldEqual state.toString + view.appId shouldEqual Some(appId) + view.appInfo shouldEqual appInfo + view.log shouldEqual log + } + describe("Batch Servlet") { it("should create and tear down a batch") { jget[Map[String, Any]]("/") { data => @@ -116,36 +144,18 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover it("should respect config black list") { val createRequest = new CreateBatchRequest() + createRequest.name = Some("TEST-BatchServletSpec-Session-0") createRequest.file = script.toString createRequest.conf = BLACKLISTED_CONFIG jpost[Map[String, Any]]("/", createRequest, expectedStatus = SC_BAD_REQUEST) { _ => } } - it("should show session properties") { - val id = 0 - val state = SessionState.Running - val appId = "appid" - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) - val log = IndexedSeq[String]("log1", "log2") - - val session = mock[BatchSession] - when(session.id).thenReturn(id) - when(session.state).thenReturn(state) - when(session.appId).thenReturn(Some(appId)) - when(session.appInfo).thenReturn(appInfo) - when(session.logLines()).thenReturn(log) - - val req = mock[HttpServletRequest] - - val view = servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req) - .asInstanceOf[BatchSessionView] - - view.id shouldEqual id - view.state shouldEqual state.toString - view.appId shouldEqual Some(appId) - view.appInfo shouldEqual appInfo - view.log shouldEqual log - } + Seq(None, Some("TEST-batch-session")) + .foreach { name => + it(s"should show session properties (name = $name)") { + testShowSessionProperties(name) + } + } it("should fail session creation when max session creation is hit") { val createRequest = new CreateBatchRequest() diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 8381c95..d7cac15 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -70,7 +70,8 @@ class BatchSessionSpec val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) val accessManager = new AccessManager(conf) - val batch = BatchSession.create(0, req, conf, accessManager, null, sessionStore) + val batch = BatchSession.create(0, None, req, conf, accessManager, null, sessionStore) + batch.start() Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS)) (batch.state match { @@ -87,7 +88,8 @@ class BatchSessionSpec val mockApp = mock[SparkApp] val accessManager = new AccessManager(conf) val batch = BatchSession.create( - 0, req, conf, accessManager, null, sessionStore, Some(mockApp)) + 0, None, req, conf, accessManager, null, sessionStore, Some(mockApp)) + batch.start() val expectedAppId = "APPID" batch.appIdKnown(expectedAppId) @@ -100,18 +102,27 @@ class BatchSessionSpec batch.appInfo shouldEqual expectedAppInfo } - it("should recover session") { + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() + val name = Some("Test Batch Session") val mockApp = mock[SparkApp] - val m = BatchRecoveryMetadata(99, None, "appTag", null, None) + val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None) val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) batch.state shouldBe (SessionState.Recovering) + batch.name shouldBe (name) batch.appIdKnown("appId") verify(sessionStore, atLeastOnce()).save( Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject()) } + + Seq[Option[String]](None, Some("Test Batch Session"), null) + .foreach { case name => + it(s"should recover session (name = $name)") { + testRecoverSession(name) + } + } } } diff --git a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala index b16e74f..7f8bbfe 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala @@ -61,6 +61,7 @@ abstract class BaseInteractiveServletSpec val classpath = sys.props("java.class.path") val request = new CreateInteractiveRequest() request.kind = kind + request.name = None request.conf = extraConf ++ Map( RSCConf.Entry.LIVY_JARS.key() -> "", RSCConf.Entry.CLIENT_IN_PROCESS.key() -> inProcess.toString, diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index e946fc0..0b061fa 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -60,6 +60,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { val session = mock[InteractiveSession] when(session.kind).thenReturn(Spark) + when(session.name).thenReturn(None) when(session.appId).thenReturn(None) when(session.appInfo).thenReturn(AppInfo()) when(session.logLines()).thenReturn(IndexedSeq()) @@ -151,7 +152,14 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { } } - it("should show session properties") { + Seq(Some("TEST-interactive-session"), None) + .foreach { case name => + it(s"should show session properties (name =$name") { + testShowSessionProperties(name: Option[String]) + } + } + + def testShowSessionProperties(name: Option[String]): Unit = { val id = 0 val appId = "appid" val owner = "owner" @@ -163,6 +171,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { val session = mock[InteractiveSession] when(session.id).thenReturn(id) + when(session.name).thenReturn(name) when(session.appId).thenReturn(Some(appId)) when(session.owner).thenReturn(owner) when(session.proxyUser).thenReturn(Some(proxyUser)) @@ -177,6 +186,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { .asInstanceOf[SessionInfo] view.id shouldEqual id + Option(view.name) shouldEqual name view.appId shouldEqual appId view.owner shouldEqual owner view.proxyUser shouldEqual proxyUser diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index f07e61f..95bc08a 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -70,7 +70,7 @@ class InteractiveSessionSpec extends FunSpec SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"), RSCConf.Entry.LIVY_JARS.key() -> "" ) - InteractiveSession.create(0, null, livyConf, accessManager, req, sessionStore, mockApp) + InteractiveSession.create(0, None, null, livyConf, accessManager, req, sessionStore, mockApp) } private def executeStatement(code: String, codeType: Option[String] = None): JValue = { @@ -160,6 +160,7 @@ class InteractiveSessionSpec extends FunSpec val mockApp = mock[SparkApp] val sessionStore = mock[SessionStore] session = createSession(sessionStore, Some(mockApp)) + session.start() val expectedAppId = "APPID" session.appIdKnown(expectedAppId) @@ -242,15 +243,32 @@ class InteractiveSessionSpec extends FunSpec } describe("recovery") { - it("should recover session") { + it("should recover named sessions") { val conf = new LivyConf() val sessionStore = mock[SessionStore] val mockClient = mock[RSCClient] when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]]) - val m = - InteractiveRecoveryMetadata( - 78, None, "appTag", Spark, 0, null, None, Some(URI.create(""))) + val m = InteractiveRecoveryMetadata( + 78, Some("Test session"), None, "appTag", Spark, 0, null, None, Some(URI.create(""))) + val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) + s.start() + + s.state shouldBe (SessionState.Recovering) + + s.appIdKnown("appId") + verify(sessionStore, atLeastOnce()).save( + MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject()) + } + + it("should recover sessions with no name") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + val mockClient = mock[RSCClient] + when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]]) + val m = InteractiveRecoveryMetadata( + 78, None, None, "appTag", Spark, 0, null, None, Some(URI.create(""))) val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) + s.start() s.state shouldBe (SessionState.Recovering) @@ -263,9 +281,9 @@ class InteractiveSessionSpec extends FunSpec val conf = new LivyConf() val sessionStore = mock[SessionStore] val m = InteractiveRecoveryMetadata( - 78, Some("appId"), "appTag", Spark, 0, null, None, None) + 78, None, Some("appId"), "appTag", Spark, 0, null, None, None) val s = InteractiveSession.recover(m, conf, sessionStore, None) - + s.start() s.state shouldBe a[SessionState.Dead] s.logLines().mkString should include("RSCDriver URI is unknown") } diff --git a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala index 12c8bbb..6d2cff8 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala @@ -55,7 +55,8 @@ class SessionHeartbeatSpec extends FunSpec with Matchers { } describe("SessionHeartbeatWatchdog") { - abstract class TestSession extends Session(0, null, null) with SessionHeartbeat {} + abstract class TestSession + extends Session(0, None, null, null) with SessionHeartbeat {} class TestWatchdog(conf: LivyConf) extends SessionManager[TestSession, RecoveryMetadata]( conf, @@ -68,10 +69,12 @@ class SessionHeartbeatSpec extends FunSpec with Matchers { it("should delete only expired sessions") { val expiredSession: TestSession = mock[TestSession] when(expiredSession.id).thenReturn(0) + when(expiredSession.name).thenReturn(None) when(expiredSession.heartbeatExpired).thenReturn(true) val nonExpiredSession: TestSession = mock[TestSession] when(nonExpiredSession.id).thenReturn(1) + when(nonExpiredSession.name).thenReturn(None) when(nonExpiredSession.heartbeatExpired).thenReturn(false) val n = new TestWatchdog(new LivyConf()) diff --git a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala index e40bb1c..8ee448f 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala @@ -21,6 +21,7 @@ import org.scalatest.FunSpec import org.scalatest.Matchers._ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.server.batch.BatchRecoveryMetadata class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { describe("BlackholeStateStore") { @@ -43,5 +44,24 @@ class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { it("remove should not throw") { stateStore.remove("") } + + it("should deserialize sessions without name") { + val jsonbytes = + """ + |{ + | "id": 408107, + | "appId": "application_1541532370353_1465148", + | "state": "running", + | "appTag": "livy-batch-408107-2jAOFzDy", + | "owner": "batch_admin", + | "proxyUser": "batch_opts", + | "version": 1 + |} + """.stripMargin.getBytes("UTF-8") + val batchRecoveryMetadata = stateStore.deserialize[BatchRecoveryMetadata](jsonbytes) + batchRecoveryMetadata.id shouldBe 408107 + batchRecoveryMetadata.appId shouldBe Some("application_1541532370353_1465148") + batchRecoveryMetadata.name shouldBe None + } } } diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala index 1604f4d..3d0cc26 100644 --- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala +++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala @@ -19,11 +19,14 @@ package org.apache.livy.sessions import org.apache.livy.LivyConf -class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, owner, conf) { +class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None) + extends Session(id, name, owner, conf) { case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata() override val proxyUser = None + override def start(): Unit = () + override protected def stopSession(): Unit = () override def logLines(): IndexedSeq[String] = IndexedSeq() diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 547af8b..523e1d7 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -36,16 +36,21 @@ import org.apache.livy.sessions.Session.RecoveryMetadata class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { implicit def executor: ExecutionContext = ExecutionContext.global + private def createSessionManager(): (LivyConf, SessionManager[MockSession, RecoveryMetadata]) = { + val livyConf = new LivyConf() + livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms") + val manager = new SessionManager[MockSession, RecoveryMetadata]( + livyConf, + { _ => assert(false).asInstanceOf[MockSession] }, + mock[SessionStore], + "test", + Some(Seq.empty)) + (livyConf, manager) + } + describe("SessionManager") { it("should garbage collect old sessions") { - val livyConf = new LivyConf() - livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms") - val manager = new SessionManager[MockSession, RecoveryMetadata]( - livyConf, - { _ => assert(false).asInstanceOf[MockSession] }, - mock[SessionStore], - "test", - Some(Seq.empty)) + val (livyConf, manager) = createSessionManager() val session = manager.register(new MockSession(manager.nextId(), null, livyConf)) manager.get(session.id).isDefined should be(true) eventually(timeout(5 seconds), interval(100 millis)) { @@ -54,10 +59,31 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit } } + it("should create sessions with names") { + val (livyConf, manager) = createSessionManager() + val name = "Mock-session" + val session = manager.register(new MockSession(manager.nextId(), null, livyConf, Some(name))) + manager.get(session.id).isDefined should be(true) + manager.get(name).isDefined should be(true) + } + + it("should not create sessions with duplicate names") { + val (livyConf, manager) = createSessionManager() + val name = "Mock-session" + val session1 = new MockSession(manager.nextId(), null, livyConf, Some(name)) + val session2 = new MockSession(manager.nextId(), null, livyConf, Some(name)) + manager.register(session1) + an[IllegalArgumentException] should be thrownBy manager.register(session2) + manager.get(session1.id).isDefined should be(true) + manager.get(session2.id).isDefined should be(false) + manager.shutdown() + } + it("batch session should not be gc-ed until application is finished") { val sessionId = 24 val session = mock[BatchSession] when(session.id).thenReturn(sessionId) + when(session.name).thenReturn(None) when(session.stop()).thenReturn(Future {}) when(session.lastActivity).thenReturn(System.nanoTime()) @@ -70,6 +96,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val sessionId = 24 val session = mock[InteractiveSession] when(session.id).thenReturn(sessionId) + when(session.name).thenReturn(None) when(session.stop()).thenReturn(Future {}) when(session.lastActivity).thenReturn(System.nanoTime()) @@ -112,12 +139,13 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, None, appTag, null, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) } def mockSession(id: Int): BatchSession = { val session = mock[BatchSession] when(session.id).thenReturn(id) + when(session.name).thenReturn(None) when(session.stop()).thenReturn(Future {}) when(session.lastActivity).thenReturn(System.nanoTime()) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala index 5be5536..31ea2f0 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala @@ -228,6 +228,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC createInteractiveRequest.kind = Spark val newSession = InteractiveSession.create( server.livySessionManager.nextId(), + None, username, server.livyConf, server.accessManager,