This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed8659b KAFKA-10727; Handle Kerberos error during re-login as
transient failure in clients (#9605)
ed8659b is described below
commit ed8659b4a09a4affff6798b8077ed2d8fb94b6da
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon Nov 23 09:04:16 2020 +0000
KAFKA-10727; Handle Kerberos error during re-login as transient failure in
clients (#9605)
We use a background thread for Kerberos to perform re-login before tickets
expire. The thread performs logout() followed by login(), relying on the Java
library to clear and then populate credentials in Subject. This leaves a timing
window where clients fail to authenticate because credentials are not
available. We cannot introduce any form of locking since authentication is
performed on the network thread. So this commit treats NO_CRED as a transient
failure rather than a fatal authentication failure [...]
Reviewers: Ron Dagostino <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../kafka/common/network/SaslChannelBuilder.java | 2 +-
.../authenticator/SaslClientAuthenticator.java | 8 +-
.../common/security/kerberos/KerberosError.java | 19 ++++
.../common/security/kerberos/KerberosLogin.java | 9 +-
.../kafka/server/GssapiAuthenticationTest.scala | 107 ++++++++++++++++++---
5 files changed, 126 insertions(+), 19 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index a29148a..f01c4ef 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -327,7 +327,7 @@ public class SaslChannelBuilder implements ChannelBuilder,
ListenerReconfigurabl
}
}
- private Class<? extends Login> defaultLoginClass() {
+ protected Class<? extends Login> defaultLoginClass() {
if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
return KerberosLogin.class;
if
(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 9c99dab..00a4bfc 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -539,15 +539,17 @@ public class SaslClientAuthenticator implements
Authenticator {
" Users must configure FQDN of kafka brokers when
authenticating using SASL and" +
" `socketChannel.socket().getInetAddress().getHostName()`
must match the hostname in `principal/hostname@realm`";
}
- error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
//Unwrap the SaslException inside `PrivilegedActionException`
Throwable cause = e.getCause();
// Treat transient Kerberos errors as non-fatal SaslExceptions
that are processed as I/O exceptions
// and all other failures as fatal SaslAuthenticationException.
- if (kerberosError != null && kerberosError.retriable())
+ if ((kerberosError != null && kerberosError.retriable()) ||
(kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
+ error += " Kafka Client will retry.";
throw new SaslException(error, cause);
- else
+ } else {
+ error += " Kafka Client will go to AUTHENTICATION_FAILED
state.";
throw new SaslAuthenticationException(error, cause);
+ }
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
index 9c76482..4b8e8e0 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.kerberos;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.utils.Java;
+import org.ietf.jgss.GSSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,4 +110,22 @@ public enum KerberosError {
}
return null;
}
+
+ /**
+ * Returns true if the exception should be handled as a transient failure
on clients.
+ * We handle GSSException.NO_CRED as retriable on the client-side since
this may
+ * occur during re-login if a client attempts to authenticate after
logout, but
+ * before the subsequent login.
+ */
+ public static boolean isRetriableClientGssException(Exception exception) {
+ Throwable cause = exception.getCause();
+ while (cause != null && !(cause instanceof GSSException)) {
+ cause = cause.getCause();
+ }
+ if (cause != null) {
+ GSSException gssException = (GSSException) cause;
+ return gssException.getMajor() == GSSException.NO_CRED;
+ }
+ return false;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 5efd722..9626da8 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -346,7 +346,7 @@ public class KerberosLogin extends AbstractLogin {
* Re-login a principal. This method assumes that {@link #login()} has
happened already.
* @throws javax.security.auth.login.LoginException on a failure
*/
- private void reLogin() throws LoginException {
+ protected void reLogin() throws LoginException {
if (!isKrbTicket) {
return;
}
@@ -363,7 +363,7 @@ public class KerberosLogin extends AbstractLogin {
//clear up the kerberos state. But the tokens are not cleared! As
per
//the Java kerberos login module code, only the kerberos
credentials
//are cleared
- loginContext.logout();
+ logout();
//login and also update the subject field of this instance to
//have the new credentials (pass it to the LoginContext
constructor)
loginContext = new LoginContext(contextName(), subject, null,
configuration());
@@ -372,6 +372,11 @@ public class KerberosLogin extends AbstractLogin {
}
}
+ // Visibility to override for testing
+ protected void logout() throws LoginException {
+ loginContext.logout();
+ }
+
private long currentElapsedTime() {
return time.hiResClockMs();
}
diff --git
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index d6414c0..fa21a94 100644
---
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -20,8 +20,8 @@ package kafka.server
import java.net.InetSocketAddress
import java.time.Duration
-import java.util.Properties
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.{Collections, Properties}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
import kafka.utils.TestUtils
@@ -31,7 +31,8 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.SaslAuthenticationException
import org.apache.kafka.common.network._
import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
+import org.apache.kafka.common.security.kerberos.KerberosLogin
import org.apache.kafka.common.utils.{LogContext, MockTime}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -57,6 +58,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness
with SaslSetup {
@Before
override def setUp(): Unit = {
+ TestableKerberosLogin.reset()
startSasl(jaasSections(kafkaServerSaslMechanisms,
Option(kafkaClientSaslMechanism), Both))
serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp,
failedAuthenticationDelayMs.toString)
@@ -78,6 +80,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness
with SaslSetup {
executor.shutdownNow()
super.tearDown()
closeSasl()
+ TestableKerberosLogin.reset()
}
/**
@@ -97,6 +100,35 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
}
/**
+ * Verifies that there are no authentication failures during Kerberos
re-login. If authentication
+ * is performed when credentials are unavailable between logout and login,
we handle it as a
+ * transient error and not an authentication failure so that clients may
retry.
+ */
+ @Test
+ def testReLogin(): Unit = {
+ val selector = createSelectorWithRelogin()
+ try {
+ val login = TestableKerberosLogin.instance
+ assertNotNull(login)
+ executor.submit(() => login.reLogin(), 0)
+
+ val node1 = "1"
+ selector.connect(node1, serverAddr, 1024, 1024)
+ login.logoutResumeLatch.countDown()
+ login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+ assertFalse("Authenticated during re-login",
pollUntilReadyOrDisconnected(selector, node1))
+
+ login.reLoginResumeLatch.countDown()
+ login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+ val node2 = "2"
+ selector.connect(node2, serverAddr, 1024, 1024)
+ assertTrue("Authenticated failed after re-login",
pollUntilReadyOrDisconnected(selector, node2))
+ } finally {
+ selector.close()
+ }
+ }
+
+ /**
* Tests that Kerberos error `Server not found in Kerberos database (7)` is
handled
* as a fatal authentication failure.
*/
@@ -149,16 +181,8 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
while (actualSuccessfulAuths < numSuccessfulAuths) {
val nodeId = actualSuccessfulAuths.toString
selector.connect(nodeId, serverAddr, 1024, 1024)
- TestUtils.waitUntilTrue(() => {
- selector.poll(100)
- val disconnectState = selector.disconnected().get(nodeId)
- // Verify that disconnect state is not AUTHENTICATION_FAILED
- if (disconnectState != null)
- assertEquals(s"Authentication failed with exception
${disconnectState.exception()}",
- ChannelState.State.AUTHENTICATE, disconnectState.state())
- selector.isChannelReady(nodeId) || disconnectState != null
- }, "Client not ready or disconnected within timeout")
- if (selector.isChannelReady(nodeId))
+ val isReady = pollUntilReadyOrDisconnected(selector, nodeId)
+ if (isReady)
actualSuccessfulAuths += 1
selector.close(nodeId)
}
@@ -167,6 +191,22 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
}
}
+ private def pollUntilReadyOrDisconnected(selector: Selector, nodeId:
String): Boolean = {
+ TestUtils.waitUntilTrue(() => {
+ selector.poll(100)
+ val disconnectState = selector.disconnected().get(nodeId)
+ // Verify that disconnect state is not AUTHENTICATION_FAILED
+ if (disconnectState != null) {
+ assertEquals(s"Authentication failed with exception
${disconnectState.exception()}",
+ ChannelState.State.AUTHENTICATE, disconnectState.state())
+ }
+ selector.isChannelReady(nodeId) || disconnectState != null
+ }, "Client not ready or disconnected within timeout")
+ val isReady = selector.isChannelReady(nodeId)
+ selector.close(nodeId)
+ isReady
+ }
+
/**
* Verifies that authentication with the current `clientConfig` results in
disconnection and that
* the disconnection is notified with disconnect state
`AUTHENTICATION_FAILED`. This is to ensure
@@ -192,4 +232,45 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
time, true, new LogContext())
NetworkTestUtils.createSelector(channelBuilder, time)
}
+
+ private def createSelectorWithRelogin(): Selector = {
+
clientConfig.setProperty(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "0")
+ val config = new TestSecurityConfig(clientConfig)
+ val jaasContexts = Collections.singletonMap("GSSAPI",
JaasContext.loadClientContext(config.values()))
+ val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
securityProtocol,
+ null, false, kafkaClientSaslMechanism, true, null, null, time, new
LogContext()) {
+ override protected def defaultLoginClass(): Class[_ <: Login] =
classOf[TestableKerberosLogin]
+ }
+ channelBuilder.configure(config.values())
+ NetworkTestUtils.createSelector(channelBuilder, time)
+ }
+}
+
+object TestableKerberosLogin {
+ @volatile var instance: TestableKerberosLogin = _
+ def reset(): Unit = {
+ instance = null
+ }
+}
+
+class TestableKerberosLogin extends KerberosLogin {
+ val logoutResumeLatch = new CountDownLatch(1)
+ val logoutCompleteLatch = new CountDownLatch(1)
+ val reLoginResumeLatch = new CountDownLatch(1)
+ val reLoginCompleteLatch = new CountDownLatch(1)
+
+ assertNull(TestableKerberosLogin.instance)
+ TestableKerberosLogin.instance = this
+
+ override def reLogin(): Unit = {
+ super.reLogin()
+ reLoginCompleteLatch.countDown()
+ }
+
+ override protected def logout(): Unit = {
+ logoutResumeLatch.await(15, TimeUnit.SECONDS)
+ super.logout()
+ logoutCompleteLatch.countDown()
+ reLoginResumeLatch.await(15, TimeUnit.SECONDS)
+ }
}