This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d43abf1e09 KAFKA-14770: Allow dynamic keystore update for brokers if
string representation of DN matches even if canonical DNs don't match (#13346)
4d43abf1e09 is described below
commit 4d43abf1e09e01fc5e7af52f65e3fbae02cf9771
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Mar 7 09:41:01 2023 +0000
KAFKA-14770: Allow dynamic keystore update for brokers if string
representation of DN matches even if canonical DNs don't match (#13346)
To avoid mistakes during dynamic broker config updates that could
potentially affect clients, we restrict changes that can be performed
dynamically without broker restart. For broker keystore updates, we require the
DN to be the same for the old and new certificates since this could potentially
contain host names used for host name verification by clients. DNs are compared
using standard Java implementation of X500Principal.equals() which compares
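canonical names. If the tag of a field changes from one with a printable string
representation to one without (or vice versa), the canonical names may differ
even though the string representation of the DN is the same. The check now also
accepts a case-insensitive match of the RFC 2253 names returned by
Principal.getName(), and allows the dynamic update if either comparison matches.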
Reviewers: Manikumar Reddy <[email protected]>, Kalpesh Patel
<[email protected]>
---
.../kafka/common/security/ssl/SslFactory.java | 8 ++++-
.../kafka/common/security/ssl/SslFactoryTest.java | 38 ++++++++++++++++++++++
.../java/org/apache/kafka/test/TestSslUtils.java | 19 +++++++++--
3 files changed, 62 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index d0cc4cc1e69..65c37aa6b47 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -312,7 +312,13 @@ public class SslFactory implements Reconfigurable,
Closeable {
for (int i = 0; i < newEntries.size(); i++) {
CertificateEntries newEntry = newEntries.get(i);
CertificateEntries oldEntry = oldEntries.get(i);
- if (!Objects.equals(newEntry.subjectPrincipal,
oldEntry.subjectPrincipal)) {
+ Principal newPrincipal = newEntry.subjectPrincipal;
+ Principal oldPrincipal = oldEntry.subjectPrincipal;
+ // Compare principal objects to compare canonical names (e.g.
to ignore leading/trailing whitespace).
+ // Canonical names may differ if the tag of a field changes
from one with a printable string representation
+ // to one without or vice-versa due to optional conversion to
hex representation based on the tag. So we
+ // also compare the RFC2253 names returned by Principal.getName,
ignoring case. If either matches, allow dynamic update.
+ if (!Objects.equals(newPrincipal, oldPrincipal) &&
!newPrincipal.getName().equalsIgnoreCase(oldPrincipal.getName())) {
throw new ConfigException(String.format("Keystore
DistinguishedName does not match: " +
" existing={alias=%s, DN=%s}, new={alias=%s, DN=%s}",
oldEntry.alias, oldEntry.subjectPrincipal,
newEntry.alias, newEntry.subjectPrincipal));
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 21dcd6e4b0f..7ac707b5de7 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -20,7 +20,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.security.GeneralSecurityException;
+import java.security.KeyPair;
import java.security.KeyStore;
+import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Map;
@@ -46,6 +48,7 @@ import org.apache.kafka.common.network.Mode;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
+import static
org.apache.kafka.common.security.ssl.SslFactory.CertificateEntries.ensureCompatible;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -521,6 +524,41 @@ public abstract class SslFactoryTest {
assertFalse(securityConfig.unused().contains(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG));
}
+ @Test
+ public void testDynamicUpdateCompatibility() throws Exception {
+ KeyPair keyPair = TestSslUtils.generateKeyPair("RSA");
+ KeyStore ks = createKeyStore(keyPair, "*.example.com", "Kafka", true,
"localhost", "*.example.com");
+ ensureCompatible(ks, ks);
+ ensureCompatible(ks, createKeyStore(keyPair, "*.example.com", "Kafka",
true, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, " *.example.com", " Kafka
", true, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.example.COM", "Kafka",
true, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "KAFKA",
true, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka",
true, "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka",
true, "localhost"));
+
+ ensureCompatible(ks, createKeyStore(keyPair, "*.example.com", "Kafka",
false, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.example.COM", "Kafka",
false, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "KAFKA",
false, "localhost", "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka",
false, "*.example.com"));
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka",
false, "localhost"));
+
+ assertThrows(ConfigException.class, () ->
+ ensureCompatible(ks, createKeyStore(keyPair, " *.example.com",
" Kafka ", false, "localhost", "*.example.com")));
+ assertThrows(ConfigException.class, () ->
+ ensureCompatible(ks, createKeyStore(keyPair,
"*.another.example.com", "Kafka", true, "*.example.com")));
+ assertThrows(ConfigException.class, () ->
+ ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM",
"Kafka", true, "*.another.example.com")));
+ }
+
+ private KeyStore createKeyStore(KeyPair keyPair, String commonName, String
org, boolean utf8, String... dnsNames) throws Exception {
+ X509Certificate cert = new
TestSslUtils.CertificateBuilder().sanDnsNames(dnsNames)
+ .generate(commonName, org, utf8, keyPair);
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ ks.load(null, null);
+ ks.setKeyEntry("kafka", keyPair.getPrivate(), null, new
X509Certificate[] {cert});
+ return ks;
+ }
+
private KeyStore sslKeyStore(Map<String, Object> sslConfig) {
SecurityStore store;
if (sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) != null) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 6d006ea9c9e..675e6563f87 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -23,7 +23,12 @@ import
org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.bouncycastle.asn1.DEROctetString;
import org.bouncycastle.asn1.DERSequence;
+import org.bouncycastle.asn1.DERT61String;
+import org.bouncycastle.asn1.DERUTF8String;
+import org.bouncycastle.asn1.x500.AttributeTypeAndValue;
+import org.bouncycastle.asn1.x500.RDN;
import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
import org.bouncycastle.asn1.x509.Extension;
import org.bouncycastle.asn1.x509.GeneralName;
@@ -382,6 +387,17 @@ public class TestSslUtils {
}
public X509Certificate generate(String dn, KeyPair keyPair) throws
CertificateException {
+ return generate(new X500Name(dn), keyPair);
+ }
+
+ public X509Certificate generate(String commonName, String org, boolean
utf8, KeyPair keyPair) throws CertificateException {
+ RDN[] rdns = new RDN[2];
+ rdns[0] = new RDN(new AttributeTypeAndValue(BCStyle.CN, utf8 ? new
DERUTF8String(commonName) : new DERT61String(commonName)));
+ rdns[1] = new RDN(new AttributeTypeAndValue(BCStyle.O, utf8 ? new
DERUTF8String(org) : new DERT61String(org)));
+ return generate(new X500Name(rdns), keyPair);
+ }
+
+ public X509Certificate generate(X500Name dn, KeyPair keyPair) throws
CertificateException {
try {
Security.addProvider(new BouncyCastleProvider());
AlgorithmIdentifier sigAlgId = new
DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
@@ -399,11 +415,10 @@ public class TestSslUtils {
else
throw new IllegalArgumentException("Unsupported algorithm
" + keyAlgorithm);
ContentSigner sigGen =
signerBuilder.build(privateKeyAsymKeyParam);
- X500Name name = new X500Name(dn);
Date from = new Date();
Date to = new Date(from.getTime() + days * 86400000L);
BigInteger sn = new BigInteger(64, new SecureRandom());
- X509v3CertificateBuilder v3CertGen = new
X509v3CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
+ X509v3CertificateBuilder v3CertGen = new
X509v3CertificateBuilder(dn, sn, from, to, dn, subPubKeyInfo);
if (subjectAltName != null)
v3CertGen.addExtension(Extension.subjectAlternativeName,
false, subjectAltName);