This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87c01e1db5 [KYUUBI #6619] Support http bearer authentication for 
ThriftHttp protocol
87c01e1db5 is described below

commit 87c01e1db51cdee36d1d70bc87b513ca1061c140
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Aug 20 09:48:39 2024 -0700

    [KYUUBI #6619] Support http bearer authentication for ThriftHttp protocol
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6619
    
    ## Describe Your Solution ๐Ÿ”ง
    
    This is a subtask of #6590
    This PR makes the following changes:
    1. Instead of using SessionManager, the ThriftHttpService will use 
AuthorizationFilter to handle http client parameters. The purpose is to pass 
correct client IP address for bearer token authentication.
    2. Partial backport Partial backport: 
https://github.com/apache/hive/pull/3006
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6624 from George314159/thrift.
    
    Closes #6619
    
    9761d6ddf [Wang, Fei] Partial backport HIVE-25575: Add support for JWT 
authentication in HTTP mode
    30ede05d8 [Wang, Fei] Revert "suffix"
    39c945802 [Wang, Fei] suffix
    b99420633 [Wang, Fei] Revert "Fix"
    ea6236a8d [George314159] Fix
    a75025be4 [George314159] Refine UT
    945433583 [George314159] Refine
    8f4eeff03 [George314159] Update UT
    6192d4713 [Wang, Fei] revert unused
    c7f037c66 [Wang, Fei] Support bearer token in kyuubi jdbc
    f7725f761 [George314159] Refine
    f865abf01 [George314159] Refine
    9d4edd50d [George314159] Add http header config for thrift bearer token 
auth and UT
    6d0f97355 [Wang, Fei] refine
    a7e785bcd [Wang, Fei] save
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: George314159 <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../kyuubi/jdbc/hive/JdbcConnectionParams.java     |   3 +
 .../apache/kyuubi/jdbc/hive/KyuubiConnection.java  |  79 +++++++++++++---
 .../kyuubi/jdbc/hive/auth/HttpAuthUtils.java       |   1 +
 .../hive/auth/HttpJwtAuthRequestInterceptor.java   |  47 ++++++++++
 .../kyuubi/server/KyuubiTHttpFrontendService.scala |  27 +++++-
 .../kyuubi/server/http/ThriftHttpServlet.scala     |  49 ++++------
 .../authentication/AuthenticationAuditLogger.scala |   8 +-
 .../http/authentication/AuthenticationFilter.scala |  12 +++
 .../KyuubiHttpAuthenticationFactory.scala          |   2 +
 .../kyuubi/server/http/util/SessionManager.scala   | 103 ---------------------
 .../KyuubiOperationKerberosAndPlainAuthSuite.scala |  52 ++++++++---
 ...rationThriftHttpKerberosAndPlainAuthSuite.scala |  41 +++++++-
 12 files changed, 258 insertions(+), 166 deletions(-)

diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index 9aba2a813f..cd9fd517ef 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -55,6 +55,9 @@ public class JdbcConnectionParams {
   public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_KEYTAB = 
"fromKeytab";
   public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = 
"fromSubject";
   public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_TICKET_CACHE = 
"fromTicketCache";
+  public static final String AUTH_TYPE_JWT = "jwt";
+  public static final String AUTH_TYPE_JWT_KEY = "jwt";
+  public static final String AUTH_JWT_ENV = "JWT";
   public static final String ANONYMOUS_USER = "anonymous";
   public static final String ANONYMOUS_PASSWD = "anonymous";
   public static final String USE_SSL = "ssl";
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 16347771d3..4c39fb308a 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -21,6 +21,7 @@ import static 
org.apache.kyuubi.jdbc.hive.JdbcConnectionParams.*;
 import static org.apache.kyuubi.jdbc.hive.Utils.HIVE_SERVER2_RETRY_KEY;
 import static org.apache.kyuubi.jdbc.hive.Utils.HIVE_SERVER2_RETRY_TRUE;
 
+import com.google.common.base.Preconditions;
 import java.io.*;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -446,19 +447,33 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
     if (!isSaslAuthMode()) {
       requestInterceptor = null;
     } else if (isPlainSaslAuthMode()) {
-      /*
-       * Add an interceptor to pass username/password in the header. In https 
mode, the entire
-       * information is encrypted
-       */
-      requestInterceptor =
-          new HttpBasicAuthInterceptor(
-              getUserName(),
-              getPassword(),
-              cookieStore,
-              cookieName,
-              useSsl,
-              additionalHttpHeaders,
-              customCookies);
+      if (isJwtAuthMode()) {
+        final String signedJwt = getJWT();
+        Preconditions.checkArgument(
+            signedJwt != null && !signedJwt.isEmpty(),
+            "For jwt auth mode," + " a signed jwt must be provided");
+        /*
+         * Add an interceptor to pass jwt token in the header. In https mode, 
the entire
+         * information is encrypted
+         */
+        requestInterceptor =
+            new HttpJwtAuthRequestInterceptor(
+                signedJwt, cookieStore, cookieName, useSsl, 
additionalHttpHeaders, customCookies);
+      } else {
+        /*
+         * Add an interceptor to pass username/password in the header. In 
https mode, the entire
+         * information is encrypted
+         */
+        requestInterceptor =
+            new HttpBasicAuthInterceptor(
+                getUserName(),
+                getPassword(),
+                cookieStore,
+                cookieName,
+                useSsl,
+                additionalHttpHeaders,
+                customCookies);
+      }
     } else {
       // Configure http client for kerberos-based authentication
       Subject subject = createSubject();
@@ -581,6 +596,38 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
     return httpClientBuilder.build();
   }
 
+  private String getJWT() {
+    String jwtCredential = getJWTStringFromSession();
+    if (jwtCredential == null || jwtCredential.isEmpty()) {
+      jwtCredential = getJWTStringFromEnv();
+    }
+    return jwtCredential;
+  }
+
+  private String getJWTStringFromEnv() {
+    String jwtCredential = System.getenv(JdbcConnectionParams.AUTH_JWT_ENV);
+    if (jwtCredential == null || jwtCredential.isEmpty()) {
+      LOG.debug("No JWT is specified in env variable {}", 
JdbcConnectionParams.AUTH_JWT_ENV);
+    } else {
+      int startIndex = Math.max(0, jwtCredential.length() - 7);
+      String lastSevenChars = jwtCredential.substring(startIndex);
+      LOG.debug("Fetched JWT (ends with {}) from the env.", lastSevenChars);
+    }
+    return jwtCredential;
+  }
+
+  private String getJWTStringFromSession() {
+    String jwtCredential = 
sessConfMap.get(JdbcConnectionParams.AUTH_TYPE_JWT_KEY);
+    if (jwtCredential == null || jwtCredential.isEmpty()) {
+      LOG.debug("No JWT is specified in connection string.");
+    } else {
+      int startIndex = Math.max(0, jwtCredential.length() - 7);
+      String lastSevenChars = jwtCredential.substring(startIndex);
+      LOG.debug("Fetched JWT (ends with {}) from the session.", 
lastSevenChars);
+    }
+    return jwtCredential;
+  }
+
   /** Create underlying SSL or non-SSL transport */
   private TTransport createUnderlyingTransport() throws TTransportException {
     TTransport transport = null;
@@ -917,6 +964,12 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
     return isSaslAuthMode() && !hasSessionValue(AUTH_PRINCIPAL);
   }
 
+  private boolean isJwtAuthMode() {
+    return JdbcConnectionParams.AUTH_TYPE_JWT.equalsIgnoreCase(
+            sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))
+        || sessConfMap.containsKey(JdbcConnectionParams.AUTH_TYPE_JWT_KEY);
+  }
+
   private boolean isKerberosAuthMode() {
     return isSaslAuthMode() && hasSessionValue(AUTH_PRINCIPAL);
   }
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpAuthUtils.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpAuthUtils.java
index 73d966ca7a..0802e8effd 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpAuthUtils.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpAuthUtils.java
@@ -29,6 +29,7 @@ import org.ietf.jgss.Oid;
 public final class HttpAuthUtils {
   public static final String AUTHORIZATION = "Authorization";
   public static final String NEGOTIATE = "Negotiate";
+  public static final String BEARER = "Bearer";
 
   /** @return Stringified Base64 encoded kerberosAuthHeader on success */
   public static String getKerberosServiceTicket(
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpJwtAuthRequestInterceptor.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpJwtAuthRequestInterceptor.java
new file mode 100644
index 0000000000..6bfc0ed2e6
--- /dev/null
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/HttpJwtAuthRequestInterceptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.auth;
+
+import java.util.Map;
+import org.apache.http.HttpRequest;
+import org.apache.http.client.CookieStore;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * This implements the logic to intercept the HTTP requests from the Hive Jdbc 
connection and adds
+ * JWT auth header.
+ */
+public class HttpJwtAuthRequestInterceptor extends HttpRequestInterceptorBase {
+  private final String signedJwt;
+
+  public HttpJwtAuthRequestInterceptor(
+      String signedJwt,
+      CookieStore cookieStore,
+      String cn,
+      boolean isSSL,
+      Map<String, String> additionalHeaders,
+      Map<String, String> customCookies) {
+    super(cookieStore, cn, isSSL, additionalHeaders, customCookies);
+    this.signedJwt = signedJwt;
+  }
+
+  @Override
+  protected void addHttpAuthHeader(HttpRequest httpRequest, HttpContext 
httpContext) {
+    httpRequest.addHeader(HttpAuthUtils.AUTHORIZATION, HttpAuthUtils.BEARER + 
" " + signedJwt);
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
index 1e289c6235..0fe24475a6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.{SynchronousQueue, 
ThreadPoolExecutor, TimeUnit}
 import javax.security.sasl.AuthenticationException
 import javax.servlet.{ServletContextEvent, ServletContextListener}
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.http.HttpMethod
@@ -39,7 +41,7 @@ import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.metrics.MetricsConstants.{THRIFT_HTTP_CONN_FAIL, 
THRIFT_HTTP_CONN_OPEN, THRIFT_HTTP_CONN_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.server.http.ThriftHttpServlet
-import org.apache.kyuubi.server.http.util.SessionManager
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
 import org.apache.kyuubi.service.{Serverable, Service, ServiceUtils, 
TFrontendService}
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TCLIService, 
TOpenSessionReq}
 import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol
@@ -268,13 +270,30 @@ final class KyuubiTHttpFrontendService(
   }
 
   override protected def getIpAddress: String = {
-    
Option(SessionManager.getProxyHttpHeaderIpAddress).getOrElse(SessionManager.getIpAddress)
+    Option(AuthenticationFilter.getUserProxyHeaderIpAddress).getOrElse(
+      AuthenticationFilter.getUserIpAddress)
+  }
+
+  override protected def getProxyUser(
+      sessionConf: java.util.Map[String, String],
+      ipAddress: String,
+      realUser: String): String = {
+    Option(AuthenticationFilter.getProxyUserName) match {
+      case Some(proxyUser) =>
+        val proxyUserConf = Map(PROXY_USER.key -> proxyUser)
+        super.getProxyUser(
+          (sessionConf.asScala ++ proxyUserConf).asJava,
+          ipAddress,
+          realUser)
+      case None => super.getProxyUser(sessionConf, ipAddress, realUser)
+    }
   }
 
   override protected def getRealUserAndSessionUser(req: TOpenSessionReq): 
(String, String) = {
-    val realUser = 
getShortName(Option(SessionManager.getUserName).getOrElse(req.getUsername))
+    val realUser = getShortName(Option(AuthenticationFilter.getUserName)
+      .getOrElse(req.getUsername))
     // using the remote ip address instead of that in proxy http header for 
authentication
-    val ipAddress: String = SessionManager.getIpAddress
+    val ipAddress: String = AuthenticationFilter.getUserIpAddress
     val sessionUser: String = if (req.getConfiguration == null) {
       realUser
     } else {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/ThriftHttpServlet.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/ThriftHttpServlet.scala
index d781faf3bf..4df3c427ec 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/ThriftHttpServlet.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/ThriftHttpServlet.scala
@@ -29,8 +29,8 @@ import scala.collection.mutable
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.FRONTEND_PROXY_HTTP_CLIENT_IP_HEADER
-import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
-import org.apache.kyuubi.server.http.util.{CookieSigner, HttpAuthUtils, 
SessionManager}
+import 
org.apache.kyuubi.server.http.authentication.{AuthenticationAuditLogger, 
AuthenticationFilter}
+import org.apache.kyuubi.server.http.util.{CookieSigner, HttpAuthUtils}
 import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 import org.apache.kyuubi.shaded.thrift.TProcessor
@@ -102,6 +102,12 @@ class ThriftHttpServlet(
         }
       }
 
+      AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS.set(request.getRemoteAddr)
+      AuthenticationFilter.HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.set(
+        request.getHeader(conf.get(FRONTEND_PROXY_HTTP_CLIENT_IP_HEADER)))
+      
Option(request.getHeader(X_FORWARDED_FOR_HEADER)).map(_.split(",").toList).foreach(
+        AuthenticationFilter.HTTP_FORWARDED_ADDRESSES.set)
+
       // If the cookie based authentication is not enabled or the request does 
not have a valid
       // cookie, use authentication depending on the server setup.
       if (clientUserName == null) {
@@ -109,31 +115,11 @@ class ThriftHttpServlet(
       }
 
       require(clientUserName != null, "No valid authorization provided")
-      debug("Client username: " + clientUserName)
-
       // Set the thread local username to be used for doAs if true
-      SessionManager.setUserName(clientUserName)
-
+      AuthenticationFilter.HTTP_CLIENT_USER_NAME.set(clientUserName)
       // find proxy user if any from query param
-      val doAsQueryParam = getDoAsQueryParam(request.getQueryString)
-      if (doAsQueryParam != null) 
SessionManager.setProxyUserName(doAsQueryParam)
-
-      val clientIpAddress = request.getRemoteAddr
-      debug("Client IP Address: " + clientIpAddress)
-      SessionManager.setIpAddress(clientIpAddress)
-
-      
Option(request.getHeader(conf.get(FRONTEND_PROXY_HTTP_CLIENT_IP_HEADER))).foreach
 {
-        ipAddress =>
-          debug("Proxy Http Header Client IP Address: " + ipAddress)
-          SessionManager.setProxyHttpHeaderIpAddress(ipAddress)
-      }
-
-      val forwarded_for = request.getHeader(X_FORWARDED_FOR_HEADER)
-      if (forwarded_for != null) {
-        debug(X_FORWARDED_FOR_HEADER + ":" + forwarded_for)
-        val forwardedAddresses = forwarded_for.split(",").toList
-        SessionManager.setForwardedAddresses(forwardedAddresses)
-      } else SessionManager.setForwardedAddresses(List.empty[String])
+      AuthenticationFilter.HTTP_CLIENT_PROXY_USER_NAME.set(
+        getDoAsQueryParam(request.getQueryString))
 
       // Generate new cookie and add it to the response
       if (requireNewCookie && !authFactory.saslDisabled) {
@@ -157,12 +143,13 @@ class ThriftHttpServlet(
         error("Error: ", e)
         throw e
     } finally {
-      // Clear the thread locals
-      SessionManager.clearUserName()
-      SessionManager.clearIpAddress()
-      SessionManager.clearProxyHttpHeaderIpAddress()
-      SessionManager.clearProxyUserName()
-      SessionManager.clearForwardedAddresses()
+      AuthenticationAuditLogger.audit(request, response)
+      AuthenticationFilter.HTTP_CLIENT_USER_NAME.remove()
+      AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS.remove()
+      AuthenticationFilter.HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
+      AuthenticationFilter.HTTP_AUTH_TYPE.remove()
+      AuthenticationFilter.HTTP_CLIENT_PROXY_USER_NAME.remove()
+      AuthenticationFilter.HTTP_FORWARDED_ADDRESSES.remove()
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationAuditLogger.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationAuditLogger.scala
index ac74c449bd..43301dfab2 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationAuditLogger.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationAuditLogger.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.server.http.authentication
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import org.apache.kyuubi.Logging
-import 
org.apache.kyuubi.server.http.authentication.AuthenticationFilter.{HTTP_AUTH_TYPE,
 HTTP_CLIENT_IP_ADDRESS, HTTP_CLIENT_USER_NAME, 
HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS}
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter._
 
 object AuthenticationAuditLogger extends Logging {
   final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
@@ -31,8 +31,14 @@ object AuthenticationAuditLogger extends Logging {
     val sb = AUDIT_BUFFER.get()
     sb.setLength(0)
     
sb.append(s"user=${HTTP_CLIENT_USER_NAME.get()}(auth:${HTTP_AUTH_TYPE.get()})").append("\t")
+    if (HTTP_CLIENT_PROXY_USER_NAME.get() != null) {
+      sb.append(s"proxyUser=${HTTP_CLIENT_PROXY_USER_NAME.get()}").append("\t")
+    }
     sb.append(s"ip=${HTTP_CLIENT_IP_ADDRESS.get()}").append("\t")
     
sb.append(s"proxyIp=${HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.get()}").append("\t")
+    if (HTTP_FORWARDED_ADDRESSES.get().nonEmpty) {
+      
sb.append(s"forwardedFor=${getForwardedAddresses.mkString(",")}").append("\t")
+    }
     sb.append(s"method=${request.getMethod}").append("\t")
     sb.append(s"uri=${request.getRequestURI}").append("\t")
     sb.append(s"params=${request.getQueryString}").append("\t")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
index a2d499ab3e..423d239be4 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
@@ -143,6 +143,8 @@ class AuthenticationFilter(conf: KyuubiConf) extends Filter 
with Logging {
         HTTP_CLIENT_IP_ADDRESS.remove()
         HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
         HTTP_AUTH_TYPE.remove()
+        HTTP_CLIENT_PROXY_USER_NAME.remove()
+        HTTP_FORWARDED_ADDRESSES.remove()
         httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, e.getMessage)
     } finally {
       AuthenticationAuditLogger.audit(httpRequest, httpResponse)
@@ -189,12 +191,22 @@ object AuthenticationFilter {
   final val HTTP_AUTH_TYPE = new ThreadLocal[String]() {
     override protected def initialValue(): String = null
   }
+  final val HTTP_CLIENT_PROXY_USER_NAME = new ThreadLocal[String]() {
+    override protected def initialValue(): String = null
+  }
+  final val HTTP_FORWARDED_ADDRESSES = new ThreadLocal[List[String]] {
+    override protected def initialValue: List[String] = List.empty
+  }
 
   def getUserIpAddress: String = HTTP_CLIENT_IP_ADDRESS.get
 
   def getUserProxyHeaderIpAddress: String = 
HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.get()
 
+  def getForwardedAddresses: List[String] = HTTP_FORWARDED_ADDRESSES.get
+
   def getUserName: String = HTTP_CLIENT_USER_NAME.get
 
+  def getProxyUserName: String = HTTP_CLIENT_PROXY_USER_NAME.get
+
   def getAuthType: String = HTTP_AUTH_TYPE.get()
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KyuubiHttpAuthenticationFactory.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KyuubiHttpAuthenticationFactory.scala
index ca95fda3d9..ee8dd3989d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KyuubiHttpAuthenticationFactory.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KyuubiHttpAuthenticationFactory.scala
@@ -81,6 +81,8 @@ class KyuubiHttpAuthenticationFactory(conf: KyuubiConf) {
             AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS.remove()
             AuthenticationFilter.HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
             AuthenticationFilter.HTTP_AUTH_TYPE.remove()
+            AuthenticationFilter.HTTP_CLIENT_PROXY_USER_NAME.remove()
+            AuthenticationFilter.HTTP_FORWARDED_ADDRESSES.remove()
           }
         }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/util/SessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/util/SessionManager.scala
deleted file mode 100644
index acf4a25d0b..0000000000
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/util/SessionManager.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.server.http.util
-
-import org.apache.kyuubi.Logging
-
-object SessionManager extends Logging {
-
-  private val threadLocalIpAddress: ThreadLocal[String] = new 
ThreadLocal[String]
-
-  def setIpAddress(ipAddress: String): Unit = {
-    threadLocalIpAddress.set(ipAddress)
-  }
-
-  def clearIpAddress(): Unit = {
-    threadLocalIpAddress.remove()
-  }
-
-  def getIpAddress: String = {
-    threadLocalIpAddress.get
-  }
-
-  private val threadLocalProxyHttpHeaderIpAddress: ThreadLocal[String] = new 
ThreadLocal[String]
-
-  def setProxyHttpHeaderIpAddress(realIpAddress: String): Unit = {
-    threadLocalProxyHttpHeaderIpAddress.set(realIpAddress)
-  }
-
-  def clearProxyHttpHeaderIpAddress(): Unit = {
-    threadLocalProxyHttpHeaderIpAddress.remove()
-  }
-
-  def getProxyHttpHeaderIpAddress: String = {
-    threadLocalProxyHttpHeaderIpAddress.get
-  }
-
-  private val threadLocalForwardedAddresses: ThreadLocal[List[String]] =
-    new ThreadLocal[List[String]]
-
-  def setForwardedAddresses(ipAddress: List[String]): Unit = {
-    threadLocalForwardedAddresses.set(ipAddress)
-  }
-
-  def clearForwardedAddresses(): Unit = {
-    threadLocalForwardedAddresses.remove()
-  }
-
-  def getForwardedAddresses: List[String] = {
-    threadLocalForwardedAddresses.get
-  }
-
-  private val threadLocalUserName: ThreadLocal[String] = new 
ThreadLocal[String]() {
-    override protected def initialValue: String = {
-      null
-    }
-  }
-
-  def setUserName(userName: String): Unit = {
-    threadLocalUserName.set(userName)
-  }
-
-  def clearUserName(): Unit = {
-    threadLocalUserName.remove()
-  }
-
-  def getUserName: String = {
-    threadLocalUserName.get
-  }
-
-  private val threadLocalProxyUserName: ThreadLocal[String] = new 
ThreadLocal[String]() {
-    override protected def initialValue: String = {
-      null
-    }
-  }
-
-  def setProxyUserName(userName: String): Unit = {
-    debug("setting proxy user name based on query param to: " + userName)
-    threadLocalProxyUserName.set(userName)
-  }
-
-  def getProxyUserName: String = {
-    threadLocalProxyUserName.get
-  }
-
-  def clearProxyUserName(): Unit = {
-    threadLocalProxyUserName.remove()
-  }
-}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
index 31cde63973..cdbbc0aed1 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationKerberosAndPlainAuthSuite.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi.{KerberizedTestHelper, Utils, WithKyuubiServer}
 import org.apache.kyuubi.config.KyuubiConf
-import 
org.apache.kyuubi.service.authentication.{UserDefineAuthenticationProviderImpl, 
WithLdapServer}
+import org.apache.kyuubi.service.authentication.{AuthTypes, 
UserDefineAuthenticationProviderImpl, WithLdapServer}
 
 class KyuubiOperationKerberosAndPlainAuthSuite extends WithKyuubiServer with 
KerberizedTestHelper
   with WithLdapServer with HiveJDBCTestHelper {
@@ -43,6 +43,12 @@ class KyuubiOperationKerberosAndPlainAuthSuite extends 
WithKyuubiServer with Ker
     s";kyuubiClientPrincipal=$testPrincipal;kyuubiClientKeytab=$testKeytab"
   private val currentUser = UserGroupInformation.getCurrentUser
 
+  protected def authMethods = 
conf.get(KyuubiConf.AUTHENTICATION_METHOD).map(AuthTypes.withName)
+  protected def kerberosAuthEnabled: Boolean = 
authMethods.contains(AuthTypes.KERBEROS)
+  protected def nonKerberosAuth = authMethods.filterNot(_ == 
AuthTypes.KERBEROS).headOption
+  protected def ldapAuthEnabled = nonKerberosAuth.contains(AuthTypes.LDAP)
+  protected def customAuthEnabled = nonKerberosAuth.contains(AuthTypes.CUSTOM)
+
   override def beforeAll(): Unit = {
     super.beforeAll()
   }
@@ -114,26 +120,50 @@ class KyuubiOperationKerberosAndPlainAuthSuite extends 
WithKyuubiServer with Ker
   }
 
   test("test with LDAP authentication") {
-    val conn = DriverManager.getConnection(jdbcUrlWithConf, ldapUser, 
ldapUserPasswd)
-    try {
-      val statement = conn.createStatement()
-      val resultSet = statement.executeQuery("select engine_name()")
-      assert(resultSet.next())
-      assert(resultSet.getString(1).nonEmpty)
-    } finally {
-      conn.close()
+    if (ldapAuthEnabled) {
+      val conn = DriverManager.getConnection(jdbcUrlWithConf, ldapUser, 
ldapUserPasswd)
+      try {
+        val statement = conn.createStatement()
+        val resultSet = statement.executeQuery("select engine_name()")
+        assert(resultSet.next())
+        assert(resultSet.getString(1).nonEmpty)
+      } finally {
+        conn.close()
+      }
+    } else {
+      intercept[SQLException] {
+        val conn = DriverManager.getConnection(jdbcUrlWithConf, ldapUser, 
ldapUserPasswd)
+        try {
+          val statement = conn.createStatement()
+          statement.executeQuery("select engine_name()")
+        } finally {
+          conn.close()
+        }
+      }
     }
   }
 
   test("only the first specified plain auth type is valid") {
-    intercept[SQLException] {
+    if (customAuthEnabled) {
       val conn = DriverManager.getConnection(jdbcUrlWithConf, customUser, 
customPasswd)
       try {
         val statement = conn.createStatement()
-        statement.executeQuery("select engine_name()")
+        val resultSet = statement.executeQuery("select engine_name()")
+        assert(resultSet.next())
+        assert(resultSet.getString(1).nonEmpty)
       } finally {
         conn.close()
       }
+    } else {
+      intercept[SQLException] {
+        val conn = DriverManager.getConnection(jdbcUrlWithConf, customUser, 
customPasswd)
+        try {
+          val statement = conn.createStatement()
+          statement.executeQuery("select engine_name()")
+        } finally {
+          conn.close()
+        }
+      }
     }
   }
 
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/thrift/http/KyuubiOperationThriftHttpKerberosAndPlainAuthSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/thrift/http/KyuubiOperationThriftHttpKerberosAndPlainAuthSuite.scala
index 941e121a6c..75c9e9018d 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/thrift/http/KyuubiOperationThriftHttpKerberosAndPlainAuthSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/thrift/http/KyuubiOperationThriftHttpKerberosAndPlainAuthSuite.scala
@@ -17,13 +17,16 @@
 
 package org.apache.kyuubi.operation.thrift.http
 
+import java.sql.{DriverManager, SQLException}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols
+import org.apache.kyuubi.jdbc.hive.JdbcConnectionParams
 import org.apache.kyuubi.operation.KyuubiOperationKerberosAndPlainAuthSuite
-import 
org.apache.kyuubi.service.authentication.UserDefineAuthenticationProviderImpl
+import 
org.apache.kyuubi.service.authentication.{UserDefineAuthenticationProviderImpl, 
UserDefineTokenAuthenticationProviderImpl}
 
 class KyuubiOperationThriftHttpKerberosAndPlainAuthSuite
   extends KyuubiOperationKerberosAndPlainAuthSuite {
@@ -49,7 +52,7 @@ class KyuubiOperationThriftHttpKerberosAndPlainAuthSuite
     UserGroupInformation.setConfiguration(config)
     assert(UserGroupInformation.isSecurityEnabled)
 
-    KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", "LDAP", 
"CUSTOM"))
+    KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", 
"CUSTOM", "LDAP"))
       .set(KyuubiConf.SERVER_KEYTAB, testKeytab)
       .set(KyuubiConf.SERVER_PRINCIPAL, testPrincipal)
       .set(KyuubiConf.AUTHENTICATION_LDAP_URL, ldapUrl)
@@ -57,11 +60,43 @@ class KyuubiOperationThriftHttpKerberosAndPlainAuthSuite
       .set(
         KyuubiConf.AUTHENTICATION_CUSTOM_CLASS,
         classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
+      .set(
+        KyuubiConf.AUTHENTICATION_CUSTOM_BEARER_CLASS,
+        classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
       .set(KyuubiConf.SERVER_SPNEGO_KEYTAB, testKeytab)
       .set(KyuubiConf.SERVER_SPNEGO_PRINCIPAL, testSpnegoPrincipal)
   }
 
   override protected def getJdbcUrl: String =
     
s"jdbc:hive2://${server.frontendServices.head.connectionUrl}/default;transportMode=http;"
 +
-      s"httpPath=cliservice"
+      s"httpPath=cliservice;"
+
+  test("test with valid CUSTOM http bearer authentication") {
+    withSessionConf(Map(JdbcConnectionParams.AUTH_TYPE_JWT_KEY
+      -> UserDefineTokenAuthenticationProviderImpl.VALID_TOKEN))()() {
+      val conn = DriverManager.getConnection(jdbcUrlWithConf)
+      try {
+        val statement = conn.createStatement()
+        val resultSet = statement.executeQuery("select engine_name()")
+        assert(resultSet.next())
+        assert(resultSet.getString(1).nonEmpty)
+      } finally {
+        conn.close()
+      }
+    }
+  }
+
+  test("test with invalid CUSTOM http bearer authentication") {
+    withSessionConf(Map(JdbcConnectionParams.AUTH_TYPE_JWT_KEY -> 
"badToken"))()() {
+      intercept[SQLException] {
+        val conn = DriverManager.getConnection(jdbcUrlWithConf)
+        try {
+          val statement = conn.createStatement()
+          statement.executeQuery("select engine_name()")
+        } finally {
+          conn.close()
+        }
+      }
+    }
+  }
 }

Reply via email to