This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 923b6c3fea1 KAFKA-19153: Add OAuth integration tests (#19938)
923b6c3fea1 is described below
commit 923b6c3fea1fa7c9f06ebf01440dd5c12fac58db
Author: Kirk True <[email protected]>
AuthorDate: Thu Jun 12 12:48:14 2025 -0700
KAFKA-19153: Add OAuth integration tests (#19938)
Adds a test dependency on
[mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for
integration tests for OAuth layer. Also includes fixes for some
regressions that were caught by the integration tests.
Reviewers: Manikumar Reddy <[email protected]>, Lianet Magrans
<[email protected]>
---
build.gradle | 1 +
.../oauthbearer/ClientCredentialsJwtRetriever.java | 9 +-
.../OAuthBearerValidatorCallbackHandler.java | 2 +-
.../internals/secured/ConfigurationUtils.java | 12 +
.../secured/assertion/DefaultAssertionCreator.java | 4 +-
.../oauthbearer/JwtBearerJwtRetrieverTest.java | 5 +-
.../kafka/api/ClientOAuthIntegrationTest.scala | 261 +++++++++++++++++++++
gradle/dependencies.gradle | 2 +
licenses/mock-oauth2-server-MIT | 21 ++
9 files changed, 308 insertions(+), 9 deletions(-)
diff --git a/build.gradle b/build.gradle
index 767395ad32d..8e0f4393252 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1060,6 +1060,7 @@ project(':core') {
testImplementation libs.junitJupiter
testImplementation libs.caffeine
testImplementation testLog4j2Libs
+ testImplementation libs.mockOAuth2Server
testRuntimeOnly runtimeTestLibs
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
index 627434f6d3c..4744fd91289 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
@@ -39,7 +39,10 @@ import javax.security.auth.login.AppConfigurationEntry;
import static
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID;
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET;
import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
import static
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
import static
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
import static
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
@@ -173,8 +176,8 @@ public class ClientCredentialsJwtRetriever implements
JwtRetriever {
private String clientId() {
return getValue(
+ SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
CLIENT_ID_CONFIG,
- "clientId",
true,
cu::validateString,
jou::validateString
@@ -183,8 +186,8 @@ public class ClientCredentialsJwtRetriever implements
JwtRetriever {
private String clientSecret() {
return getValue(
+ SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
CLIENT_SECRET_CONFIG,
- "clientSecret",
true,
cu::validatePassword,
jou::validateString
@@ -193,8 +196,8 @@ public class ClientCredentialsJwtRetriever implements
JwtRetriever {
private String scope() {
return getValue(
+ SASL_OAUTHBEARER_SCOPE,
SCOPE_CONFIG,
- "scope",
false,
cu::validateString,
jou::validateString
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
index 6563d36b8b6..60fa8cdb678 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
@@ -170,7 +170,7 @@ public class OAuthBearerValidatorCallbackHandler implements
AuthenticateCallback
}
private void checkConfigured() {
- if (verificationKeyResolver == null || jwtValidator == null)
+ if (jwtValidator == null)
throw new IllegalStateException(String.format("To use %s, first
call the configure method", getClass().getSimpleName()));
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
index a0819766a38..3eebecf8fde 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
@@ -47,6 +50,8 @@ import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALL
public class ConfigurationUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigurationUtils.class);
+
private final Map<String, ?> configs;
private final String prefix;
@@ -344,6 +349,13 @@ public class ConfigurationUtils {
((OAuthBearerConfigurable) o).configure(configs,
saslMechanism, jaasConfigEntries);
} catch (Exception e) {
Utils.maybeCloseQuietly(o, "Instance of class " +
o.getClass().getName() + " failed call to configure()");
+ LOG.warn(
+ "The class {} defined in the {} configuration encountered
an error on configure(): {}",
+ o.getClass().getName(),
+ configName,
+ e.getMessage(),
+ e
+ );
throw new ConfigException(
String.format(
"The class %s defined in the %s configuration
encountered an error on configure(): %s",
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
index db562fade87..52b9eb2fb53 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
@@ -16,7 +16,7 @@
*/
package
org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile;
import org.apache.kafka.common.utils.Utils;
@@ -89,7 +89,7 @@ public class DefaultAssertionCreator implements
AssertionCreator {
return privateKey(contents.getBytes(StandardCharsets.UTF_8),
passphrase);
} catch (GeneralSecurityException | IOException e) {
- throw new KafkaException("An error occurred generating the
OAuth assertion private key from " + file.getPath(), e);
+ throw new JwtRetrieverException("An error occurred generating
the OAuth assertion private key from " + file.getPath(), e);
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
index c466ac83689..4a4e567dedf 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.security.oauthbearer;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
@@ -95,7 +94,7 @@ public class JwtBearerJwtRetrieverTest extends
OAuthBearerTest {
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever())
{
- KafkaException e = assertThrows(KafkaException.class, () ->
jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
+ JwtRetrieverException e =
assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs,
OAUTHBEARER_MECHANISM, jaasConfigEntries));
assertNotNull(e.getCause());
assertInstanceOf(GeneralSecurityException.class, e.getCause());
}
@@ -144,7 +143,7 @@ public class JwtBearerJwtRetrieverTest extends
OAuthBearerTest {
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever())
{
- KafkaException e = assertThrows(KafkaException.class, () ->
jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
+ JwtRetrieverException e =
assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs,
OAUTHBEARER_MECHANISM, jaasConfigEntries));
assertNotNull(e.getCause());
assertInstanceOf(IOException.class, e.getCause());
}
diff --git
a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
new file mode 100644
index 00000000000..22ab6f2673c
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import com.nimbusds.jose.jwk.RSAKey
+import kafka.api.{IntegrationTestHarness, SaslSetup}
+import kafka.utils.TestInfoUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.{ConfigException, SaslConfigs}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
+
+import java.util.{Base64, Collections, Properties}
+import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
+import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import
org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler,
OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.StandardOpenOption
+import java.security.{KeyPairGenerator, PrivateKey}
+import java.security.interfaces.RSAPublicKey
+import java.util
+
+/**
+ * Integration tests for the consumer that cover basic usage as well as
coordinator failure
+ */
+class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup
{
+
+ override val brokerCount = 3
+
+ override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ override protected val serverSaslProperties =
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms,
kafkaClientSaslMechanism))
+ override protected val clientSaslProperties =
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+ protected def kafkaClientSaslMechanism = "OAUTHBEARER"
+ protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+
+ val issuerId = "default"
+ var mockOAuthServer: MockOAuth2Server = _
+ var privateKey: PrivateKey = _
+
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ // Step 1: Generate the key pair dynamically.
+ val keyGen = KeyPairGenerator.getInstance("RSA")
+ keyGen.initialize(2048)
+ val keyPair = keyGen.generateKeyPair()
+
+ privateKey = keyPair.getPrivate
+
+ // Step 2: Create the RSA JWK from key pair.
+ val rsaJWK = new
RSAKey.Builder(keyPair.getPublic.asInstanceOf[RSAPublicKey])
+ .privateKey(privateKey)
+ .keyID("foo")
+ .build()
+
+ // Step 3: Create the OAuth server using the keys just created
+ val keyProvider = new KeyProvider(Collections.singletonList(rsaJWK))
+ val tokenProvider = new OAuth2TokenProvider(keyProvider)
+ val oauthConfig = new OAuth2Config(false, null, null, false, tokenProvider)
+ mockOAuthServer = new MockOAuth2Server(oauthConfig)
+
+ mockOAuthServer.start()
+ val tokenEndpointUrl =
mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
+ val jwksUrl = mockOAuthServer.jwksUrl(issuerId).url().toString
+
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG,
s"$tokenEndpointUrl,$jwksUrl")
+
+ val listenerNamePrefix =
s"listener.name.${listenerName.value().toLowerCase}"
+
+
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_JAAS_CONFIG}",
s"${classOf[OAuthBearerLoginModule].getName} required ;")
+
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE}",
issuerId)
+
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}",
jwksUrl)
+
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}",
classOf[OAuthBearerValidatorCallbackHandler].getName)
+
+ // create static config including client login context with credentials
for JaasTestUtils 'client2'
+ startSasl(jaasSections(kafkaServerSaslMechanisms,
Option(kafkaClientSaslMechanism)))
+
+ // The superuser needs the configuration in setUp because it's used to
create resources before the individual
+ // test methods are invoked.
+ superuserClientConfig.putAll(defaultClientCredentialsConfigs())
+
+ super.setUp(testInfo)
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ if (mockOAuthServer != null)
+ mockOAuthServer.shutdown()
+
+ closeSasl()
+ super.tearDown()
+
+
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG)
+
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG)
+ }
+
+ def defaultOAuthConfigs(): Properties = {
+ val tokenEndpointUrl =
mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
+
+ val configs = new Properties()
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
securityProtocol.name)
+ configs.put(SaslConfigs.SASL_JAAS_CONFIG,
jaasClientLoginModule(kafkaClientSaslMechanism))
+ configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
classOf[OAuthBearerLoginCallbackHandler].getName)
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
tokenEndpointUrl)
+ configs
+ }
+
+ def defaultClientCredentialsConfigs(): Properties = {
+ val configs = defaultOAuthConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
"test-client")
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
"test-secret")
+ configs
+ }
+
+ def defaultJwtBearerConfigs(): Properties = {
+ val configs = defaultOAuthConfigs()
+ configs.put(SaslConfigs.SASL_JAAS_CONFIG,
jaasClientLoginModule(kafkaClientSaslMechanism))
+ configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
classOf[OAuthBearerLoginCallbackHandler].getName)
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS,
"org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever")
+ configs
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testBasicClientCredentials(groupProtocol: String): Unit = {
+ val configs = defaultClientCredentialsConfigs()
+ assertDoesNotThrow(() => createProducer(configOverrides = configs))
+ assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+ assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testBasicJwtBearer(groupProtocol: String): Unit = {
+ val jwt = mockOAuthServer.issueToken(issuerId, "jdoe", "someaudience",
Collections.singletonMap("scope", "test"))
+ val assertionFile = TestUtils.tempFile(jwt.serialize())
+
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG,
assertionFile.getAbsolutePath)
+
+ val configs = defaultJwtBearerConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE,
assertionFile.getAbsolutePath)
+
+ assertDoesNotThrow(() => createProducer(configOverrides = configs))
+ assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+ assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testBasicJwtBearer2(groupProtocol: String): Unit = {
+ val privateKeyFile = generatePrivateKeyFile()
+
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG,
privateKeyFile.getAbsolutePath)
+
+ val configs = defaultJwtBearerConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE,
privateKeyFile.getPath)
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD, "default")
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB,
"kafka-client-test-sub")
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "default")
+ // configs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "aud")
+
+ assertDoesNotThrow(() => createProducer(configOverrides = configs))
+ assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+ assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+ }
+
+ @Disabled("KAFKA-19394: Failure in
ConsumerNetworkThread.initializeResources() can cause hangs on
AsyncKafkaConsumer.close()")
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testJwtBearerWithMalformedAssertionFile(groupProtocol: String): Unit = {
+ // Create the assertion file, but fill it with non-JWT garbage.
+ val assertionFile = TestUtils.tempFile("CQEN*)Q#F)&)^#QNC")
+
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG,
assertionFile.getAbsolutePath)
+
+ val configs = defaultJwtBearerConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE,
assertionFile.getAbsolutePath)
+
+ assertThrows(classOf[KafkaException], () => createProducer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () => createConsumer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () =>
createAdminClient(configOverrides = configs))
+ }
+
+ @Disabled("KAFKA-19394: Failure in
ConsumerNetworkThread.initializeResources() can cause hangs on
AsyncKafkaConsumer.close()")
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testJwtBearerWithEmptyAssertionFile(groupProtocol: String): Unit = {
+ // Create the assertion file, but leave it empty.
+ val assertionFile = TestUtils.tempFile()
+
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG,
assertionFile.getAbsolutePath)
+
+ val configs = defaultJwtBearerConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE,
assertionFile.getAbsolutePath)
+
+ assertThrows(classOf[KafkaException], () => createProducer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () => createConsumer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () =>
createAdminClient(configOverrides = configs))
+ }
+
+ @Disabled("KAFKA-19394: Failure in
ConsumerNetworkThread.initializeResources() can cause hangs on
AsyncKafkaConsumer.close()")
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testJwtBearerWithMissingAssertionFile(groupProtocol: String): Unit = {
+ val missingFileName = "/this/does/not/exist.txt"
+
+ val configs = defaultJwtBearerConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, missingFileName)
+
+ assertThrows(classOf[KafkaException], () => createProducer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () => createConsumer(configOverrides
= configs))
+ assertThrows(classOf[KafkaException], () =>
createAdminClient(configOverrides = configs))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testUnsupportedJwtRetriever(groupProtocol: String): Unit = {
+ val className =
"org.apache.kafka.common.security.oauthbearer.ThisIsNotARealJwtRetriever"
+
+ val configs = defaultOAuthConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
+
+ assertThrows(classOf[ConfigException], () =>
createProducer(configOverrides = configs))
+ assertThrows(classOf[ConfigException], () =>
createConsumer(configOverrides = configs))
+ assertThrows(classOf[ConfigException], () =>
createAdminClient(configOverrides = configs))
+ }
+
+ def generatePrivateKeyFile(): File = {
+ val file = File.createTempFile("private-", ".key")
+ val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
+ var channel: FileChannel = null
+
+ try {
+ channel = FileChannel.open(file.toPath,
util.EnumSet.of(StandardOpenOption.WRITE))
+ Utils.writeFully(channel, ByteBuffer.wrap(bytes))
+ } finally {
+ channel.close()
+ }
+
+ file
+ }
+}
\ No newline at end of file
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cd6af7ee168..2e97bf2a4c4 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -123,6 +123,7 @@ versions += [
slf4j: "1.7.36",
snappy: "1.1.10.7",
spotbugs: "4.8.6",
+ mockOAuth2Server: "2.2.1",
zinc: "1.9.2",
// When updating the zstd version, please do as well in
docker/native/native-image-configs/resource-config.json
// Also make sure the compression levels in
org.apache.kafka.common.record.CompressionType are still valid
@@ -224,6 +225,7 @@ libs += [
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
+ mockOAuth2Server:
"no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
zstd: "com.github.luben:zstd-jni:$versions.zstd",
diff --git a/licenses/mock-oauth2-server-MIT b/licenses/mock-oauth2-server-MIT
new file mode 100644
index 00000000000..ef1b1129b10
--- /dev/null
+++ b/licenses/mock-oauth2-server-MIT
@@ -0,0 +1,21 @@
+# The MIT License
+
+Copyright 2025 NAV (Arbeids- og velferdsdirektoratet) - The Norwegian Labour
and Welfare Administration
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file