This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new edfd769 KAFKA-13418: Support key updates with TLS 1.3 (#11966)
edfd769 is described below
commit edfd769f426e5baaf94c379e23624ec82e3e80bb
Author: Ismael Juma <[email protected]>
AuthorDate: Tue Mar 29 14:59:38 2022 -0700
KAFKA-13418: Support key updates with TLS 1.3 (#11966)
Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (the exception
is kept in the `handshake` method).
With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property (`jdk.tls.keyLimits`) to adjust this configuration, but
the change has to be made before the property is used for the first time and it cannot
be changed after that. As such, it is best tested via a system test (filed KAFKA-13779).
To validate the change, I wrote a unit test that forces key updates and manually ran
a producer workload that produced more than 2^37 bytes. Both cases failed without
these changes and pass with them.
Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix, and hence
I included them as a co-author of this change.
Reviewers: Rajini Sivaram <[email protected]>
Co-authored-by: Shylaja Kokoori
---
.../kafka/common/network/SslTransportLayer.java | 16 ++--
.../apache/kafka/common/network/SelectorTest.java | 5 --
.../kafka/common/network/SslSelectorTest.java | 44 ++---------
.../kafka/common/network/Tls12SelectorTest.java | 72 +++++++++++++++++
.../kafka/common/network/Tls13SelectorTest.java | 92 ++++++++++++++++++++++
5 files changed, 180 insertions(+), 49 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad..d276e99 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
CLOSING
}
+ private static final String TLS13 = "TLSv1.3";
+
private final String channelId;
private final SSLEngine sslEngine;
private final SelectionKey key;
@@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer {
if (netWriteBuffer.hasRemaining())
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
else {
- state = sslEngine.getSession().getProtocol().equals("TLSv1.3")
? State.POST_HANDSHAKE : State.READY;
+ state = sslEngine.getSession().getProtocol().equals(TLS13) ?
State.POST_HANDSHAKE : State.READY;
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
SSLSession session = sslEngine.getSession();
log.debug("SSL handshake completed successfully with peerHost
'{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer {
throw e;
}
netReadBuffer.compact();
- // handle ssl renegotiation.
+ // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
if (unwrapResult.getHandshakeStatus() !=
HandshakeStatus.NOT_HANDSHAKING &&
unwrapResult.getHandshakeStatus() !=
HandshakeStatus.FINISHED &&
- unwrapResult.getStatus() == Status.OK) {
+ unwrapResult.getStatus() == Status.OK &&
+ !sslEngine.getSession().getProtocol().equals(TLS13)) {
log.error("Renegotiation requested, but it is not
supported, channelId {}, " +
"appReadBuffer pos {}, netReadBuffer pos {},
netWriteBuffer pos {} handshakeStatus {}", channelId,
appReadBuffer.position(), netReadBuffer.position(),
netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer {
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
netWriteBuffer.flip();
- //handle ssl renegotiation
- if (wrapResult.getHandshakeStatus() !=
HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+ // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
+ if (wrapResult.getHandshakeStatus() !=
HandshakeStatus.NOT_HANDSHAKING &&
+ wrapResult.getStatus() == Status.OK &&
+ !sslEngine.getSession().getProtocol().equals(TLS13)) {
throw renegotiationException();
+ }
if (wrapResult.getStatus() == Status.OK) {
written += wrapResult.bytesConsumed();
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index f276cd4..43b0956 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -110,10 +110,6 @@ public class SelectorTest {
}
}
- public SecurityProtocol securityProtocol() {
- return SecurityProtocol.PLAINTEXT;
- }
-
protected Map<String, Object> clientConfigs() {
return new HashMap<>();
}
@@ -1015,7 +1011,6 @@ public class SelectorTest {
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
- selector.poll(1000L);
while (true) {
selector.poll(1000L);
for (NetworkReceive receive : selector.completedReceives())
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 7f95566..0ddfce6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
+import java.security.GeneralSecurityException;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.config.SecurityConfig;
@@ -43,11 +44,9 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.Security;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,7 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* A set of tests for the selector. These use a test harness that runs a
simple socket server that echos back responses.
*/
-public class SslSelectorTest extends SelectorTest {
+public abstract class SslSelectorTest extends SelectorTest {
private Map<String, Object> sslClientConfigs;
@@ -73,7 +72,7 @@ public class SslSelectorTest extends SelectorTest {
this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
this.server.start();
this.time = new MockTime();
- sslClientConfigs = TestSslUtils.createSslConfig(false, false,
Mode.CLIENT, trustStoreFile, "client");
+ sslClientConfigs = createSslClientConfigs(trustStoreFile);
LogContext logContext = new LogContext();
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false,
logContext);
this.channelBuilder.configure(sslClientConfigs);
@@ -81,6 +80,8 @@ public class SslSelectorTest extends SelectorTest {
this.selector = new Selector(5000, metrics, time, "MetricGroup",
channelBuilder, logContext);
}
+ protected abstract Map<String, Object> createSslClientConfigs(File
trustStoreFile) throws GeneralSecurityException, IOException;
+
@AfterEach
public void tearDown() throws Exception {
this.selector.close();
@@ -89,18 +90,12 @@ public class SslSelectorTest extends SelectorTest {
}
@Override
- public SecurityProtocol securityProtocol() {
- return SecurityProtocol.PLAINTEXT;
- }
-
- @Override
protected Map<String, Object> clientConfigs() {
return sslClientConfigs;
}
@Test
public void testConnectionWithCustomKeyManager() throws Exception {
-
TestProviderCreator testProviderCreator = new TestProviderCreator();
int requestSize = 100 * 1024;
@@ -249,35 +244,6 @@ public class SslSelectorTest extends SelectorTest {
verifySelectorEmpty();
}
- /**
- * Renegotiation is not supported since it is potentially unsafe and it
has been removed in TLS 1.3
- */
- @Test
- public void testRenegotiationFails() throws Exception {
- String node = "0";
- // create connections
- InetSocketAddress addr = new InetSocketAddress("localhost",
server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- // send echo requests and receive responses
- while (!selector.isChannelReady(node)) {
- selector.poll(1000L);
- }
- selector.send(createSend(node, node + "-" + 0));
- selector.poll(0L);
- server.renegotiate();
- selector.send(createSend(node, node + "-" + 1));
- long expiryTime = System.currentTimeMillis() + 2000;
-
- List<String> disconnected = new ArrayList<>();
- while (!disconnected.contains(node) && System.currentTimeMillis() <
expiryTime) {
- selector.poll(10);
- disconnected.addAll(selector.disconnected().keySet());
- }
- assertTrue(disconnected.contains(node), "Renegotiation should cause
disconnection");
-
- }
-
@Override
@Test
public void testMuteOnOOM() throws Exception {
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
new file mode 100644
index 0000000..59903b5
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.junit.jupiter.api.Test;
+
+public class Tls12SelectorTest extends SslSelectorTest {
+
+ @Override
+ protected Map<String, Object> createSslClientConfigs(File trustStoreFile)
+ throws GeneralSecurityException, IOException {
+ Map<String, Object> configs = TestSslUtils.createSslConfig(false,
false, Mode.CLIENT,
+ trustStoreFile, "client");
+ configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
asList("TLSv1.2"));
+ return configs;
+ }
+
+ /**
+ * Renegotiation is not supported when TLS 1.2 is used (renegotiation was removed from TLS 1.3)
+ */
+ @Test
+ public void testRenegotiationFails() throws Exception {
+ String node = "0";
+ // create connections
+ InetSocketAddress addr = new InetSocketAddress("localhost",
server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ // send echo requests and receive responses
+ while (!selector.isChannelReady(node)) {
+ selector.poll(1000L);
+ }
+ selector.send(createSend(node, node + "-" + 0));
+ selector.poll(0L);
+ server.renegotiate();
+ selector.send(createSend(node, node + "-" + 1));
+ long expiryTime = System.currentTimeMillis() + 2000;
+
+ List<String> disconnected = new ArrayList<>();
+ while (!disconnected.contains(node) && System.currentTimeMillis() <
expiryTime) {
+ selector.poll(10);
+ disconnected.addAll(selector.disconnected().keySet());
+ }
+ assertTrue(disconnected.contains(node), "Renegotiation should cause
disconnection");
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
new file mode 100644
index 0000000..afae3e2
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+@EnabledForJreRange(min = JRE.JAVA_11) // TLS 1.3 is only supported with Java
11 and newer
+public class Tls13SelectorTest extends SslSelectorTest {
+
+ @Override
+ protected Map<String, Object> createSslClientConfigs(File trustStoreFile)
throws GeneralSecurityException, IOException {
+ Map<String, Object> configs = TestSslUtils.createSslConfig(false,
false, Mode.CLIENT,
+ trustStoreFile, "client");
+ configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
asList("TLSv1.3"));
+ return configs;
+ }
+
+ /**
+ * TLS 1.3 has a post-handshake key and IV update, which will update the sending and receiving keys
+ * for one side of the connection.
+ *
+ * Key Usage Limits will trigger an update when the algorithm limits are reached, but the default
+ * value is too large (2^37 bytes of plaintext data) for a unit test. This value can be overridden
+ * via the security property `jdk.tls.keyLimits`, but that's also difficult to achieve in a unit
+ * test.
+ *
+ * Applications can also trigger an update by calling `SSLSocket.startHandshake()` or
+ * `SSLEngine.beginHandshake()` (this would trigger `renegotiation` with TLS 1.2) and that's the
+ * approach we take here.
+ */
+ @Test
+ public void testKeyUpdate() throws Exception {
+ String node = "0";
+ // create connections
+ InetSocketAddress addr = new InetSocketAddress("localhost",
server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+ // send echo requests and receive responses
+ while (!selector.isChannelReady(node)) {
+ selector.poll(1000L);
+ }
+ selector.send(createSend(node, node + "-" + 0));
+ selector.poll(0L);
+ server.renegotiate();
+ selector.send(createSend(node, node + "-" + 1));
+ List<NetworkReceive> received = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ try {
+ selector.poll(1000L);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ for (NetworkReceive receive : selector.completedReceives()) {
+ if (receive.source().equals(node))
+ received.add(receive);
+ }
+ return received.size() == 2;
+ }, "Expected two receives, got " + received.size());
+
+ assertEquals(asList("0-0", "0-1"),
received.stream().map(this::asString).collect(Collectors.toList()));
+ }
+}