This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a73cec258 fix(java-client): fix client renew TGT thread leak when 
request table doesnt exist (#1970)
a73cec258 is described below

commit a73cec2583e9d201f80f6eb97b3662cb407dd297
Author: Samunroyu <[email protected]>
AuthorDate: Wed Apr 17 12:00:49 2024 +0800

    fix(java-client): fix client renew TGT thread leak when request table 
doesnt exist (#1970)
    
    In a Kerberos environment, the Java client runs a background thread that
    checks the TGT every 10 seconds and re-logins to Kerberos if needed.
    However, when the client requests a table which doesn't exist, these
    threads are leaked.
    
    The root cause of this problem is that when the Pegasus client is closed,
    the TGT thread pool is not shut down.
    
    This patch adds a close function to KerberosProtocol.
---
 .../apache/pegasus/rpc/async/ClusterManager.java   |  2 ++
 .../rpc/interceptor/ReplicaSessionInterceptor.java |  6 +++-
 .../ReplicaSessionInterceptorManager.java          | 10 ++++++-
 .../org/apache/pegasus/security/AuthProtocol.java  |  6 +++-
 .../security/AuthReplicaSessionInterceptor.java    |  8 +++++-
 .../apache/pegasus/security/KerberosProtocol.java  | 32 +++++++++++++++++-----
 6 files changed, 53 insertions(+), 11 deletions(-)

diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
index 01899abd6..1df0a401b 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
@@ -188,6 +188,8 @@ public class ClusterManager extends Cluster {
 
   @Override
   public void close() {
+    sessionInterceptorManager.close();
+
     if (enableCounter) {
       MetricsManager.finish();
     }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java
 
b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java
index c623e242b..5b7dcfe0b 100644
--- 
a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java
+++ 
b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptor.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pegasus.rpc.interceptor;
 
+import java.io.Closeable;
 import org.apache.pegasus.rpc.async.ReplicaSession;
 
-public interface ReplicaSessionInterceptor {
+public interface ReplicaSessionInterceptor extends Closeable {
   // The behavior when a rpc session is connected.
   void onConnected(ReplicaSession session);
 
   // The behavior when rpc session is sending a message.
   // @returns false if this message shouldn't be sent.
   boolean onSendMessage(ReplicaSession session, final 
ReplicaSession.RequestEntry entry);
+
+  @Override
+  void close();
 }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
 
b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
index 44e46e560..e08c44590 100644
--- 
a/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
+++ 
b/java-client/src/main/java/org/apache/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pegasus.rpc.interceptor;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pegasus.client.ClientOptions;
 import org.apache.pegasus.rpc.async.ReplicaSession;
 import org.apache.pegasus.security.AuthReplicaSessionInterceptor;
 
-public class ReplicaSessionInterceptorManager {
+public class ReplicaSessionInterceptorManager implements Closeable {
   private List<ReplicaSessionInterceptor> interceptors = new ArrayList<>();
 
   public ReplicaSessionInterceptorManager(ClientOptions options) {
@@ -49,4 +50,11 @@ public class ReplicaSessionInterceptorManager {
     }
     return true;
   }
+
+  @Override
+  public void close() {
+    for (ReplicaSessionInterceptor interceptor : interceptors) {
+      interceptor.close();
+    }
+  }
 }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java 
b/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java
index 11beb40e0..9aefe330c 100644
--- a/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java
+++ b/java-client/src/main/java/org/apache/pegasus/security/AuthProtocol.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pegasus.security;
 
+import java.io.Closeable;
 import org.apache.pegasus.rpc.async.ReplicaSession;
 
 /** authentiation protocol */
-public interface AuthProtocol {
+public interface AuthProtocol extends Closeable {
   String name();
   /** start the authentiate process */
   void authenticate(ReplicaSession session);
 
   boolean isAuthRequest(final ReplicaSession.RequestEntry entry);
+
+  @Override
+  void close();
 }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java
 
b/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java
index fe60dbe8e..6f9952723 100644
--- 
a/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java
+++ 
b/java-client/src/main/java/org/apache/pegasus/security/AuthReplicaSessionInterceptor.java
@@ -18,11 +18,12 @@
  */
 package org.apache.pegasus.security;
 
+import java.io.Closeable;
 import org.apache.pegasus.client.ClientOptions;
 import org.apache.pegasus.rpc.async.ReplicaSession;
 import org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptor;
 
-public class AuthReplicaSessionInterceptor implements 
ReplicaSessionInterceptor {
+public class AuthReplicaSessionInterceptor implements 
ReplicaSessionInterceptor, Closeable {
   private AuthProtocol protocol;
 
   public AuthReplicaSessionInterceptor(ClientOptions options) throws 
IllegalArgumentException {
@@ -39,4 +40,9 @@ public class AuthReplicaSessionInterceptor implements 
ReplicaSessionInterceptor
     // tryPendRequest returns false means that the negotiation is succeed now
     return protocol.isAuthRequest(entry) || !session.tryPendRequest(entry);
   }
+
+  @Override
+  public void close() {
+    protocol.close();
+  }
 }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java 
b/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java
index b5150aa49..8a29e8d9f 100644
--- 
a/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java
+++ 
b/java-client/src/main/java/org/apache/pegasus/security/KerberosProtocol.java
@@ -19,6 +19,8 @@
 package org.apache.pegasus.security;
 
 import com.sun.security.auth.callback.TextCallbackHandler;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -50,19 +52,21 @@ class KerberosProtocol implements AuthProtocol {
   // request. The JAAS framework defines the term "subject" to represent the 
source of a request. A
   // subject may be any entity, such as a person or a service.
   private Subject subject;
-  private String serviceName;
-  private String serviceFqdn;
-  private String keyTab;
-  private String principal;
+  private final String serviceName;
+  private final String serviceFqdn;
+  private final String keyTab;
+  private final String principal;
   final int CHECK_TGT_INTEVAL_SECONDS = 10;
   final ScheduledExecutorService service =
       Executors.newSingleThreadScheduledExecutor(
           new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
-              Thread t = new Thread(r);
+              String timestamp = 
LocalTime.now().format(DateTimeFormatter.ofPattern("HHmmss"));
+              Thread t = new Thread(r, "TGT renew for pegasus - " + timestamp);
               t.setDaemon(true);
-              t.setName("TGT renew for pegasus");
+              t.setUncaughtExceptionHandler(
+                  (thread, error) -> logger.error("Uncaught exception", 
error));
               return t;
             }
           });
@@ -97,7 +101,16 @@ class KerberosProtocol implements AuthProtocol {
 
   private void scheduleCheckTGTAndRelogin() {
     service.scheduleAtFixedRate(
-        () -> checkTGTAndRelogin(),
+        () -> {
+          try {
+            checkTGTAndRelogin();
+          } catch (Exception e) {
+            logger.warn(
+                "check TGT and ReLogin kerberos failed , will retry after {} 
seconds.",
+                CHECK_TGT_INTEVAL_SECONDS,
+                e);
+          }
+        },
         CHECK_TGT_INTEVAL_SECONDS,
         CHECK_TGT_INTEVAL_SECONDS,
         TimeUnit.SECONDS);
@@ -199,4 +212,9 @@ class KerberosProtocol implements AuthProtocol {
       }
     };
   }
+
+  @Override
+  public void close() {
+    service.shutdown();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to