Repository: incubator-livy
Updated Branches:
  refs/heads/master d39ab356b -> 134713d70


[LIVY-494] Add thriftserver to Livy server

## What changes were proposed in this pull request?

The PR adds a configuration parameter to also start up the thriftserver 
when starting the Livy server.

Apart from this trivial change, 3 other main things were needed and are present 
in this PR:

 - Add the thriftserver JARs to the assembly and the livy-server script;
 - A small refactor in order to enforce impersonation in the `*Session` 
classes, instead of in the `*Servlet` ones, so that it is picked up by the 
thriftserver module too (this change is not strictly needed, but I consider it 
a better option than duplicating this logic in the thriftserver module too);
 - Creating a UGI from the configured keytab. This is needed because the 
thriftserver requires the UGI to be created from a keytab in order to work 
properly and previously Livy was using a UGI generated from the cached TGT 
(created by the `kinit` command).

## How was this patch tested?

Manual test: starting the server and having it up for more than 9 days.

Author: Marco Gaido <mga...@hortonworks.com>

Closes #107 from mgaido91/LIVY-494.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/134713d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/134713d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/134713d7

Branch: refs/heads/master
Commit: 134713d705a8de555bed912ac84f5214d7bddc30
Parents: d39ab35
Author: Marco Gaido <mga...@hortonworks.com>
Authored: Mon Sep 24 15:39:07 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Sep 24 15:39:07 2018 -0700

----------------------------------------------------------------------
 assembly/assembly.xml                           |  7 +++
 assembly/pom.xml                                | 11 ++++
 bin/livy-server                                 |  5 ++
 server/pom.xml                                  |  6 +++
 .../main/scala/org/apache/livy/LivyConf.scala   |  1 +
 .../org/apache/livy/server/AccessManager.scala  | 46 ++++++++++++++++
 .../org/apache/livy/server/LivyServer.scala     | 23 ++++++++
 .../org/apache/livy/server/SessionServlet.scala | 56 +++-----------------
 .../livy/server/ThriftServerFactory.scala       | 40 ++++++++++++++
 .../apache/livy/server/batch/BatchSession.scala |  9 ++--
 .../livy/server/batch/BatchSessionServlet.scala | 10 ++--
 .../server/interactive/InteractiveSession.scala |  8 +--
 .../interactive/InteractiveSessionServlet.scala |  5 +-
 .../apache/livy/server/SessionServletSpec.scala |  8 ++-
 .../livy/server/batch/BatchSessionSpec.scala    |  8 ++-
 .../interactive/InteractiveSessionSpec.scala    |  4 +-
 .../livy/thriftserver/LivyThriftServer.scala    | 23 ++++++--
 .../thriftserver/LivyThriftSessionManager.scala |  2 +-
 .../thriftserver/ThriftServerFactoryImpl.scala  | 37 +++++++++++++
 19 files changed, 236 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/assembly/assembly.xml b/assembly/assembly.xml
index e1dcdff..f63fc0b 100644
--- a/assembly/assembly.xml
+++ b/assembly/assembly.xml
@@ -75,5 +75,12 @@
         <include>*</include>
       </includes>
     </fileSet>
+    <fileSet>
+      
<directory>${project.parent.basedir}/thriftserver/server/target/jars</directory>
+      <outputDirectory>${assembly.name}/jars</outputDirectory>
+      <includes>
+        <include>*</include>
+      </includes>
+    </fileSet>
   </fileSets>
 </assembly>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 1aeb908..470f24d 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -91,6 +91,17 @@
         <assembly.format>tar.gz</assembly.format>
       </properties>
     </profile>
+
+    <profile>
+      <id>thriftserver</id>
+      <dependencies>
+        <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>livy-thriftserver</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/bin/livy-server
----------------------------------------------------------------------
diff --git a/bin/livy-server b/bin/livy-server
index eeb57fb..8d27d4e 100755
--- a/bin/livy-server
+++ b/bin/livy-server
@@ -79,10 +79,15 @@ start_livy_server() {
   LIBDIR="$LIVY_HOME/jars"
   if [ ! -d "$LIBDIR" ]; then
     LIBDIR="$LIVY_HOME/server/target/jars"
+    THRIFT_LIBDIR="$LIVY_HOME/thriftserver/server/target/jars"
   fi
   if [ ! -d "$LIBDIR" ]; then
     echo "Could not find Livy jars directory." 1>&2
     exit 1
+  else
+    if [ -d "$THRIFT_LIBDIR" ]; then
+      LIBDIR="$THRIFT_LIBDIR/*:$LIBDIR"
+    fi
   fi
 
   LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR"

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index aa14b6d..0ebc329 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -98,6 +98,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-auth</artifactId>
       <scope>${hadoop.scope}</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala 
b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 73544c8..9e61f83 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -99,6 +99,7 @@ object LivyConf {
   val LAUNCH_KERBEROS_REFRESH_INTERVAL = 
Entry("livy.server.launch.kerberos.refresh-interval", "1h")
   val KINIT_FAIL_THRESHOLD = 
Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)
 
+  val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false)
   val THRIFT_INCR_COLLECT_ENABLED = 
Entry("livy.server.thrift.incrementalCollect", false)
   val THRIFT_SESSION_CREATION_TIMEOUT = 
Entry("livy.server.thrift.session.creationTimeout", "10m")
   val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", 
null)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/AccessManager.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/AccessManager.scala 
b/server/src/main/scala/org/apache/livy/server/AccessManager.scala
index 346626a..c86801e 100644
--- a/server/src/main/scala/org/apache/livy/server/AccessManager.scala
+++ b/server/src/main/scala/org/apache/livy/server/AccessManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.livy.server
 
+import java.security.AccessControlException
+
 import org.apache.livy.{LivyConf, Logging}
 
 private[livy] class AccessManager(conf: LivyConf) extends Logging {
@@ -94,4 +96,48 @@ private[livy] class AccessManager(conf: LivyConf) extends 
Logging {
    * Check whether access control is enabled or not.
    */
   def isAccessControlOn: Boolean = aclsOn
+
+  /**
+   * Checks that the requesting user can impersonate the target user.
+   * If the user does not have permission to impersonate, then throws an 
`AccessControlException`.
+   *
+   * @return The user that should be impersonated. That can be the target user 
if defined, the
+   *         request's user - which may not be defined - otherwise, or `None` 
if impersonation is
+   *         disabled.
+   */
+  def checkImpersonation(
+      target: Option[String],
+      requestUser: String,
+      livyConf: LivyConf): Option[String] = {
+    if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
+      if (!target.forall(hasSuperAccess(_, requestUser))) {
+        throw new AccessControlException(
+          s"User '$requestUser' not allowed to impersonate '$target'.")
+      }
+      target.orElse(Option(requestUser))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Check that the requesting user has admin access to resources owned by the 
given target user.
+   */
+  def hasSuperAccess(target: String, requestUser: String): Boolean = {
+    requestUser == target || checkSuperUser(requestUser)
+  }
+
+  /**
+   * Check that the request's user has modify access to resources owned by the 
given target user.
+   */
+  def hasModifyAccess(target: String, requestUser: String): Boolean = {
+    requestUser == target || checkModifyPermissions(requestUser)
+  }
+
+  /**
+   * Check that the request's user has view access to resources owned by the 
given target user.
+   */
+  def hasViewAccess(target: String, requestUser: String): Boolean = {
+    requestUser == target || checkViewPermissions(requestUser)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/LivyServer.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala 
b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index b0b84a2..525f560 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -56,6 +56,8 @@ class LivyServer extends Logging {
   private var executor: ScheduledExecutorService = _
   private var accessManager: AccessManager = _
 
+  private var ugi: UserGroupInformation = _
+
   def start(): Unit = {
     livyConf = new LivyConf().loadFromFile("livy.conf")
     accessManager = new AccessManager(livyConf)
@@ -115,6 +117,16 @@ class LivyServer extends Logging {
         error("Failed to run kinit, stopping the server.")
         sys.exit(1)
       }
+      // This is and should be the only place where a login() on the UGI is 
performed.
+      // If an other login in the codebase is strictly needed, a needLogin 
check should be added to
+      // avoid anyway that 2 logins are performed.
+      // This is needed because the thriftserver requires the UGI to be 
created from a keytab in
+      // order to work properly and previously Livy was using a UGI generated 
from the cached TGT
+      // (created by the kinit command).
+      if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
+        UserGroupInformation.loginUserFromKeytab(launch_principal, 
launch_keytab)
+      }
+      ugi = UserGroupInformation.getCurrentUser
       startKinitThread(launch_keytab, launch_principal)
     }
 
@@ -266,6 +278,11 @@ class LivyServer extends Logging {
       }
     })
 
+    if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
+      ThriftServerFactory.getInstance.start(
+        livyConf, interactiveSessionManager, sessionStore, accessManager)
+    }
+
     _serverUrl = Some(s"${server.protocol}://${server.host}:${server.port}")
     sys.props("livy.server.server-url") = _serverUrl.get
   }
@@ -292,6 +309,12 @@ class LivyServer extends Logging {
       new Runnable() {
         override def run(): Unit = {
           if (runKinit(keytab, principal)) {
+            // The current UGI should never change. If that happens, it is an 
error condition and
+            // relogin the original UGI would not update the current UGI. So 
the server will fail
+            // due to no valid credentials. The assert here allows to fast 
detect this error
+            // condition and fail immediately with a meaningful error.
+            assert(ugi.equals(UserGroupInformation.getCurrentUser), "Current 
UGI has changed.")
+            ugi.reloginFromTicketCache()
             // schedule another kinit run with a fixed delay.
             executor.schedule(this, refreshInterval, TimeUnit.MILLISECONDS)
           } else {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index 69418a8..928aa05 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -17,6 +17,7 @@
 
 package org.apache.livy.server
 
+import java.security.AccessControlException
 import javax.servlet.http.HttpServletRequest
 
 import org.scalatra._
@@ -149,6 +150,7 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
 
   error {
     case e: IllegalArgumentException => BadRequest(e.getMessage)
+    case e: AccessControlException => Forbidden(e.getMessage)
   }
 
   /**
@@ -157,52 +159,6 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
   protected def remoteUser(req: HttpServletRequest): String = 
req.getRemoteUser()
 
   /**
-   * Checks that the request's user can impersonate the target user.
-   *
-   * If the user does not have permission to impersonate, then halt execution.
-   *
-   * @return The user that should be impersonated. That can be the target user 
if defined, the
-   *         request's user - which may not be defined - otherwise, or `None` 
if impersonation is
-   *         disabled.
-   */
-  protected def checkImpersonation(
-      target: Option[String],
-      req: HttpServletRequest): Option[String] = {
-    if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
-      if (!target.map(hasSuperAccess(_, req)).getOrElse(true)) {
-        halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate 
'$target'."))
-      }
-      target.orElse(Option(remoteUser(req)))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Check that the request's user has view access to resources owned by the 
given target user.
-   */
-  protected def hasViewAccess(target: String, req: HttpServletRequest): 
Boolean = {
-    val user = remoteUser(req)
-    user == target || accessManager.checkViewPermissions(user)
-  }
-
-  /**
-   * Check that the request's user has modify access to resources owned by the 
given target user.
-   */
-  protected def hasModifyAccess(target: String, req: HttpServletRequest): 
Boolean = {
-    val user = remoteUser(req)
-    user == target || accessManager.checkModifyPermissions(user)
-  }
-
-  /**
-   * Check that the request's user has admin access to resources owned by the 
given target user.
-   */
-  protected def hasSuperAccess(target: String, req: HttpServletRequest): 
Boolean = {
-    val user = remoteUser(req)
-    user == target || accessManager.checkSuperUser(user)
-  }
-
-  /**
    * Performs an operation on the session, without checking for ownership. 
Operations executed
    * via this method must not modify the session in any way, or return 
potentially sensitive
    * information.
@@ -214,22 +170,22 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
    * session.
    */
   protected def withViewAccessSession(fn: (S => Any)): Any =
-    doWithSession(fn, false, Some(hasViewAccess))
+    doWithSession(fn, false, Some(accessManager.hasViewAccess))
 
   /**
    * Performs an operation on the session, verifying whether the caller has 
view access of the
    * session.
    */
   protected def withModifyAccessSession(fn: (S => Any)): Any =
-    doWithSession(fn, false, Some(hasModifyAccess))
+    doWithSession(fn, false, Some(accessManager.hasModifyAccess))
 
   private def doWithSession(fn: (S => Any),
       allowAll: Boolean,
-      checkFn: Option[(String, HttpServletRequest) => Boolean]): Any = {
+      checkFn: Option[(String, String) => Boolean]): Any = {
     val sessionId = params("id").toInt
     sessionManager.get(sessionId) match {
       case Some(session) =>
-        if (allowAll || checkFn.map(_(session.owner, 
request)).getOrElse(false)) {
+        if (allowAll || checkFn.map(_(session.owner, 
remoteUser(request))).getOrElse(false)) {
           fn(session)
         } else {
           Forbidden()

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala 
b/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala
new file mode 100644
index 0000000..b6f7d9d
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.InteractiveSessionManager
+
+/**
+ * Its implementation starts Livy ThriftServer
+ */
+trait ThriftServerFactory {
+  def start(
+    livyConf: LivyConf,
+    livySessionManager: InteractiveSessionManager,
+    sessionStore: SessionStore,
+    accessManager: AccessManager): Unit
+}
+
+object ThriftServerFactory {
+  def getInstance: ThriftServerFactory = {
+    
Class.forName("org.apache.livy.thriftserver.ThriftServerFactoryImpl").newInstance()
+      .asInstanceOf[ThriftServerFactory]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 24c6cfd..3d1e6cd 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -26,8 +26,8 @@ import scala.util.Random
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 
 import org.apache.livy.{LivyConf, Logging, Utils}
+import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
-import org.apache.livy.server.SessionServlet
 import org.apache.livy.sessions.{Session, SessionState}
 import org.apache.livy.sessions.Session._
 import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, 
SparkProcessBuilder}
@@ -55,11 +55,12 @@ object BatchSession extends Logging {
       id: Int,
       request: CreateBatchRequest,
       livyConf: LivyConf,
+      accessManager: AccessManager,
       owner: String,
-      proxyUser: Option[String],
       sessionStore: SessionStore,
       mockApp: Option[SparkApp] = None): BatchSession = {
     val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"
+    val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, 
owner, livyConf)
 
     def createSparkApp(s: BatchSession): SparkApp = {
       val conf = SparkApp.prepareSparkConf(
@@ -72,7 +73,7 @@ object BatchSession extends Logging {
       val builder = new SparkProcessBuilder(livyConf)
       builder.conf(conf)
 
-      proxyUser.foreach(builder.proxyUser)
+      impersonatedUser.foreach(builder.proxyUser)
       request.className.foreach(builder.className)
       request.driverMemory.foreach(builder.driverMemory)
       request.driverCores.foreach(builder.driverCores)
@@ -116,7 +117,7 @@ object BatchSession extends Logging {
       SessionState.Starting,
       livyConf,
       owner,
-      proxyUser,
+      impersonatedUser,
       sessionStore,
       mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
   }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
index 85945d9..a069a50 100644
--- 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
@@ -42,16 +42,20 @@ class BatchSessionServlet(
 
   override protected def createSession(req: HttpServletRequest): BatchSession 
= {
     val createRequest = bodyAs[CreateBatchRequest](req)
-    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
     BatchSession.create(
-      sessionManager.nextId(), createRequest, livyConf, remoteUser(req), 
proxyUser, sessionStore)
+      sessionManager.nextId(),
+      createRequest,
+      livyConf,
+      accessManager,
+      remoteUser(req),
+      sessionStore)
   }
 
   override protected[batch] def clientSessionView(
       session: BatchSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (hasViewAccess(session.owner, req)) {
+      if (accessManager.hasViewAccess(session.owner, remoteUser(req))) {
         val lines = session.logLines()
 
         val size = 10

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 8bb1641..43a61ac 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -39,6 +39,7 @@ import org.apache.livy._
 import org.apache.livy.client.common.HttpMessages._
 import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}
 import org.apache.livy.rsc.driver.Statement
+import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions._
 import org.apache.livy.sessions.Session._
@@ -66,13 +67,14 @@ object InteractiveSession extends Logging {
   def create(
       id: Int,
       owner: String,
-      proxyUser: Option[String],
       livyConf: LivyConf,
+      accessManager: AccessManager,
       request: CreateInteractiveRequest,
       sessionStore: SessionStore,
       mockApp: Option[SparkApp] = None,
       mockClient: Option[RSCClient] = None): InteractiveSession = {
     val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
+    val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, 
owner, livyConf)
 
     val client = mockClient.orElse {
       val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
@@ -101,7 +103,7 @@ object InteractiveSession extends Logging {
         .setAll(builderProperties.asJava)
         .setConf("livy.client.session-id", id.toString)
         .setConf(RSCConf.Entry.DRIVER_CLASS.key(), 
"org.apache.livy.repl.ReplDriver")
-        .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull)
+        .setConf(RSCConf.Entry.PROXY_USER.key(), impersonatedUser.orNull)
         .setURI(new URI("rsc:/"))
 
       Option(builder.build().asInstanceOf[RSCClient])
@@ -117,7 +119,7 @@ object InteractiveSession extends Logging {
       request.heartbeatTimeoutInSecond,
       livyConf,
       owner,
-      proxyUser,
+      impersonatedUser,
       sessionStore,
       mockApp)
   }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 4d614f4..7450cd7 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -52,12 +52,11 @@ class InteractiveSessionServlet(
 
   override protected def createSession(req: HttpServletRequest): 
InteractiveSession = {
     val createRequest = bodyAs[CreateInteractiveRequest](req)
-    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
     InteractiveSession.create(
       sessionManager.nextId(),
       remoteUser(req),
-      proxyUser,
       livyConf,
+      accessManager,
       createRequest,
       sessionStore)
   }
@@ -66,7 +65,7 @@ class InteractiveSessionServlet(
       session: InteractiveSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (hasViewAccess(session.owner, req)) {
+      if (accessManager.hasViewAccess(session.owner, remoteUser(req))) {
         Option(session.logLines())
           .map { lines =>
             val size = 10

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
----------------------------------------------------------------------
diff --git 
a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
index f07c296..e0ebd9a 100644
--- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
@@ -62,14 +62,18 @@ object SessionServletSpec {
     new SessionServlet(sessionManager, conf, accessManager) with 
RemoteUserOverride {
       override protected def createSession(req: HttpServletRequest): Session = 
{
         val params = bodyAs[Map[String, String]](req)
-        checkImpersonation(params.get(PROXY_USER), req)
+        accessManager.checkImpersonation(params.get(PROXY_USER), 
remoteUser(req), livyConf)
         new MockSession(sessionManager.nextId(), remoteUser(req), conf)
       }
 
       override protected def clientSessionView(
           session: Session,
           req: HttpServletRequest): Any = {
-        val logs = if (hasViewAccess(session.owner, req)) session.logLines() 
else Nil
+        val logs = if (accessManager.hasViewAccess(session.owner, 
remoteUser(req))) {
+          session.logLines()
+        } else {
+          Nil
+        }
         MockSessionView(session.id, session.owner, logs)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
----------------------------------------------------------------------
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 196d328..8381c95 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -30,6 +30,7 @@ import org.scalatest.{BeforeAndAfter, FunSpec, ShouldMatchers}
 import org.scalatest.mock.MockitoSugar.mock
 
 import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
+import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.SessionState
 import org.apache.livy.utils.{AppInfo, SparkApp}
@@ -68,7 +69,8 @@ class BatchSessionSpec
       req.conf = Map("spark.driver.extraClassPath" -> 
sys.props("java.class.path"))
 
       val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, 
sys.props("java.io.tmpdir"))
-      val batch = BatchSession.create(0, req, conf, null, None, sessionStore)
+      val accessManager = new AccessManager(conf)
+      val batch = BatchSession.create(0, req, conf, accessManager, null, 
sessionStore)
 
       Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, 
TimeUnit.SECONDS))
       (batch.state match {
@@ -83,7 +85,9 @@ class BatchSessionSpec
       val conf = new LivyConf()
       val req = new CreateBatchRequest()
       val mockApp = mock[SparkApp]
-      val batch = BatchSession.create(0, req, conf, null, None, sessionStore, 
Some(mockApp))
+      val accessManager = new AccessManager(conf)
+      val batch = BatchSession.create(
+        0, req, conf, accessManager, null, sessionStore, Some(mockApp))
 
       val expectedAppId = "APPID"
       batch.appIdKnown(expectedAppId)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
----------------------------------------------------------------------
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 9ee4cd8..20e1f2d 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -36,6 +36,7 @@ import org.scalatest.mock.MockitoSugar.mock
 import org.apache.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, 
LivyConf}
 import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}
 import org.apache.livy.rsc.driver.StatementState
+import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.{PySpark, SessionState, Spark}
 import org.apache.livy.utils.{AppInfo, SparkApp}
@@ -51,6 +52,7 @@ class InteractiveSessionSpec extends FunSpec
   implicit val formats = DefaultFormats
 
   private var session: InteractiveSession = null
+  private val accessManager = new AccessManager(livyConf)
 
   private def createSession(
       sessionStore: SessionStore = mock[SessionStore],
@@ -68,7 +70,7 @@ class InteractiveSessionSpec extends FunSpec
       SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"),
       RSCConf.Entry.LIVY_JARS.key() -> ""
     )
-    InteractiveSession.create(0, null, None, livyConf, req, sessionStore, 
mockApp)
+    InteractiveSession.create(0, null, livyConf, accessManager, req, 
sessionStore, mockApp)
   }
 
   private def executeStatement(code: String, codeType: Option[String] = None): 
JValue = {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
index d34c1c0..c670217 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala
@@ -17,11 +17,13 @@
 
 package org.apache.livy.thriftserver
 
+import java.security.PrivilegedExceptionAction
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.server.HiveServer2
-import org.scalatra.ScalatraServlet
 
 import org.apache.livy.{LivyConf, Logging}
 import org.apache.livy.server.AccessManager
@@ -61,6 +63,7 @@ object LivyThriftServer extends Logging {
       accessManager: AccessManager): Unit = synchronized {
     if (thriftServerThread == null) {
       info("Starting LivyThriftServer")
+      val ugi = UserGroupInformation.getCurrentUser
       val runThriftServer = new Runnable {
         override def run(): Unit = {
           try {
@@ -69,8 +72,15 @@ object LivyThriftServer extends Logging {
               livySessionManager,
               sessionStore,
               accessManager)
-            thriftServer.init(hiveConf(livyConf))
-            thriftServer.start()
+            if (UserGroupInformation.isSecurityEnabled) {
+              ugi.doAs(new PrivilegedExceptionAction[Unit] {
+                override def run(): Unit = {
+                  doStart(livyConf)
+                }
+              })
+            } else {
+              doStart(livyConf)
+            }
             info("LivyThriftServer started")
           } catch {
             case e: Exception =>
@@ -86,6 +96,11 @@ object LivyThriftServer extends Logging {
     }
   }
 
+  private def doStart(livyConf: LivyConf): Unit = {
+    thriftServer.init(hiveConf(livyConf))
+    thriftServer.start()
+  }
+
   private[thriftserver] def getInstance: Option[LivyThriftServer] = {
     Option(thriftServer)
   }
@@ -106,7 +121,7 @@ class LivyThriftServer(
     private[thriftserver] val livyConf: LivyConf,
     private[thriftserver] val livySessionManager: InteractiveSessionManager,
     private[thriftserver] val sessionStore: SessionStore,
-    private val accessManager: AccessManager) extends HiveServer2 {
+    private[thriftserver] val accessManager: AccessManager) extends HiveServer2 {
   override def init(hiveConf: HiveConf): Unit = {
     this.cliService = new LivyCLIService(this)
     super.init(hiveConf)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index ec987c5..7c7b265 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -249,8 +249,8 @@ class LivyThriftSessionManager(val server: LivyThriftServer, hiveConf: HiveConf)
       val newSession = InteractiveSession.create(
         server.livySessionManager.nextId(),
         username,
-        None,
         server.livyConf,
+        server.accessManager,
         createInteractiveRequest,
         server.sessionStore)
       onLivySessionOpened(newSession)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/134713d7/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala
new file mode 100644
index 0000000..5cc5707
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.{AccessManager, ThriftServerFactory}
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.InteractiveSessionManager
+
+class ThriftServerFactoryImpl extends ThriftServerFactory {
+  override def start(
+      livyConf: LivyConf,
+      livySessionManager: InteractiveSessionManager,
+      sessionStore: SessionStore,
+      accessManager: AccessManager): Unit = {
+    if (LivyThriftServer.getInstance.isDefined) {
+      throw new RuntimeException(s"A ${classOf[LivyThriftServer].getName} has been already " +
+        s"started, so a new one cannot be started.")
+    }
+    LivyThriftServer.start(livyConf, livySessionManager, sessionStore, accessManager)
+  }
+}


Reply via email to