This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a9dad0fc58da860fb0af1a3ccb902fc3fdaf8c04
Author: Rajini Sivaram <[email protected]>
AuthorDate: Thu Apr 29 14:32:50 2021 +0100

    KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611)
    
    From Java 9 onwards, LoginContext#logout() throws an NPE if invoked 
multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. 
KerberosLogin currently attempts logout followed by login in a background 
refresh thread. If login fails, we retry the same sequence. As a result, a 
single login failure prevents subsequent re-login, and clients will never be 
able to authenticate successfully after the first failure until the process is 
restarted. The commit checks if logout is necessary [...]
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../common/security/kerberos/KerberosLogin.java    | 14 ++++-
 .../kafka/server/GssapiAuthenticationTest.scala    | 64 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index f39f35c..a91a964 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -360,17 +360,25 @@ public class KerberosLogin extends AbstractLogin {
             lastLogin = currentElapsedTime();
             //clear up the kerberos state. But the tokens are not cleared! As 
per
             //the Java kerberos login module code, only the kerberos 
credentials
-            //are cleared
-            logout();
+            //are cleared. If previous logout succeeded but login failed, we 
shouldn't
+            //logout again since duplicate logout causes NPE from Java 9 
onwards.
+            if (subject != null && !subject.getPrincipals().isEmpty()) {
+                logout();
+            }
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext 
constructor)
             loginContext = new LoginContext(contextName(), subject, null, 
configuration());
             log.info("Initiating re-login for {}", principal);
-            loginContext.login();
+            login(loginContext);
         }
     }
 
     // Visibility to override for testing
+    protected void login(LoginContext loginContext) throws LoginException {
+        loginContext.login();
+    }
+
+    // Visibility to override for testing
     protected void logout() throws LoginException {
         loginContext.logout();
     }
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index fa21a94..efb51f1 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -23,6 +23,7 @@ import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
+import javax.security.auth.login.LoginContext
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.CommonClientConfigs
@@ -100,6 +101,29 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that if login fails, subsequent re-login without failures works 
and clients
+   * are able to connect after the second re-login. Verifies that logout is 
performed only once
+   * since duplicate logouts without successful login results in NPE from Java 
9 onwards.
+   */
+  @Test
+  def testLoginFailure(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      login.loginException = Some(new RuntimeException("Test exception to fail 
login"))
+      executor.submit(() => login.reLogin(), 0)
+      executor.submit(() => login.reLogin(), 0)
+
+      verifyRelogin(selector, login)
+      assertEquals(2, login.loginAttempts)
+      assertEquals(1, login.logoutAttempts)
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Verifies that there are no authentication failures during Kerberos 
re-login. If authentication
    * is performed when credentials are unavailable between logout and login, 
we handle it as a
    * transient error and not an authentication failure so that clients may 
retry.
@@ -111,23 +135,26 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
       val login = TestableKerberosLogin.instance
       assertNotNull(login)
       executor.submit(() => login.reLogin(), 0)
-
-      val node1 = "1"
-      selector.connect(node1, serverAddr, 1024, 1024)
-      login.logoutResumeLatch.countDown()
-      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
-      assertFalse("Authenticated during re-login", 
pollUntilReadyOrDisconnected(selector, node1))
-
-      login.reLoginResumeLatch.countDown()
-      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
-      val node2 = "2"
-      selector.connect(node2, serverAddr, 1024, 1024)
-      assertTrue("Authenticated failed after re-login", 
pollUntilReadyOrDisconnected(selector, node2))
+      verifyRelogin(selector, login)
     } finally {
       selector.close()
     }
   }
 
+  private def verifyRelogin(selector: Selector, login: TestableKerberosLogin): 
Unit = {
+    val node1 = "1"
+    selector.connect(node1, serverAddr, 1024, 1024)
+    login.logoutResumeLatch.countDown()
+    login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+    assertFalse("Authenticated during re-login", 
pollUntilReadyOrDisconnected(selector, node1))
+
+    login.reLoginResumeLatch.countDown()
+    login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+    val node2 = "2"
+    selector.connect(node2, serverAddr, 1024, 1024)
+    assertTrue("Authenticated failed after re-login", 
pollUntilReadyOrDisconnected(selector, node2))
+  }
+
   /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is 
handled
    * as a fatal authentication failure.
@@ -258,6 +285,9 @@ class TestableKerberosLogin extends KerberosLogin {
   val logoutCompleteLatch = new CountDownLatch(1)
   val reLoginResumeLatch = new CountDownLatch(1)
   val reLoginCompleteLatch = new CountDownLatch(1)
+  @volatile var loginException: Option[RuntimeException] = None
+  @volatile var loginAttempts = 0
+  @volatile var logoutAttempts = 0
 
   assertNull(TestableKerberosLogin.instance)
   TestableKerberosLogin.instance = this
@@ -267,7 +297,17 @@ class TestableKerberosLogin extends KerberosLogin {
     reLoginCompleteLatch.countDown()
   }
 
+  override protected def login(loginContext: LoginContext): Unit = {
+    loginAttempts += 1
+    loginException.foreach { e =>
+      loginException = None
+      throw e
+    }
+    super.login(loginContext)
+  }
+
   override protected def logout(): Unit = {
+    logoutAttempts += 1
     logoutResumeLatch.await(15, TimeUnit.SECONDS)
     super.logout()
     logoutCompleteLatch.countDown()

Reply via email to