This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new d2535394a8 GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) d2535394a8 is described below commit d2535394a82ac5faf10f004f4e3c15f756f7b177 Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Wed Apr 6 20:14:48 2022 -0700 GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) * Key expiration works for TLSv1.3 and GCM-based ciphers * TLS KeyUpdate messages are processed correctly --- .../internal/P2PMessagingConcurrencyDUnitTest.java | 2 +- ...P2pMessagingSslTlsKeyUpdateDistributedTest.java | 367 +++++++++++++++ .../tcp/ConnectionCloseSSLTLSDUnitTest.java | 8 +- .../internal/net/NioSslEngineKeyUpdateTest.java | 497 +++++++++++++++++++++ .../apache/geode/internal/net/NioSslEngine.java | 69 +-- .../org/apache/geode/internal/tcp/Connection.java | 2 +- .../geode/internal/net/NioSslEngineTest.java | 40 +- 7 files changed, 939 insertions(+), 46 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java index 28abe5dd3e..e761e1bb12 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java @@ -167,7 +167,6 @@ public class P2PMessagingConcurrencyDUnitTest { bytesTransferredAdder = new LongAdder(); final ClusterDistributionManager cdm = getCDM(); - final Random random = new Random(RANDOM_SEED); final AtomicInteger nextSenderId = new AtomicInteger(); /* @@ -194,6 +193,7 @@ public class P2PMessagingConcurrencyDUnitTest { throw new RuntimeException("doSending failed", e); } final int firstMessageId = senderId * SENDER_COUNT; + final Random random = new Random(RANDOM_SEED); for (int messageId = firstMessageId; messageId < firstMessageId + MESSAGES_PER_SENDER; messageId++) { final TestMessage msg = new TestMessage(receiverMember, random, messageId); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java new file mode 100644 index 0000000000..e31f9cfd79 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.distributed.internal; + +import static org.apache.geode.distributed.ConfigurationProperties.SSL_CIPHERS; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Field; +import java.security.GeneralSecurityException; +import java.security.Security; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; + +import junitparams.Parameters; +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.ssl.CertStores; +import org.apache.geode.cache.ssl.CertificateBuilder; +import org.apache.geode.cache.ssl.CertificateMaterial; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.MembershipTest; +import org.apache.geode.test.junit.runners.GeodeParamsRunner; +import org.apache.geode.test.version.VersionManager; + +/** + * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number + * of bytes that may be encoded with a key. When that limit is reached, a TLS + * KeyUpdate message is generated (by the SSLEngine). That message causes a new + * key to be negotiated between the peer SSLEngines. + * + * This test arranges for a low encryption byte limit to be set for the sending member + * and then for the receiving member. With the low byte limit configured, the test + * sends P2P messages via TLS and verifies request-reply message processing. + */ +@Category({MembershipTest.class}) +@RunWith(GeodeParamsRunner.class) +public class P2pMessagingSslTlsKeyUpdateDistributedTest { + + private static final String TLS_PROTOCOL = "TLSv1.3"; + private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384"; + + private static final int ENCRYPTED_BYTES_LIMIT = 64 * 1024; + + private static final int MESSAGE_SIZE = 1024; + + /* + * How many messages will be generated? We generate enough to cause KeyUpdate + * to be generated, and then we generate many more beyond that. Even with buggy + * wrap/unwrap logic, the retries in DirectChannel.sendToMany() and the transparent + * connection reestablishment in ConnectionTable can mask those bugs. So to reliably + * fail in the presence of bugs we need to generate lots of extra messages. + */ + private static final int MESSAGES_PER_SENDER = ENCRYPTED_BYTES_LIMIT / MESSAGE_SIZE + 2000; + + { + assertThat(MESSAGE_SIZE * MESSAGES_PER_SENDER > 10 * ENCRYPTED_BYTES_LIMIT); + } + + public static final int MAX_REPLY_WAIT_MILLIS = 1_000; + + private static Properties geodeConfigurationProperties; + + @Rule + public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + + private MemberVM sender; + private MemberVM receiver; + + @After + public void afterEach() { + clusterStartupRule.getVM(0).bounceForcibly(); + clusterStartupRule.getVM(1).bounceForcibly(); + clusterStartupRule.getVM(2).bounceForcibly(); + } + + /* + * bytes sent on sender JVM, bytes received on receiver JVM + * (not used in test JVM) + */ + private static LongAdder bytesTransferredAdder; + + // in receiver JVM only + private static LongAdder repliesGeneratedAdder; + + // in sender JVM only + private static LongAdder repliesReceivedAdder; + + + private void configureJVMsAndStartClusterMembers( + final long locatorEncryptedBytesLimit, + final long senderEncryptedBytesLimit, + final long receiverEncryptedBytesLimit) + throws GeneralSecurityException, IOException { + + clusterStartupRule.getVM(0).invoke( + setSecurityProperties(locatorEncryptedBytesLimit)); + clusterStartupRule.getVM(1).invoke( + setSecurityProperties(senderEncryptedBytesLimit)); + clusterStartupRule.getVM(2).invoke( + setSecurityProperties(receiverEncryptedBytesLimit)); + + final Properties senderConfiguration = geodeConfigurationProperties(); + final Properties receiverConfiguration = geodeConfigurationProperties(); + + final MemberVM locator = + clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION, + x -> x.withProperties(senderConfiguration).withConnectionToLocator() + .withoutClusterConfigurationService().withoutManagementRestService()); + + sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort()); + receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort()); + } + + private @NotNull SerializableRunnableIF setSecurityProperties(final long encryptedBytesLimit) { + return () -> { + Security.setProperty("jdk.tls.keyLimits", + "AES/GCM/NoPadding KeyUpdate " + encryptedBytesLimit); + + final Class<?> sslCipher = Class.forName("sun.security.ssl.SSLCipher"); + final Field cipherLimits = sslCipher.getDeclaredField("cipherLimits"); + cipherLimits.setAccessible(true); + assertThat((Map<String, Long>) cipherLimits.get(null)).containsEntry( + "AES/GCM/NOPADDING:KEYUPDATE", + encryptedBytesLimit); + }; + } + + @Test + @Parameters({ + "137438953472, 65536, 137438953472", + "137438953472, 137438953472, 65536", + }) + public void testP2PMessagingWithKeyUpdate( + final long locatorEncryptedBytesLimit, + final long senderEncryptedBytesLimit, + final long receiverEncryptedBytesLimit) + throws GeneralSecurityException, IOException { + + configureJVMsAndStartClusterMembers(locatorEncryptedBytesLimit, senderEncryptedBytesLimit, + receiverEncryptedBytesLimit); + + final InternalDistributedMember receiverMember = + receiver.invoke("get receiving member id", () -> { + + bytesTransferredAdder = new LongAdder(); + repliesGeneratedAdder = new LongAdder(); + + final ClusterDistributionManager cdm = getCDM(); + final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember(); + return localMember; + }); + + // by returning a value from the invoked lambda we make invocation synchronous + final Boolean sendingComplete = + sender.invoke("message sending and reply counting", () -> { + + bytesTransferredAdder = new LongAdder(); + repliesReceivedAdder = new LongAdder(); + + final ClusterDistributionManager cdm = getCDM(); + + int failedRecipientCount = 0; + int droppedRepliesCount = 0; + + final ReplyProcessor21[] replyProcessors = new ReplyProcessor21[MESSAGES_PER_SENDER]; + + // this loop sends request messages + for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) { + + final ReplyProcessor21 replyProcessor = new ReplyProcessor21(cdm, receiverMember); + replyProcessors[messageId] = replyProcessor; + final TestMessage msg = new TestMessage(messageId, receiverMember, + replyProcessor.getProcessorId()); + + final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg); + if (failedRecipients == null) { + bytesTransferredAdder.add(MESSAGE_SIZE); + } else { + failedRecipientCount += failedRecipients.size(); + } + } + + // this loop counts reply arrivals + for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) { + final ReplyProcessor21 replyProcessor = replyProcessors[messageId]; + final boolean receivedReply = + replyProcessor.waitForRepliesUninterruptibly(MAX_REPLY_WAIT_MILLIS); + if (receivedReply) { + repliesReceivedAdder.increment(); + } else { + droppedRepliesCount += 1; + } + } + + assertThat((long) failedRecipientCount).as("message delivery failed N times").isZero(); + assertThat((long) droppedRepliesCount).as("some replies were dropped").isZero(); + return true; + }); + + // at this point, sender is done sending + final long bytesSent = getByteCount(sender); + + await().untilAsserted( + () -> { + assertThat(getRepliesGenerated()).isEqualTo(MESSAGES_PER_SENDER); + assertThat(getRepliesReceived()).isEqualTo(MESSAGES_PER_SENDER); + assertThat(getByteCount(receiver)) + .as("bytes received != bytes sent") + .isEqualTo(bytesSent); + }); + } + + private long getRepliesGenerated() { + return receiver.invoke(() -> repliesGeneratedAdder.sum()); + } + + private long getRepliesReceived() { + return sender.invoke(() -> repliesReceivedAdder.sum()); + } + + private long getByteCount(final MemberVM member) { + return member.invoke(() -> bytesTransferredAdder.sum()); + } + + private static ClusterDistributionManager getCDM() { + return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance()) + .getDistributionManager(); + } + + private static class TestMessage extends DistributionMessage { + private volatile int messageId; + private volatile int replyProcessorId; + private volatile int length; + + TestMessage(final int messageId, + final InternalDistributedMember receiver, + final int replyProcessorId) { + setRecipient(receiver); + this.messageId = messageId; + this.replyProcessorId = replyProcessorId; + } + + // necessary for deserialization + public TestMessage() { + messageId = 0; + replyProcessorId = 0; + } + + @Override + public int getProcessorType() { + return OperationExecutors.STANDARD_EXECUTOR; + } + + @Override + protected void process(final ClusterDistributionManager dm) { + + // In case bugs cause fromData to be called more times than this method, + // we don't count the bytes as "transferred" until we're in this method. + bytesTransferredAdder.add(length); + + final ReplyMessage replyMsg = new ReplyMessage(); + replyMsg.setRecipient(getSender()); + replyMsg.setProcessorId(replyProcessorId); + replyMsg.setReturnValue("howdy!"); + dm.putOutgoing(replyMsg); + repliesGeneratedAdder.increment(); + } + + @Override + public void toData(final DataOutput out, final SerializationContext context) + throws IOException { + super.toData(out, context); + + out.writeInt(messageId); + out.writeInt(replyProcessorId); + + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final int length = MESSAGE_SIZE; + + out.writeInt(length); + + final byte[] payload = new byte[length]; + random.nextBytes(payload); + + out.write(payload); + } + + @Override + public void fromData(final DataInput in, final DeserializationContext context) + throws IOException, ClassNotFoundException { + super.fromData(in, context); + + messageId = in.readInt(); + replyProcessorId = in.readInt(); + + length = in.readInt(); + + final byte[] payload = new byte[length]; + + in.readFully(payload); + } + + @Override + public int getDSFID() { + return NO_FIXED_ID; // for testing only! + } + } + + private static @NotNull Properties geodeConfigurationProperties() + throws GeneralSecurityException, IOException { + // subsequent calls must return the same value so members agree on credentials + if (geodeConfigurationProperties == null) { + final CertificateMaterial ca = new CertificateBuilder() + .commonName("Test CA") + .isCA() + .generate(); + + final CertificateMaterial serverCertificate = new CertificateBuilder() + .commonName("member") + .issuedBy(ca) + .generate(); + + final CertStores memberStore = new CertStores("member"); + memberStore.withCertificate("member", serverCertificate); + memberStore.trust("ca", ca); + // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc + final Properties props = memberStore.propertiesWith("all", false, false); + props.setProperty(SSL_PROTOCOLS, TLS_PROTOCOL); + props.setProperty(SSL_CIPHERS, TLS_CIPHER_SUITE); + geodeConfigurationProperties = props; + } + return geodeConfigurationProperties; + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java index 77fe9bf81f..72c79b03c0 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java @@ -33,7 +33,6 @@ import static org.apache.geode.test.dunit.VM.getVM; import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Fail.fail; import java.io.File; import java.io.Serializable; @@ -62,6 +61,7 @@ import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.DistributedBlackboard; +import org.apache.geode.test.dunit.rules.DistributedErrorCollector; import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.dunit.rules.DistributedRule; @@ -96,6 +96,9 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable { public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties(); + @Rule + public DistributedErrorCollector errorCollector = new DistributedErrorCollector(); + private VM locator; private VM sender; private VM receiver; @@ -139,9 +142,8 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable { blackboard.signalGate(UPDATE_ENTERED_GATE); blackboard.waitForGate(SUSPEND_UPDATE_GATE); } catch (TimeoutException | InterruptedException e) { - fail("message observus interruptus"); + errorCollector.addError(e); } - logger.info("BGB: got before process message: " + message); }); } }; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java new file mode 100644 index 0000000000..e2493d85c9 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.security.GeneralSecurityException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.Security; +import java.security.UnrecoverableKeyException; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.geode.cache.ssl.CertStores; +import org.apache.geode.cache.ssl.CertificateBuilder; +import org.apache.geode.cache.ssl.CertificateMaterial; +import org.apache.geode.distributed.internal.DMStats; + +/** + * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number + * of bytes that may be encoded with a key. When that limit is reached, a TLS + * KeyUpdate message is generated (by the SSLEngine). That message causes a new + * key to be negotiated between the peer SSLEngines. + * + * Geode's {@link NioSslEngine} class (subclass of {@link NioFilter}) encapsulates + * Java's {@link SSLEngine}. Geode's MsgStreamer, Connection, and ConnectionTable + * classes interact with NioSslEngine to accomplish peer-to-peer (P2P) messaging. + * + * This test constructs a pair of SSLEngine's and wraps them in NioSslEngine. + * Rather than relying on MsgStreamer, Connection, or ConnectionTable classes + * (which themselves have a lot of dependencies on other classes), this test + * implements simplified logic for driving the sending and receiving of data, + * i.e. calling NioSslEngine.wrap() and NioSslEngine.unwrap(). See + * {@link #send(int, NioFilter, SocketChannel, int)} and + * {@link #receive(int, NioFilter, SocketChannel, ByteBuffer)}. + * + * The {@link #keyUpdateDuringSecureDataTransferTest()} arranges for the encrypted bytes limit + * to be reached very quickly (see {@link #ENCRYPTED_BYTES_LIMIT}). + * The test verifies data transfer continues correctly, after the limit is reached. + * This indirectly verifies that the KeyUpdate protocol initiated by the sending + * SSLEngine is correctly handled by all the components involved. + */ +public class NioSslEngineKeyUpdateTest { + + private static final String TLS_PROTOCOL = "TLSv1.3"; + private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384"; + + // number of bytes the GCM cipher can encrypt before initiating a KeyUpdate + private static final int ENCRYPTED_BYTES_LIMIT = 1; + + { + Security.setProperty("jdk.tls.keyLimits", + "AES/GCM/NoPadding KeyUpdate " + ENCRYPTED_BYTES_LIMIT); + } + + private static BufferPool bufferPool; + private static SSLContext sslContext; + private static KeyStore keystore; + private static char[] keystorePassword; + private static KeyStore truststore; + + private SSLEngine clientEngine; + private SSLEngine serverEngine; + private int packetBufferSize; + + @BeforeAll + public static void beforeClass() throws GeneralSecurityException, IOException { + DMStats mockStats = mock(DMStats.class); + bufferPool = new BufferPool(mockStats); + + final Properties securityProperties = createKeystoreAndTruststore(); + + keystore = KeyStore.getInstance("JKS"); + keystorePassword = securityProperties.getProperty(SSL_KEYSTORE_PASSWORD).toCharArray(); + keystore.load(new FileInputStream(securityProperties.getProperty(SSL_KEYSTORE)), + keystorePassword); + + truststore = KeyStore.getInstance("JKS"); + final char[] truststorePassword = + securityProperties.getProperty(SSL_TRUSTSTORE_PASSWORD).toCharArray(); + truststore.load(new FileInputStream(securityProperties.getProperty(SSL_TRUSTSTORE)), + truststorePassword); + } + + @BeforeEach + public void before() throws NoSuchAlgorithmException, UnrecoverableKeyException, + KeyStoreException, KeyManagementException { + final KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX"); + kmf.init(keystore, keystorePassword); + final TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); + tmf.init(truststore); + + sslContext = SSLContext.getInstance("TLS"); + final SecureRandom random = new SecureRandom(new byte[] {1, 2, 3}); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random); + + final SSLParameters defaultParameters = sslContext.getDefaultSSLParameters(); + final String[] protocols = defaultParameters.getProtocols(); + final String[] cipherSuites = defaultParameters.getCipherSuites(); + System.out.println( + String.format("TLS settings (default) before handshake: Protocols: %s, Cipher Suites: %s", + Arrays.toString(protocols), Arrays.toString(cipherSuites))); + + clientEngine = createSSLEngine("server-host", true, sslContext); + packetBufferSize = clientEngine.getSession().getPacketBufferSize(); + + serverEngine = createSSLEngine("client-host", false, sslContext); + } + + /* + * Verify initial handshake succeeds in the presence of KeyUpdate messages + * (i.e. updating send-side cryptographic keys). + * + * This test verifies, primarily, the behavior of + * NioSslEngine.handshake(SocketChannel, int, ByteBuffer) + */ + @Test + public void keyUpdateDuringInitialHandshakeTest() { + clientServerTest( + (channel, filter, peerNetData) -> { + handshakeTLS(channel, filter, peerNetData, "Client:"); + return true; + }, + (channel, filter, peerNetData1) -> { + handshakeTLS(channel, filter, peerNetData1, + "Server:"); + return true; + }); + } + + /* + * Building on keyUpdateDuringInitialHandshakeTest(), this test verifies that + * after the handshake succeeds, subsequent data transfer succeeds in the presence + * of KeyUpdate messages (i.e. updating send-side cryptographic keys). + * + * This test verifies, primarily, the behavior of NioSslEngine#wrap(ByteBuffer). + */ + @Test + public void keyUpdateDuringSecureDataTransferTest() { + clientServerTest( + (final SocketChannel channel, + final NioSslEngine filter, + final ByteBuffer peerNetData) -> { + handshakeTLS(channel, filter, peerNetData, "Client:"); + /* + * In order to verify that KeyUpdate is properly handled in NioSslEngine.wrap() + * we must arrange for the KeyUpdate (generation and processing) to occur after + * the initial handshake and before the handshaking during NioSslEngine.close(). + * + * If we call send() only once, regardless of the number of bytes wrapped (even + * if it exceeds the encryption byte limit set in jdk.tls.keyLimits, the status + * result from SSLEngine.wrap() will be OK. We will fail to encounter the situation + * where it is e.g. BUFFER_OVERFLOW. + * + * By calling send() with bytesToSend >= the limit, we can be sure that the + * subsequent call to send() will trigger the KeyUpdate and will require proper + * handling of that situation in NioSslEngine.wrap(). + */ + send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0); + send(ENCRYPTED_BYTES_LIMIT, filter, channel, ENCRYPTED_BYTES_LIMIT); + + return true; + }, + (final SocketChannel channel, + final NioSslEngine filter, + final ByteBuffer peerNetData) -> { + handshakeTLS(channel, filter, peerNetData, "Server:"); + /* + * Call receive() twice (like we did for send()) just to test that receive() is + * leaving buffers in the correct readable/writable state when it returns. + */ + for (int i = 0; i < 2; i++) { + final byte[] received = + receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData); + assertThat(received).hasSize(ENCRYPTED_BYTES_LIMIT); + for (int j = i * ENCRYPTED_BYTES_LIMIT; j < (i + 1) + * ENCRYPTED_BYTES_LIMIT; j++) { + assertThat(received[j % received.length]).isEqualTo((byte) j); + } + } + return true; + }); + } + + /* + * Building on keyUpdateDuringSecureDataTransferTest(), this test verifies that + * NioSslEngine.close() succeeds in the presence of KeyUpdate messages + * (i.e. updating send-side cryptographic keys). This test is important because + * NioSslEngine.close() involves some TLS handshaking. + * + * This test verifies, primarily, the behavior of NioSslEngine#close(SocketChannel). + */ + @Test + public void keyUpdateDuringSocketCloseHandshakeTest() { + clientServerTest( + (final SocketChannel channel, + final NioSslEngine filter, + final ByteBuffer peerNetData) -> { + handshakeTLS(channel, filter, peerNetData, "Client:"); + /* + * Leave send-side SSLEngine in a state where it will generate a KeyUpdate + * TLS message during (but not before) NioSslEngine.close(). + */ + send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0); + return true; + }, + (final SocketChannel channel, + final NioSslEngine filter, + final ByteBuffer peerNetData) -> { + handshakeTLS(channel, filter, peerNetData, "Server:"); + receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData); + /* + * No need to validate the received data since our purpose is only to verify that + * NioSslEngine.close() succeeds cleanly. + */ + return true; + }); + } + + private static SSLEngine createSSLEngine(final String peerHost, final boolean useClientMode, + final SSLContext sslContext) { + final SSLEngine engine = sslContext.createSSLEngine(peerHost, 10001); + engine.setEnabledProtocols(new String[] {TLS_PROTOCOL}); + engine.setEnabledCipherSuites(new String[] {TLS_CIPHER_SUITE}); + engine.setUseClientMode(useClientMode); + return engine; + } + + private void clientServerTest(final PeerAction clientAction, final PeerAction serverAction) { + final ExecutorService executorService = Executors.newFixedThreadPool(2); + + final CompletableFuture<SocketAddress> boundAddress = new CompletableFuture<>(); + + final CountDownLatch serversWaiting = new CountDownLatch(1); + + final CompletableFuture<Boolean> serverHandshakeFuture = + supplyAsync( + () -> server(boundAddress, packetBufferSize, + serverAction, serversWaiting, serverEngine), + executorService); + + final CompletableFuture<Boolean> clientHandshakeFuture = + supplyAsync( + () -> client(boundAddress, packetBufferSize, + clientAction, serversWaiting, clientEngine), + executorService); + + CompletableFuture.allOf(serverHandshakeFuture, clientHandshakeFuture) + .join(); + } + + /* + * An action taken on a client or server after the SocketChannel has been established. + */ + private interface PeerAction { + boolean apply(final SocketChannel acceptedChannel, + final NioSslEngine filter, + final ByteBuffer peerNetData) throws IOException; + } + + private static boolean client( + final CompletableFuture<SocketAddress> boundAddress, + final int packetBufferSize, + final PeerAction peerAction, + final CountDownLatch serversWaiting, + final SSLEngine engine) { + try { + try (final SocketChannel connectedChannel = SocketChannel.open()) { + connectedChannel.connect(boundAddress.get()); + final ByteBuffer peerNetData = + ByteBuffer.allocateDirect(packetBufferSize); + + final NioSslEngine filter = new NioSslEngine(engine, bufferPool); + + final boolean result = + peerAction.apply(connectedChannel, filter, peerNetData); + + serversWaiting.await(); // wait for last server to give up before closing our socket + + filter.close(connectedChannel); + + return result; + } + } catch (IOException | InterruptedException | ExecutionException e) { + printException("In client:", e); + throw new RuntimeException(e); + } + } + + private static boolean server( + final CompletableFuture<SocketAddress> boundAddress, + final int packetBufferSize, + final PeerAction peerAction, + final CountDownLatch serversWaiting, + final SSLEngine engine) { + try (final ServerSocketChannel boundChannel = ServerSocketChannel.open()) { + final InetSocketAddress bindAddress = + new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + boundChannel.bind(bindAddress); + boundAddress.complete(boundChannel.getLocalAddress()); + try (final SocketChannel acceptedChannel = boundChannel.accept()) { + final ByteBuffer peerNetData = + ByteBuffer.allocateDirect(packetBufferSize); + + final NioSslEngine filter = new NioSslEngine(engine, bufferPool); + + final boolean result = + peerAction.apply(acceptedChannel, filter, peerNetData); + + filter.close(acceptedChannel); + + return result; + } + } catch (IOException e) { + printException("In server:", e); + throw new RuntimeException(e); + } finally { + serversWaiting.countDown(); + } + } + + private static void printException(final String context, final Exception e) { + System.out.println(context + "\n"); + e.printStackTrace(); + } + + private static Properties createKeystoreAndTruststore() + throws GeneralSecurityException, IOException { + final CertificateMaterial ca = new CertificateBuilder() + .commonName("Test CA") + .isCA() + .generate(); + + CertificateMaterial serverCertificate = new CertificateBuilder() + .commonName("server") + .issuedBy(ca) + .generate(); + + final CertStores serverStore = CertStores.serverStore(); + serverStore.withCertificate("server", serverCertificate); + serverStore.trust("ca", ca); + return serverStore.propertiesWith("all", true, false); + } + + /** + * @param filter is a newly constructed and intitialized object; it has not been used + * for handshakes previously. + * @param peerNetData on entry: don't care about read/write state or contents + */ + private static boolean handshakeTLS(final SocketChannel channel, + final NioSslEngine filter, + final ByteBuffer peerNetData, + final String context) throws IOException { + final boolean blocking = channel.isBlocking(); + try { + channel.configureBlocking(false); + final boolean result = + filter.handshake(channel, 6_000, peerNetData); + System.out.println( + String.format( + "%s TLS settings after successful handshake: Protocol: %s, Cipher Suite: %s", + context, + filter.engine.getSession().getProtocol(), + filter.engine.getSession().getCipherSuite())); + return result; + } finally { + channel.configureBlocking(blocking); + } + } + + /** + * This method is trying to do what Connection readMessages() and processInputBuffer() do + * together. + * + * Note well: peerNetData may contain content on entry to this method. Also the filter's + * buffers (e.g. the buffer returned by unwrap) may already contain data from previous + * calls to unwrap(). + * + * @param peerNetData will be in write mode on entry and may already contain content + */ + private static byte[] receive( + final int bytesToReceive, + final NioFilter filter, + final SocketChannel channel, + final ByteBuffer peerNetData) throws IOException { + final byte[] received = new byte[bytesToReceive]; + // peerNetData in write mode + peerNetData.flip(); + // peerNetData in read mode + int pos = 0; + while (pos < bytesToReceive) { + /* + * On first iteration unwrap() is called before the channel is read. This is necessary + * since a previous call to this method (receive()) could have left data in the filter + * and there might not be any more data coming on the channel (ever). + * + * If no data was already held in the filter's buffer, and peerNetData was empty + * before calling unwrap() then the buffer returned by unwrap will be empty. But before + * we start the second loop iteration, we'll read (from the channel into peerNetData). + * + * The filter's unwrap() method takes peerNetData in read mode and when the method returns + * the buffer is in write mode (ready for us to add data to it if needed). + */ + try (final ByteBufferSharing appDataSharing = filter.unwrap(peerNetData)) { + // peerNetData in write mode + final ByteBuffer appData = appDataSharing.getBuffer(); + // appData in write mode + appData.flip(); + // appData in read mode + if (appData.hasRemaining()) { + final int newBytes = Math.min(appData.remaining(), received.length - pos); + assert pos + newBytes <= received.length; + appData.get(received, pos, newBytes); + pos += newBytes; + } else { + channel.read(peerNetData); + } + peerNetData.flip(); + // peerNetData in read mode ready for filter unwrap() call + appData.compact(); + // appData in write mode ready for filter unwrap() call + } + } + peerNetData.compact(); + // peerNetData in write mode + return received; + } + + /* + * This method is trying to do what Connection.writeFully() does. + */ + private static void send( + final int bytesToSend, + final NioFilter filter, + final SocketChannel channel, + final int startingValue) + throws IOException { + // if we wanted to send more than one buffer-full we could add an outer loop + final ByteBuffer appData = ByteBuffer.allocateDirect(bytesToSend); + for (int i = 0; i < bytesToSend; i++) { + appData.put((byte) (i + startingValue)); + } + appData.flip(); + try (final ByteBufferSharing netDataSharing = filter.wrap(appData)) { + final ByteBuffer netData = netDataSharing.getBuffer(); + while (netData.remaining() > 0) { + channel.write(netData); + } + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index 7831444200..6893535d3b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -18,7 +18,7 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP; import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW; -import static javax.net.ssl.SSLEngineResult.Status.OK; +import static javax.net.ssl.SSLEngineResult.Status.CLOSED; import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER; import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER; @@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession; import org.apache.logging.log4j.Logger; import org.apache.geode.GemFireIOException; +import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.internal.net.BufferPool.BufferType; import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut; @@ -51,6 +52,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService; * Its use should be confined to one thread or should be protected by external synchronization. */ public class NioSslEngine implements NioFilter { + @Immutable + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]); private static final Logger logger = LogService.getLogger(); private final BufferPool bufferPool; @@ -229,7 +232,8 @@ public class NioSslEngine implements NioFilter { } @Override - public ByteBufferSharing wrap(ByteBuffer appData) throws IOException { + public ByteBufferSharing wrap(ByteBuffer appData) + throws IOException { try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) { ByteBuffer myNetData = outputSharing.getBuffer(); @@ -237,24 +241,21 @@ public class NioSslEngine implements NioFilter { myNetData.clear(); while (appData.hasRemaining()) { - // ensure we have lots of capacity since encrypted data might - // be larger than the app data - int remaining = myNetData.capacity() - myNetData.position(); - - if (remaining < (appData.remaining() * 2)) { - int newCapacity = expandedCapacity(appData, myNetData); - myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity); + final SSLEngineResult wrapResult = engine.wrap(appData, myNetData); + switch (wrapResult.getStatus()) { + case BUFFER_OVERFLOW: + final int newCapacity = + myNetData.position() + engine.getSession().getPacketBufferSize(); + myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity); + break; + case BUFFER_UNDERFLOW: + case CLOSED: + throw new SSLException("Error encrypting data: " + wrapResult); } - SSLEngineResult wrapResult = engine.wrap(appData, myNetData); - if (wrapResult.getHandshakeStatus() == NEED_TASK) { handleBlockingTasks(); } - - if (wrapResult.getStatus() != OK) { - throw new SSLException("Error encrypting data: " + wrapResult); - } } myNetData.flip(); @@ -373,6 +374,9 @@ public class NioSslEngine implements NioFilter { @Override public synchronized void close(SocketChannel socketChannel) { + + assert socketChannel.isBlocking(); + if (closed) { return; } @@ -381,19 +385,25 @@ public class NioSslEngine implements NioFilter { try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) { final ByteBuffer myNetData = outputSharing.getBuffer(); - if (!engine.isOutboundDone()) { - ByteBuffer empty = ByteBuffer.wrap(new byte[0]); - engine.closeOutbound(); + engine.closeOutbound(); + + SSLEngineResult result = null; + + while (!engine.isOutboundDone()) { // clear the buffer to receive a CLOSE message from the SSLEngine myNetData.clear(); // Get close message - SSLEngineResult result = engine.wrap(empty, myNetData); - - if (result.getStatus() != SSLEngineResult.Status.CLOSED) { - throw new SSLHandshakeException( - "Error closing SSL session. Status=" + result.getStatus()); + result = engine.wrap(EMPTY_BYTE_BUFFER, myNetData); + + /* + * We would have liked to make this one of the while() conditions but + * if status is CLOSED we'll get a "Broken pipe" exception in the next write() + * so it needs to be handled here. + */ + if (result.getStatus() == CLOSED) { + break; } // Send close message to peer @@ -402,6 +412,10 @@ public class NioSslEngine implements NioFilter { socketChannel.write(myNetData); } } + if (result != null && result.getStatus() != CLOSED) { + throw new SSLHandshakeException( + "Error closing SSL session. Status=" + result.getStatus()); + } } catch (ClosedChannelException e) { // we can't send a close message if the channel is closed } catch (IOException e) { @@ -416,18 +430,13 @@ public class NioSslEngine implements NioFilter { } } - private int expandedCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) { - return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2, - targetBuffer.capacity() * 2); - } - @VisibleForTesting - public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException { + public ByteBufferVendor getOutputBufferVendorForTestingOnly() { return outputBufferVendor; } @VisibleForTesting - public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException { + public ByteBufferVendor getInputBufferVendorForTestingOnly() { return inputBufferVendor; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 92d89c0656..57bb4880ca 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -1850,7 +1850,7 @@ public class Connection implements Runnable { * checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer", * or "connection reset" */ - private static boolean isIgnorableIOException(Exception e) { + private static boolean isIgnorableIOException(IOException e) { if (e instanceof ClosedChannelException) { return true; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index 4315f8c1dc..abf93bd753 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.net.Socket; import java.net.SocketException; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; @@ -189,7 +190,7 @@ public class NioSslEngineTest { } @Test - public void wrap() throws Exception { + public void engineWrapCausesResizeThenSucceeds() throws Exception { try (final ByteBufferSharing outputSharing = nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { @@ -205,6 +206,7 @@ public class NioSslEngineTest { // buffer TestSSLEngine testEngine = new TestSSLEngine(); testEngine.addReturnResult( + new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0), new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining())); spyNioSslEngine.engine = testEngine; @@ -216,12 +218,12 @@ public class NioSslEngineTest { appData.flip(); assertThat(wrappedBuffer).isEqualTo(appData); } - verify(spyNioSslEngine, times(1)).handleBlockingTasks(); + verify(spyNioSslEngine, times(2)).handleBlockingTasks(); } } @Test - public void wrapFails() throws IOException { + public void engineWrapCausesResizeThenCloses() throws IOException { try (final ByteBufferSharing outputSharing = nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { // make the application data too big to fit into the engine's encryption buffer @@ -236,10 +238,12 @@ public class NioSslEngineTest { // buffer TestSSLEngine testEngine = new TestSSLEngine(); testEngine.addReturnResult( + new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0), new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining())); spyNioSslEngine.engine = testEngine; - assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class) + assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)) + .isInstanceOf(SSLException.class) .hasMessageContaining("Error encrypting data"); } } @@ -367,9 +371,10 @@ public class NioSslEngineTest { SocketChannel mockChannel = mock(SocketChannel.class); Socket mockSocket = mock(Socket.class); when(mockChannel.socket()).thenReturn(mockSocket); + when(mockChannel.isBlocking()).thenReturn(true); when(mockSocket.isClosed()).thenReturn(false); - when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); + when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(CLOSED, FINISHED, 0, 0)); nioSslEngine.close(mockChannel); @@ -383,13 +388,14 @@ public class NioSslEngineTest { } @Test - public void closeWhenUnwrapError() throws Exception { + public void closeWhenWrapError() throws Exception { SocketChannel mockChannel = mock(SocketChannel.class); Socket mockSocket = mock(Socket.class); when(mockChannel.socket()).thenReturn(mockSocket); + when(mockChannel.isBlocking()).thenReturn(true); when(mockSocket.isClosed()).thenReturn(true); - when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); + when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(BUFFER_OVERFLOW, FINISHED, 0, 0)); assertThatThrownBy(() -> nioSslEngine.close(mockChannel)).isInstanceOf(GemFireIOException.class) @@ -403,6 +409,7 @@ public class NioSslEngineTest { Socket mockSocket = mock(Socket.class); when(mockChannel.socket()).thenReturn(mockSocket); when(mockSocket.isClosed()).thenReturn(true); + when(mockChannel.isBlocking()).thenReturn(true); when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> { @@ -411,7 +418,7 @@ public class NioSslEngineTest { // give the NioSslEngine something to write on its socket channel, simulating a TLS close // message outputSharing.getBuffer().put("Goodbye cruel world".getBytes()); - return new SSLEngineResult(CLOSED, FINISHED, 0, 0); + return new SSLEngineResult(OK, NEED_UNWRAP, 0, 0); } }); when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException()); @@ -595,10 +602,19 @@ public class NioSslEngineTest { @Override public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) { - for (ByteBuffer source : sources) { + assertThat(sources.length) + .as("test unexpectedly tried to wrap with multiple sources") + .isEqualTo(1); + final ByteBuffer source = sources[0]; + final SSLEngineResult nextResult = nextResult(); + try { destination.put(source); + } catch (final BufferOverflowException e) { + assertThat(BUFFER_OVERFLOW) + .as("got unexpected buffer overflow") + .isEqualTo(nextResult.getStatus()); } - return nextResult(); + return nextResult; } @Override @@ -662,7 +678,9 @@ public class NioSslEngineTest { @Override public SSLSession getSession() { - return null; + final SSLSession session = mock(SSLSession.class); + when(session.getPacketBufferSize()).thenReturn(16 * 1024); + return session; } @Override