This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 08e9e96 GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666) 08e9e96 is described below commit 08e9e9673d0ed05555a3d74c6d16e706817cab09 Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Thu Oct 29 16:38:25 2020 -0700 GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666) - NioSslEngine.close() proceeds even if readers (or writers) are operating on its ByteBuffers, allowing Connection.close() to close its socket and proceed. - NioSslEngine.close() needed a lock only on the output buffer, so we split what was a single lock into two. Also instead of using synchronized we use a ReentrantLock so we can call tryLock() and time out if needed in NioSslEngine.close(). - Since readers/writers may hold locks on these input/output buffers when NioSslEngine.close() is called a reference count is maintained and the buffers are returned to the pool only when the last user is done. - To manage the locking and reference counting a new AutoCloseable ByteBufferSharing interface is introduced with a trivial implementation: ByteBufferSharingNoOp and a real implementation: ByteBufferSharingImpl. Co-authored-by: Bill Burcham <bill.burc...@gmail.com> Co-authored-by: Darrel Schneider <dschnei...@pivotal.io> Co-authored-by: Ernie Burghardt <burghar...@vmware.com> --- .../tcp/ConnectionCloseSSLTLSDUnitTest.java | 238 +++++++++++++ .../org/apache/geode/internal/tcp/server.keystore | Bin 0 -> 1256 bytes ...LSocketHostNameVerificationIntegrationTest.java | 4 +- .../internal/net/SSLSocketIntegrationTest.java | 57 +-- .../apache/geode/codeAnalysis/excludedClasses.txt | 1 + .../geode/internal/net/ByteBufferSharing.java | 55 +++ .../geode/internal/net/ByteBufferSharingImpl.java | 148 ++++++++ .../geode/internal/net/ByteBufferSharingNoOp.java | 52 +++ .../org/apache/geode/internal/net/NioFilter.java | 69 ++-- .../apache/geode/internal/net/NioPlainEngine.java | 27 +- .../apache/geode/internal/net/NioSslEngine.java | 353 ++++++++++--------- .../org/apache/geode/internal/tcp/Connection.java | 34 +- .../org/apache/geode/internal/tcp/MsgReader.java | 15 +- .../internal/net/ByteBufferSharingImplTest.java | 163 +++++++++ .../geode/internal/net/NioPlainEngineTest.java | 47 ++- .../geode/internal/net/NioSslEngineTest.java | 392 +++++++++++---------- 16 files changed, 1195 insertions(+), 460 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java new file mode 100644 index 0000000..77fe9bf --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.tcp; + +import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS; +import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.NAME; +import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE; +import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Fail.fail; + +import java.io.File; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; +import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.DistributedBlackboard; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.dunit.rules.DistributedRule; + +/** + * It would be nice if this test didn't need to use the cache since the test's purpose is to test + * that the {@link Connection} class can be closed while readers and writers hold locks on its + * internal TLS {@link ByteBuffer}s + * + * But this test does use the cache (region) because it enabled us to use existing cache messaging + * and to use the DistributionMessageObserver (observer) hooks. + * + * see also ClusterCommunicationsDUnitTest + */ +public class ConnectionCloseSSLTLSDUnitTest implements Serializable { + + private static final int SMALL_BUFFER_SIZE = 8000; + private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered"; + private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate"; + private static final String regionName = "connectionCloseDUnitTestRegion"; + private static final Logger logger = LogService.getLogger(); + + private static Cache cache; + + @Rule + public DistributedRule distributedRule = + DistributedRule.builder().withVMCount(3).build(); + + @Rule + public DistributedBlackboard blackboard = new DistributedBlackboard(); + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + private VM locator; + private VM sender; + private VM receiver; + + @Before + public void before() { + locator = getVM(0); + sender = getVM(1); + receiver = getVM(2); + } + + @After + public void after() { + receiver.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + + @Test + public void connectionWithHungReaderIsCloseableAndUnhangsReader() + throws InterruptedException, TimeoutException { + + blackboard.clearGate(UPDATE_ENTERED_GATE); + blackboard.clearGate(SUSPEND_UPDATE_GATE); + + final int locatorPort = createLocator(locator); + createCacheAndRegion(sender, locatorPort); + createCacheAndRegion(receiver, locatorPort); + + receiver + .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)", + () -> { + final DistributionMessageObserver observer = + new DistributionMessageObserver() { + + @Override + public void beforeProcessMessage(final ClusterDistributionManager dm, + final DistributionMessage message) { + guardMessageProcessingHook(message, () -> { + try { + blackboard.signalGate(UPDATE_ENTERED_GATE); + blackboard.waitForGate(SUSPEND_UPDATE_GATE); + } catch (TimeoutException | InterruptedException e) { + fail("message observus interruptus"); + } + logger.info("BGB: got before process message: " + message); + }); + } + }; + DistributionMessageObserver.setInstance(observer); + }); + + final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> { + final Region<Object, Object> region = cache.getRegion(regionName); + // test is going to close the cache while we are waiting for our ack + assertThatThrownBy(() -> { + region.put("hello", "world"); + }).isInstanceOf(DistributedSystemDisconnectedException.class); + }); + + // wait until our message observer is blocked + blackboard.waitForGate(UPDATE_ENTERED_GATE); + + // at this point our put() is blocked waiting for a direct ack + assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue(); + + /* + * Now close the cache. The point of calling it is to test that we don't block while trying + * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn + * closes all the connections (and their sockets.) We want the sockets to close because that'll + * cause our hung put() to see a DistributedSystemDisconnectedException. + */ + sender.invoke("", () -> cache.close()); + + // wait for put task to complete: with an exception, that is! + putInvocation.get(); + + // un-stick our message observer + blackboard.signalGate(SUSPEND_UPDATE_GATE); + } + + private void guardMessageProcessingHook(final DistributionMessage message, + final Runnable runnable) { + if (message instanceof UpdateMessage) { + final UpdateMessage updateMessage = (UpdateMessage) message; + if (updateMessage.getRegionPath().equals("/" + regionName)) { + runnable.run(); + } + } + } + + private int createLocator(VM memberVM) { + return memberVM.invoke("create locator", () -> { + // if you need to debug SSL communications use this property: + // System.setProperty("javax.net.debug", "all"); + System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); + return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties()) + .getPort(); + }); + } + + private void createCacheAndRegion(VM memberVM, int locatorPort) { + memberVM.invoke("start cache and create region", () -> { + cache = createCache(locatorPort); + cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName); + }); + } + + private Cache createCache(int locatorPort) { + // if you need to debug SSL communications use this property: + // System.setProperty("javax.net.debug", "all"); + Properties properties = getDistributedSystemProperties(); + properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); + return new CacheFactory(properties).create(); + } + + private Properties getDistributedSystemProperties() { + Properties properties = new Properties(); + properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); + properties.setProperty(USE_CLUSTER_CONFIGURATION, "false"); + properties.setProperty(NAME, "vm" + VM.getCurrentVMNum()); + properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack + properties.setProperty(SOCKET_LEASE_TIME, "10000"); + properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE); + + properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator"); + properties + .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore") + .getAbsolutePath()); + properties.setProperty(SSL_TRUSTSTORE, + createTempFileFromResource(getClass(), "server.keystore") + .getAbsolutePath()); + properties.setProperty(SSL_PROTOCOLS, "TLSv1.2"); + properties.setProperty(SSL_KEYSTORE_PASSWORD, "password"); + properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password"); + properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true"); + return properties; + } + +} diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore new file mode 100644 index 0000000..8b5305f Binary files /dev/null and b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore differ diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java index dc7df44..a70f3b1 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java @@ -215,7 +215,9 @@ public class SSLSocketHostNameVerificationIntegrationTest { final NioSslEngine nioSslEngine = engine; engine.close(socket.getChannel()); assertThatThrownBy(() -> { - nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0])); + try (final ByteBufferSharing unused = + nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) { + } }) .isInstanceOf(IOException.class); } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java index 19eab4f..add6b9a 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java @@ -256,11 +256,13 @@ public class SSLSocketIntegrationTest { ByteBuffer buffer = bbos.getContentBuffer(); System.out.println( "client buffer position is " + buffer.position() + " and limit is " + buffer.limit()); - ByteBuffer wrappedBuffer = engine.wrap(buffer); - System.out.println("client wrapped buffer position is " + wrappedBuffer.position() - + " and limit is " + wrappedBuffer.limit()); - int bytesWritten = clientChannel.write(wrappedBuffer); - System.out.println("client bytes written is " + bytesWritten); + try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) { + ByteBuffer wrappedBuffer = outputSharing.getBuffer(); + System.out.println("client wrapped buffer position is " + wrappedBuffer.position() + + " and limit is " + wrappedBuffer.limit()); + int bytesWritten = clientChannel.write(wrappedBuffer); + System.out.println("client bytes written is " + bytesWritten); + } } private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis) @@ -299,7 +301,9 @@ public class SSLSocketIntegrationTest { final NioSslEngine nioSslEngine = engine; engine.close(socket.getChannel()); assertThatThrownBy(() -> { - nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0])); + try (final ByteBufferSharing unused = + nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) { + } }) .isInstanceOf(IOException.class); } @@ -313,24 +317,35 @@ public class SSLSocketIntegrationTest { private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine) throws IOException { - ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer); - // if we already have unencrypted data skip unwrapping - if (unwrapped.position() == 0) { - int bytesRead; - // if we already have encrypted data skip reading from the socket - if (buffer.position() == 0) { - bytesRead = socket.getChannel().read(buffer); - buffer.flip(); + try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) { + final ByteBuffer unwrapped = sharedBuffer.getBuffer(); + // if we already have unencrypted data skip unwrapping + if (unwrapped.position() == 0) { + int bytesRead; + // if we already have encrypted data skip reading from the socket + if (buffer.position() == 0) { + bytesRead = socket.getChannel().read(buffer); + buffer.flip(); + } else { + bytesRead = buffer.remaining(); + } + System.out.println("server bytes read is " + bytesRead + ": buffer position is " + + buffer.position() + " and limit is " + buffer.limit()); + try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) { + final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer(); + + unwrapped2.flip(); + System.out.println("server unwrapped buffer position is " + unwrapped2.position() + + " and limit is " + unwrapped2.limit()); + finishReadMessageFromNIOSSLClient(unwrapped2); + } } else { - bytesRead = buffer.remaining(); + finishReadMessageFromNIOSSLClient(unwrapped); } - System.out.println("server bytes read is " + bytesRead + ": buffer position is " - + buffer.position() + " and limit is " + buffer.limit()); - unwrapped = engine.unwrap(buffer); - unwrapped.flip(); - System.out.println("server unwrapped buffer position is " + unwrapped.position() - + " and limit is " + unwrapped.limit()); } + } + + private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped); DataInputStream dis = new DataInputStream(bbis); String welcome = dis.readUTF(); diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index a46d5fc..33f43c3 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -104,3 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType org/apache/geode/cache/query/internal/xml/ElementType$1 org/apache/geode/cache/query/internal/xml/ElementType$2 org/apache/geode/cache/query/internal/xml/ElementType$3 +org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java new file mode 100644 index 0000000..cdfa897 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import java.io.IOException; +import java.nio.ByteBuffer; + + +/** + * When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for + * reading and modification) within the scope of that try block. + * + * Releases managed ByteBuffer back to pool after last reference is dropped. + */ +public interface ByteBufferSharing extends AutoCloseable { + + /** + * Call this method only within a try-with-resource in which this {@link ByteBufferSharing} was + * acquired. Retain the reference only within the scope of that try-with-resources. + * + * @return the buffer: manipulable only within the scope of the try-with-resources + * @throws IOException if the buffer is no longer accessible + */ + ByteBuffer getBuffer() throws IOException; + + /** + * Expand the buffer if needed. This may return a different object so be sure to pay attention to + * the return value if you need access to the potentially- expanded buffer. + * + * Subsequent calls to {@link #getBuffer()} will return that new buffer too. + * + * @return the same buffer or a different (bigger) buffer + * @throws IOException if the buffer is no longer accessible + */ + ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException; + + /** + * Override {@link AutoCloseable#close()} without throws clause since we don't need one. + */ + @Override + void close(); +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java new file mode 100644 index 0000000..e9a941e --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.internal.net.BufferPool.BufferType; + +/** + * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a + * {@link ByteBuffer}) is available (for reading and modification) in the scope of the + * try-with-resources. + */ +class ByteBufferSharingImpl implements ByteBufferSharing { + + static class OpenAttemptTimedOut extends Exception { + } + + private final Lock lock; + private final AtomicBoolean isClosed; + // mutable because in general our ByteBuffer may need to be resized (grown or compacted) + private ByteBuffer buffer; + private final BufferType bufferType; + private final AtomicInteger counter; + private final BufferPool bufferPool; + + /** + * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}). + * + * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed + * to an external object or is returned to an external caller.) + * + * This constructor acquires no lock. The reference count will be 1 after this constructor + * completes. + */ + ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType, + final BufferPool bufferPool) { + this.buffer = buffer; + this.bufferType = bufferType; + this.bufferPool = bufferPool; + lock = new ReentrantLock(); + counter = new AtomicInteger(1); + isClosed = new AtomicBoolean(false); + } + + /** + * The destructor. Called by the resource owner to undo the work of the constructor. + */ + void destruct() { + if (isClosed.compareAndSet(false, true)) { + dropReference(); + } + } + + /** + * This method is for use only by the owner of the shared resource. It's used for handing out + * references to the shared resource. So it does reference counting and also acquires a lock. + * + * Resource owners call this method as the last thing before returning a reference to the caller. + * That caller binds that reference to a variable in a try-with-resources statement and relies on + * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block. + */ + ByteBufferSharing open() { + lock.lock(); + addReference(); + return this; + } + + /** + * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time. + */ + ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut { + try { + if (!lock.tryLock(time, unit)) { + throw new OpenAttemptTimedOut(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OpenAttemptTimedOut(); + } + addReference(); + return this; + } + + @Override + public ByteBuffer getBuffer() throws IOException { + if (isClosed.get()) { + throw new IOException("NioSslEngine has been closed"); + } else { + return buffer; + } + } + + @Override + public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { + return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); + } + + @Override + public void close() { + /* + * We are counting on our ReentrantLock throwing an exception if the current thread + * does not hold the lock. In that case dropReference() will not be called. This + * prevents ill-behaved clients (clients that call close() too many times) from + * corrupting our reference count. + */ + lock.unlock(); + dropReference(); + } + + private int addReference() { + return counter.incrementAndGet(); + } + + private int dropReference() { + final int usages = counter.decrementAndGet(); + if (usages == 0) { + bufferPool.releaseBuffer(bufferType, buffer); + } + return usages; + } + + @VisibleForTesting + public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { + buffer = newBufferForTesting; + } + +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java new file mode 100644 index 0000000..bd707e3 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a + * {@link ByteBuffer}) is available (for reading and modification) in the scope of the + * try-with-resources. + * + * This implementation is a "no-op". It performs no actual locking and no reference counting. It's + * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so, + * needs no reference counting on buffers, nor any synchronization around access to buffers. + * + * See also {@link ByteBufferSharingImpl} + */ +class ByteBufferSharingNoOp implements ByteBufferSharing { + + private final ByteBuffer buffer; + + ByteBufferSharingNoOp(final ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public ByteBuffer getBuffer() { + return buffer; + } + + @Override + public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { + throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine"); + } + + @Override + public void close() {} +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java index 9c437ad..eb53f0e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java @@ -19,47 +19,53 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** - * Prior to transmitting a buffer or processing a received buffer - * a NioFilter should be called to wrap (transmit) or unwrap (received) - * the buffer in case SSL is being used.<br> - * Implementations of this class may not be thread-safe in regard to - * the buffers their methods return. These may be internal state that, - * if used concurrently by multiple threads could cause corruption. - * Appropriate external synchronization must be used in order to provide - * thread-safety. Do this by invoking getSynchObject() and synchronizing on - * the returned object while using the buffer. + * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to + * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br> + * Implementations of + * this class may not be thread-safe in regard to the buffers their methods return. These may be + * internal state that, if used concurrently by multiple threads could cause corruption. Appropriate + * external synchronization must be used in order to provide thread-safety. Do this by invoking + * getSynchObject() and synchronizing on the returned object while using the buffer. */ public interface NioFilter { /** * wrap bytes for transmission to another process + * + * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is + * to call this method in a try-with-resources statement. */ - ByteBuffer wrap(ByteBuffer buffer) throws IOException; + ByteBufferSharing wrap(ByteBuffer buffer) throws IOException; /** - * unwrap bytes received from another process. The unwrapped - * buffer should be flipped before reading. When done reading invoke - * doneReading() to reset for future read ops + * unwrap bytes received from another process. The unwrapped buffer should be flipped before + * reading. When done reading invoke doneReading() to reset for future read ops + * + * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is + * to call this method in a try-with-resources statement. */ - ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException; + ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException; /** - * ensure that the wrapped buffer has enough room to read the given amount of data. - * This must be invoked before readAtLeast. A new buffer may be returned by this method. + * ensure that the wrapped buffer has enough room to read the given amount of data. This must be + * invoked before readAtLeast. A new buffer may be returned by this method. */ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer, BufferPool.BufferType bufferType); /** - * read at least the indicated amount of bytes from the given - * socket. The buffer position will be ready for reading - * the data when this method returns. Note: you must invoke ensureWrappedCapacity - * with the given amount prior to each invocation of this method. + * read at least the indicated amount of bytes from the given socket. The buffer position will be + * ready for reading the data when this method returns. Note: you must invoke + * ensureWrappedCapacity with the given amount prior to each invocation of this method. * <br> * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br> - * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.) + * unwrappedBuffer + * = filter.readAtLeast(channel, amount, wrappedBuffer, etc.) + * + * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is + * to call this method in a try-with-resources statement. */ - ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer) + ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer) throws IOException; /** @@ -81,28 +87,19 @@ public interface NioFilter { } } - default boolean isClosed() { - return false; - } - /** * invoke this method when you are done using the NioFilter - * */ default void close(SocketChannel socketChannel) { // nothing by default } /** - * returns the unwrapped byte buffer associated with the given wrapped buffer. + * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists. + * + * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is + * to call this method in a try-with-resources statement. */ - ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer); + ByteBufferSharing getUnwrappedBuffer(); - /** - * returns an object to be used in synchronizing on the use of buffers returned by - * a NioFilter. - */ - default Object getSynchObject() { - return this; - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java index 3ebce38..8b5df96 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import org.apache.geode.annotations.internal.MakeImmutable; import org.apache.geode.internal.Assert; /** @@ -27,6 +28,12 @@ import org.apache.geode.internal.Assert; * secure communications. */ public class NioPlainEngine implements NioFilter { + + // this variable requires the MakeImmutable annotation but the buffer is empty and + // not really modifiable + @MakeImmutable + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + private final BufferPool bufferPool; int lastReadPosition; @@ -38,14 +45,14 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBuffer wrap(ByteBuffer buffer) { - return buffer; + public ByteBufferSharing wrap(ByteBuffer buffer) { + return shareBuffer(buffer); } @Override - public ByteBuffer unwrap(ByteBuffer wrappedBuffer) { + public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) { wrappedBuffer.position(wrappedBuffer.limit()); - return wrappedBuffer; + return shareBuffer(wrappedBuffer); } @Override @@ -82,7 +89,7 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) + public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { ByteBuffer buffer = wrappedBuffer; @@ -108,7 +115,7 @@ public class NioPlainEngine implements NioFilter { buffer.position(lastProcessedPosition); lastProcessedPosition += bytes; - return buffer; + return shareBuffer(buffer); } public void doneReading(ByteBuffer unwrappedBuffer) { @@ -121,8 +128,12 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) { - return wrappedBuffer; + public ByteBufferSharing getUnwrappedBuffer() { + return shareBuffer(EMPTY_BUFFER); + } + + private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) { + return new ByteBufferSharingNoOp(wrappedBuffer); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index 6f32501..7e642ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -40,24 +40,19 @@ import javax.net.ssl.SSLSession; import org.apache.logging.log4j.Logger; import org.apache.geode.GemFireIOException; -import org.apache.geode.annotations.internal.MakeImmutable; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.internal.net.BufferPool.BufferType; +import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut; import org.apache.geode.logging.internal.log4j.api.LogService; /** - * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread - * safe. Its use should be confined to one thread or should be protected by external - * synchronization. + * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe. + * Its use should be confined to one thread or should be protected by external synchronization. */ public class NioSslEngine implements NioFilter { private static final Logger logger = LogService.getLogger(); - // this variable requires the MakeImmutable annotation but the buffer is empty and - // not really modifiable - @MakeImmutable - private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); - private final BufferPool bufferPool; private boolean closed; @@ -65,23 +60,28 @@ public class NioSslEngine implements NioFilter { SSLEngine engine; /** - * myNetData holds bytes wrapped by the SSLEngine + * holds bytes wrapped by the SSLEngine; a.k.a. myNetData */ - ByteBuffer myNetData; + private final ByteBufferSharingImpl outputSharing; /** - * peerAppData holds the last unwrapped data from a peer + * holds the last unwrapped data from a peer; a.k.a. peerAppData */ - ByteBuffer peerAppData; + private final ByteBufferSharingImpl inputSharing; NioSslEngine(SSLEngine engine, BufferPool bufferPool) { SSLSession session = engine.getSession(); int appBufferSize = session.getApplicationBufferSize(); int packetBufferSize = engine.getSession().getPacketBufferSize(); + closed = false; this.engine = engine; this.bufferPool = bufferPool; - this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize); - this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize); + outputSharing = + new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize), + TRACKED_SENDER, bufferPool); + inputSharing = + new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize), + TRACKED_RECEIVER, bufferPool); } /** @@ -135,57 +135,65 @@ public class NioSslEngine implements NioFilter { switch (status) { case NEED_UNWRAP: - // Receive handshaking data from peer - int dataRead = socketChannel.read(handshakeBuffer); - - // Process incoming handshaking data - handshakeBuffer.flip(); - engineResult = engine.unwrap(handshakeBuffer, peerAppData); - handshakeBuffer.compact(); - status = engineResult.getHandshakeStatus(); - - // if we're not finished, there's nothing to process and no data was read let's hang out - // for a little - if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) { - Thread.sleep(10); - } + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { + final ByteBuffer peerAppData = inputSharing.getBuffer(); + + // Receive handshaking data from peer + int dataRead = socketChannel.read(handshakeBuffer); + + // Process incoming handshaking data + handshakeBuffer.flip(); + + + engineResult = engine.unwrap(handshakeBuffer, peerAppData); + handshakeBuffer.compact(); + status = engineResult.getHandshakeStatus(); + + // if we're not finished, there's nothing to process and no data was read let's hang out + // for a little + if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) { + Thread.sleep(10); + } - if (engineResult.getStatus() == BUFFER_OVERFLOW) { - peerAppData = - expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2); + if (engineResult.getStatus() == BUFFER_OVERFLOW) { + inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2); + } + break; } - break; case NEED_WRAP: - // Empty the local network packet buffer. - myNetData.clear(); - - // Generate handshaking data - engineResult = engine.wrap(myAppData, myNetData); - status = engineResult.getHandshakeStatus(); - - // Check status - switch (engineResult.getStatus()) { - case BUFFER_OVERFLOW: - myNetData = - expandWriteBuffer(TRACKED_SENDER, myNetData, - myNetData.capacity() * 2); - break; - case OK: - myNetData.flip(); - // Send the handshaking data to peer - while (myNetData.hasRemaining()) { - socketChannel.write(myNetData); - } - break; - case CLOSED: - break; - default: - logger.info("handshake terminated with illegal state due to {}", status); - throw new IllegalStateException( - "Unknown SSLEngineResult status: " + engineResult.getStatus()); + try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { + final ByteBuffer myNetData = outputSharing.getBuffer(); + + // Empty the local network packet buffer. + myNetData.clear(); + + // Generate handshaking data + engineResult = engine.wrap(myAppData, myNetData); + status = engineResult.getHandshakeStatus(); + + // Check status + switch (engineResult.getStatus()) { + case BUFFER_OVERFLOW: + // no need to assign return value because we will never reference it + outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2); + break; + case OK: + myNetData.flip(); + // Send the handshaking data to peer + while (myNetData.hasRemaining()) { + socketChannel.write(myNetData); + } + break; + case CLOSED: + break; + default: + logger.info("handshake terminated with illegal state due to {}", status); + throw new IllegalStateException( + "Unknown SSLEngineResult status: " + engineResult.getStatus()); + } + break; } - break; case NEED_TASK: // Handle blocking tasks handleBlockingTasks(); @@ -213,17 +221,6 @@ public class NioSslEngine implements NioFilter { return true; } - ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing, - int desiredCapacity) { - return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity); - } - - synchronized void checkClosed() throws IOException { - if (closed) { - throw new IOException("NioSslEngine has been closed"); - } - } - void handleBlockingTasks() { Runnable task; while ((task = engine.getDelegatedTask()) != null) { @@ -233,72 +230,77 @@ public class NioSslEngine implements NioFilter { } @Override - public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException { - checkClosed(); + public ByteBufferSharing wrap(ByteBuffer appData) throws IOException { + try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { - myNetData.clear(); + ByteBuffer myNetData = outputSharing.getBuffer(); - while (appData.hasRemaining()) { - // ensure we have lots of capacity since encrypted data might - // be larger than the app data - int remaining = myNetData.capacity() - myNetData.position(); + myNetData.clear(); - if (remaining < (appData.remaining() * 2)) { - int newCapacity = expandedCapacity(appData, myNetData); - myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity); - } + while (appData.hasRemaining()) { + // ensure we have lots of capacity since encrypted data might + // be larger than the app data + int remaining = myNetData.capacity() - myNetData.position(); - SSLEngineResult wrapResult = engine.wrap(appData, myNetData); + if (remaining < (appData.remaining() * 2)) { + int newCapacity = expandedCapacity(appData, myNetData); + myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity); + } - if (wrapResult.getHandshakeStatus() == NEED_TASK) { - handleBlockingTasks(); - } + SSLEngineResult wrapResult = engine.wrap(appData, myNetData); - if (wrapResult.getStatus() != OK) { - throw new SSLException("Error encrypting data: " + wrapResult); + if (wrapResult.getHandshakeStatus() == NEED_TASK) { + handleBlockingTasks(); + } + + if (wrapResult.getStatus() != OK) { + throw new SSLException("Error encrypting data: " + wrapResult); + } } - } - myNetData.flip(); + myNetData.flip(); - return myNetData; + return shareOutputBuffer(); + } } @Override - public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException { - checkClosed(); - - // note that we do not clear peerAppData as it may hold a partial - // message. TcpConduit, for instance, uses message chunking to - // transmit large payloads and we may have read a partial chunk - // during the previous unwrap - - peerAppData.limit(peerAppData.capacity()); - while (wrappedBuffer.hasRemaining()) { - SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData); - switch (unwrapResult.getStatus()) { - case BUFFER_OVERFLOW: - // buffer overflow expand and try again - double the available decryption space - int newCapacity = - (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position(); - newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3); - peerAppData = - bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity); - peerAppData.limit(peerAppData.capacity()); - break; - case BUFFER_UNDERFLOW: - // partial data - need to read more. When this happens the SSLEngine will not have - // changed the buffer position - wrappedBuffer.compact(); - return peerAppData; - case OK: - break; - default: - throw new SSLException("Error decrypting data: " + unwrapResult); + public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException { + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { + + ByteBuffer peerAppData = inputSharing.getBuffer(); + + // note that we do not clear peerAppData as it may hold a partial + // message. TcpConduit, for instance, uses message chunking to + // transmit large payloads and we may have read a partial chunk + // during the previous unwrap + + peerAppData.limit(peerAppData.capacity()); + while (wrappedBuffer.hasRemaining()) { + SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData); + switch (unwrapResult.getStatus()) { + case BUFFER_OVERFLOW: + // buffer overflow expand and try again - double the available decryption space + int newCapacity = + (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position(); + newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3); + peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity); + peerAppData.limit(peerAppData.capacity()); + break; + case BUFFER_UNDERFLOW: + // partial data - need to read more. When this happens the SSLEngine will not have + // changed the buffer position + wrappedBuffer.compact(); + return shareInputBuffer(); + case OK: + break; + default: + throw new SSLException("Error decrypting data: " + unwrapResult); + } } + wrappedBuffer.clear(); + return shareInputBuffer(); } - wrappedBuffer.clear(); - return peerAppData; } @Override @@ -315,50 +317,45 @@ public class NioSslEngine implements NioFilter { } @Override - public ByteBuffer readAtLeast(SocketChannel channel, int bytes, + public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { - if (peerAppData.capacity() > bytes) { - // we already have a buffer that's big enough - if (peerAppData.capacity() - peerAppData.position() < bytes) { - peerAppData.compact(); - peerAppData.flip(); - } - } + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { - while (peerAppData.remaining() < bytes) { - wrappedBuffer.limit(wrappedBuffer.capacity()); - int amountRead = channel.read(wrappedBuffer); - if (amountRead < 0) { - throw new EOFException(); + ByteBuffer peerAppData = inputSharing.getBuffer(); + + if (peerAppData.capacity() > bytes) { + // we already have a buffer that's big enough + if (peerAppData.capacity() - peerAppData.position() < bytes) { + peerAppData.compact(); + peerAppData.flip(); + } } - if (amountRead > 0) { - wrappedBuffer.flip(); - // prep the decoded buffer for writing - peerAppData.compact(); - peerAppData = unwrap(wrappedBuffer); - // done writing to the decoded buffer - prep it for reading again - peerAppData.flip(); + + while (peerAppData.remaining() < bytes) { + wrappedBuffer.limit(wrappedBuffer.capacity()); + int amountRead = channel.read(wrappedBuffer); + if (amountRead < 0) { + throw new EOFException(); + } + if (amountRead > 0) { + wrappedBuffer.flip(); + // prep the decoded buffer for writing + peerAppData.compact(); + try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) { + // done writing to the decoded buffer - prep it for reading again + final ByteBuffer peerAppDataNew = inputSharing2.getBuffer(); + peerAppDataNew.flip(); + peerAppData = peerAppDataNew; // loop needs new reference! + } + } } + return shareInputBuffer(); } - return peerAppData; } @Override - public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) { - return peerAppData; - } - - /** - * ensures that the unwrapped buffer associated with the given wrapped buffer has - * sufficient capacity for the given amount of bytes. This may compact the - * buffer or it may return a new buffer. - */ - public ByteBuffer ensureUnwrappedCapacity(int amount) { - // for TTLS the app-data buffers do not need to be tracked direct-buffers since we - // do not use them for I/O operations - peerAppData = - bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount); - return peerAppData; + public ByteBufferSharing getUnwrappedBuffer() { + return shareInputBuffer(); } @Override @@ -369,16 +366,14 @@ public class NioSslEngine implements NioFilter { } @Override - public synchronized boolean isClosed() { - return closed; - } - - @Override public synchronized void close(SocketChannel socketChannel) { if (closed) { return; } - try { + closed = true; + inputSharing.destruct(); + try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) { + final ByteBuffer myNetData = outputSharing.getBuffer(); if (!engine.isOutboundDone()) { ByteBuffer empty = ByteBuffer.wrap(new byte[0]); @@ -405,14 +400,13 @@ public class NioSslEngine implements NioFilter { // we can't send a close message if the channel is closed } catch (IOException e) { throw new GemFireIOException("exception closing SSL session", e); + } catch (final OpenAttemptTimedOut _unused) { + logger.info(String.format("Couldn't get output lock in time, eliding TLS close message")); + if (!engine.isOutboundDone()) { + engine.closeOutbound(); + } } finally { - ByteBuffer netData = myNetData; - ByteBuffer appData = peerAppData; - myNetData = null; - peerAppData = EMPTY_BUFFER; - bufferPool.releaseBuffer(TRACKED_SENDER, netData); - bufferPool.releaseBuffer(TRACKED_RECEIVER, appData); - this.closed = true; + outputSharing.destruct(); } } @@ -421,4 +415,17 @@ public class NioSslEngine implements NioFilter { targetBuffer.capacity() * 2); } + @VisibleForTesting + public ByteBufferSharing shareOutputBuffer() { + return outputSharing.open(); + } + + private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit) + throws OpenAttemptTimedOut { + return outputSharing.open(time, unit); + } + + public ByteBufferSharing shareInputBuffer() { + return inputSharing.open(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 29d15e3..844ab11 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -78,6 +78,7 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.net.BufferPool; +import org.apache.geode.internal.net.ByteBufferSharing; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; @@ -802,11 +803,12 @@ public class Connection implements Runnable { @VisibleForTesting void clearSSLInputBuffer() { if (getConduit().useSSL() && ioFilter != null) { - synchronized (ioFilter.getSynchObject()) { - if (!ioFilter.isClosed()) { - // clear out any remaining handshake bytes - ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer); - buffer.position(0).limit(0); + try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) { + // clear out any remaining handshake bytes + try { + sharedBuffer.getBuffer().position(0).limit(0); + } catch (IOException e) { + // means the NioFilter was already closed } } } @@ -2453,8 +2455,9 @@ public class Connection implements Runnable { long queueTimeoutTarget = now + asyncQueueTimeout; channel.configureBlocking(false); try { - synchronized (ioFilter.getSynchObject()) { - ByteBuffer wrappedBuffer = ioFilter.wrap(buffer); + try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) { + final ByteBuffer wrappedBuffer = outputSharing.getBuffer(); + int waitTime = 1; do { owner.getConduit().getCancelCriterion().checkCancelInProgress(null); @@ -2607,9 +2610,9 @@ public class Connection implements Runnable { } // fall through } - // synchronize on the ioFilter while using its network buffer - synchronized (ioFilter.getSynchObject()) { - ByteBuffer wrappedBuffer = ioFilter.wrap(buffer); + try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) { + final ByteBuffer wrappedBuffer = outputSharing.getBuffer(); + while (wrappedBuffer.remaining() > 0) { int amtWritten = 0; long start = stats.startSocketWrite(true); @@ -2661,10 +2664,12 @@ public class Connection implements Runnable { final KnownVersion version = getRemoteVersion(); try { msgReader = new MsgReader(this, ioFilter, version); + ReplyMessage msg; int len; - synchronized (ioFilter.getSynchObject()) { + // (we have to lock here to protect between reading header and message body) + try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) { Header header = msgReader.readHeader(); if (header.getMessageType() == NORMAL_MSG_TYPE) { @@ -2681,7 +2686,7 @@ public class Connection implements Runnable { releaseMsgDestreamer(header.getMessageId(), destreamer); len = destreamer.size(); } - } // sync + } // I'd really just like to call dispatchMessage here. However, // that call goes through a bunch of checks that knock about // 10% of the performance. Since this direct-ack stuff is all @@ -2748,8 +2753,9 @@ public class Connection implements Runnable { private void processInputBuffer() throws ConnectionException, IOException { inputBuffer.flip(); - synchronized (ioFilter.getSynchObject()) { - ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer); + try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { + final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); + peerDataBuffer.flip(); boolean done = false; diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index 48eb984..42ecf04 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -26,6 +26,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.net.BufferPool; +import org.apache.geode.internal.net.ByteBufferSharing; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -54,8 +55,8 @@ public class MsgReader { } Header readHeader() throws IOException { - synchronized (ioFilter.getSynchObject()) { - ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES); + try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) { + ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES); @@ -89,8 +90,8 @@ public class MsgReader { */ DistributionMessage readMessage(Header header) throws IOException, ClassNotFoundException { - synchronized (ioFilter.getSynchObject()) { - ByteBuffer nioInputBuffer = readAtLeast(header.messageLength); + try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) { + ByteBuffer nioInputBuffer = sharedBuffer.getBuffer(); Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength); this.getStats().incMessagesBeingReceived(true, header.messageLength); long startSer = this.getStats().startMsgDeserialization(); @@ -112,8 +113,8 @@ public class MsgReader { void readChunk(Header header, MsgDestreamer md) throws IOException { - synchronized (ioFilter.getSynchObject()) { - ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength); + try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) { + ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength); md.addChunk(unwrappedBuffer, header.messageLength); // show that the bytes have been consumed by adjusting the buffer's position @@ -123,7 +124,7 @@ public class MsgReader { - private ByteBuffer readAtLeast(int bytes) throws IOException { + private ByteBufferSharing readAtLeast(int bytes) throws IOException { peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData, BufferPool.BufferType.TRACKED_RECEIVER); return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java new file mode 100644 index 0000000..bb5a75f --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; + +public class ByteBufferSharingImplTest { + + private ByteBufferSharingImpl sharing; + private BufferPool poolMock; + private CountDownLatch clientHasOpenedResource; + private CountDownLatch clientMayComplete; + + @Before + public void before() { + poolMock = mock(BufferPool.class); + sharing = + new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER, + poolMock); + clientHasOpenedResource = new CountDownLatch(1); + clientMayComplete = new CountDownLatch(1); + } + + @Test + public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException { + resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> { + try (final ByteBufferSharing _unused = sharing.open()) { + } + }); + } + + @Test + public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException { + resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> { + final ByteBufferSharing sharing2 = sharing.open(); + sharing2.close(); + verify(poolMock, times(0)).releaseBuffer(any(), any()); + assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); + verify(poolMock, times(0)).releaseBuffer(any(), any()); + }); + } + + @Test + public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException { + clientIsLastReferenceHolder("client with balanced close calls", () -> { + try (final ByteBufferSharing _unused = sharing.open()) { + clientHasOpenedResource.countDown(); + blockClient(); + } + }); + } + + @Test + public void extraCloseClientIsLastReferenceHolder() throws InterruptedException { + clientIsLastReferenceHolder("client with extra close calls", () -> { + final ByteBufferSharing sharing2 = sharing.open(); + clientHasOpenedResource.countDown(); + blockClient(); + sharing2.close(); + verify(poolMock, times(1)).releaseBuffer(any(), any()); + assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); + System.out.println("here"); + }); + } + + @Test + public void extraCloseDoesNotPrematurelyReturnBufferToPool() { + final ByteBufferSharing sharing2 = sharing.open(); + sharing2.close(); + assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); + verify(poolMock, times(0)).releaseBuffer(any(), any()); + sharing.destruct(); + verify(poolMock, times(1)).releaseBuffer(any(), any()); + } + + @Test + public void extraCloseDoesNotDecrementRefCount() { + final ByteBufferSharing sharing2 = sharing.open(); + sharing2.close(); + assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); + final ByteBufferSharing sharing3 = this.sharing.open(); + sharing.destruct(); + verify(poolMock, times(0)).releaseBuffer(any(), any()); + } + + private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client) + throws InterruptedException { + /* + * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner + */ + + /* + * clientThread thread is playing the role of the client (of the resource owner) + */ + final Thread clientThread = new Thread(client, name); + clientThread.start(); + clientThread.join(); + + verify(poolMock, times(0)).releaseBuffer(any(), any()); + + sharing.destruct(); + + verify(poolMock, times(1)).releaseBuffer(any(), any()); + } + + private void clientIsLastReferenceHolder(final String name, final Runnable client) + throws InterruptedException { + /* + * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner + */ + + /* + * clientThread thread is playing the role of the client (of the resource owner) + */ + final Thread clientThread = new Thread(client, name); + clientThread.start(); + + clientHasOpenedResource.await(); + + sharing.destruct(); + + verify(poolMock, times(0)).releaseBuffer(any(), any()); + + clientMayComplete.countDown(); // let client finish + + clientThread.join(); + + verify(poolMock, times(1)).releaseBuffer(any(), any()); + } + + private void blockClient() { + try { + clientMayComplete.await(); + } catch (InterruptedException e) { + fail("test client thread interrupted: " + e); + } + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java index 3d394fb..7ab838c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java @@ -50,7 +50,8 @@ public class NioPlainEngineTest { public void unwrap() { ByteBuffer buffer = ByteBuffer.allocate(100); buffer.position(0).limit(buffer.capacity()); - nioEngine.unwrap(buffer); + try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) { + } assertThat(buffer.position()).isEqualTo(buffer.limit()); } @@ -116,23 +117,29 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(amountToRead); - assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); - assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); - - data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); - verify(mockChannel, times(5)).read(any(ByteBuffer.class)); - // at end of last readAtLeast data - assertThat(data.position()).isEqualTo(amountToRead); - // we read amountToRead bytes - assertThat(data.limit()).isEqualTo(amountToRead * 2); - // we did 2 more reads from the network - assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes); - // the next read will start at the end of consumed data - assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2); + try (final ByteBufferSharing sharedBuffer = + nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + ByteBuffer data = sharedBuffer.getBuffer(); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(amountToRead); + assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); + assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); + } + + try (final ByteBufferSharing sharedBuffer = + nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + final ByteBuffer data = sharedBuffer.getBuffer(); + verify(mockChannel, times(5)).read(any(ByteBuffer.class)); + // at end of last readAtLeast data + assertThat(data.position()).isEqualTo(amountToRead); + // we read amountToRead bytes + assertThat(data.limit()).isEqualTo(amountToRead * 2); + // we did 2 more reads from the network + assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes); + // the next read will start at the end of consumed data + assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2); + } } @@ -147,7 +154,9 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); + try (final ByteBufferSharing unused = + nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index ee4aaa3..e9b01cf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -69,6 +69,7 @@ public class NioSslEngineTest { private DMStats mockStats; private NioSslEngine nioSslEngine; private NioSslEngine spyNioSslEngine; + private BufferPool spyBufferPool; @Before public void setUp() throws Exception { @@ -81,13 +82,17 @@ public class NioSslEngineTest { mockStats = mock(DMStats.class); - nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats)); + final BufferPool bufferPool = new BufferPool(mockStats); + spyBufferPool = spy(bufferPool); + nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool); spyNioSslEngine = spy(nioSslEngine); } @Test - public void engineUsesDirectBuffers() { - assertThat(nioSslEngine.myNetData.isDirect()).isTrue(); + public void engineUsesDirectBuffers() throws IOException { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + assertThat(outputSharing.getBuffer().isDirect()).isTrue(); + } } @Test @@ -119,7 +124,7 @@ public class NioSslEngineTest { verify(mockEngine, atLeast(2)).getHandshakeStatus(); verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class)); verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class)); - verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class), + verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class), any(ByteBuffer.class), any(Integer.class)); verify(spyNioSslEngine, times(1)).handleBlockingTasks(); verify(mockChannel, times(3)).read(any(ByteBuffer.class)); @@ -183,148 +188,148 @@ public class NioSslEngineTest { .hasMessageContaining("SSL Handshake terminated with status"); } - - @Test - public void checkClosed() throws Exception { - nioSslEngine.checkClosed(); - } - - @Test(expected = IOException.class) - public void checkClosedThrows() throws Exception { - when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( - new SSLEngineResult(CLOSED, FINISHED, 0, 100)); - nioSslEngine.close(mock(SocketChannel.class)); - nioSslEngine.checkClosed(); - } - - @Test - public void synchObjectIsSelf() { - // for thread-safety the synchronization object given to outside entities - // must be the the engine itself. This allows external manipulation or - // use of the engine's buffers to be protected in the same way as its synchronized - // methods - assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine); - } - @Test public void wrap() throws Exception { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100); - byte[] appBytes = new byte[appData.capacity()]; - Arrays.fill(appBytes, (byte) 0x1F); - appData.put(appBytes); - appData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult( - new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining())); - spyNioSslEngine.engine = testEngine; - - ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData); - - verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class), - any(ByteBuffer.class), any(Integer.class)); - appData.flip(); - assertThat(wrappedBuffer).isEqualTo(appData); - verify(spyNioSslEngine, times(1)).handleBlockingTasks(); + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer appData = + ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); + byte[] appBytes = new byte[appData.capacity()]; + Arrays.fill(appBytes, (byte) 0x1F); + appData.put(appBytes); + appData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult( + new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining())); + spyNioSslEngine.engine = testEngine; + + try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) { + ByteBuffer wrappedBuffer = outputSharing2.getBuffer(); + + verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class), + any(ByteBuffer.class), any(Integer.class)); + appData.flip(); + assertThat(wrappedBuffer).isEqualTo(appData); + } + verify(spyNioSslEngine, times(1)).handleBlockingTasks(); + } } @Test - public void wrapFails() { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100); - byte[] appBytes = new byte[appData.capacity()]; - Arrays.fill(appBytes, (byte) 0x1F); - appData.put(appBytes); - appData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult( - new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining())); - spyNioSslEngine.engine = testEngine; - - assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class) - .hasMessageContaining("Error encrypting data"); + public void wrapFails() throws IOException { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer appData = + ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); + byte[] appBytes = new byte[appData.capacity()]; + Arrays.fill(appBytes, (byte) 0x1F); + appData.put(appBytes); + appData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult( + new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining())); + spyNioSslEngine.engine = testEngine; + + assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class) + .hasMessageContaining("Error encrypting data"); + } } @Test public void unwrapWithBufferOverflow() throws Exception { - // make the application data too big to fit into the engine's encryption buffer - int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity(); - int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2; - nioSslEngine.peerAppData.position(originalPeerAppDataPosition); - ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100); - byte[] netBytes = new byte[wrappedData.capacity()]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted buffer - TestSSLEngine testEngine = new TestSSLEngine(); - spyNioSslEngine.engine = testEngine; - - testEngine.addReturnResult( - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes - new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes - new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length)); - - int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition; - expectedCapacity = - 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; - expectedCapacity = - 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; - ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData); - unwrappedBuffer.flip(); - assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity); + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + // make the application data too big to fit into the engine's encryption buffer + final ByteBuffer peerAppData = inputSharing.getBuffer(); + + int originalPeerAppDataCapacity = peerAppData.capacity(); + int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2; + peerAppData.position(originalPeerAppDataPosition); + ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100); + byte[] netBytes = new byte[wrappedData.capacity()]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + spyNioSslEngine.engine = testEngine; + + testEngine.addReturnResult( + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes + new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes + new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length)); + + int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition; + expectedCapacity = + 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; + expectedCapacity = + 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition; + try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) { + ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); + unwrappedBuffer.flip(); + assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity); + } + } } @Test public void unwrapWithBufferUnderflow() throws Exception { - ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity()); - byte[] netBytes = new byte[wrappedData.capacity() / 2]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0)); - spyNioSslEngine.engine = testEngine; - - ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData); - unwrappedBuffer.flip(); - assertThat(unwrappedBuffer.remaining()).isEqualTo(0); - assertThat(wrappedData.position()).isEqualTo(netBytes.length); - } - - @Test - public void unwrapWithDecryptionError() { - // make the application data too big to fit into the engine's encryption buffer - ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity()); - byte[] netBytes = new byte[wrappedData.capacity() / 2]; - Arrays.fill(netBytes, (byte) 0x1F); - wrappedData.put(netBytes); - wrappedData.flip(); - - // create an engine that will transfer bytes from the application buffer to the encrypted buffer - TestSSLEngine testEngine = new TestSSLEngine(); - testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); - spyNioSslEngine.engine = testEngine; - - assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class) - .hasMessageContaining("Error decrypting data"); + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + ByteBuffer wrappedData = + ByteBuffer.allocate(inputSharing.getBuffer().capacity()); + byte[] netBytes = new byte[wrappedData.capacity() / 2]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0)); + spyNioSslEngine.engine = testEngine; + + try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) { + ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer(); + unwrappedBuffer.flip(); + assertThat(unwrappedBuffer.remaining()).isEqualTo(0); + } + assertThat(wrappedData.position()).isEqualTo(netBytes.length); + } } @Test - public void ensureUnwrappedCapacity() { - ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize); - int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2; - ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity); - assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity); + public void unwrapWithDecryptionError() throws IOException { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + // make the application data too big to fit into the engine's encryption buffer + ByteBuffer wrappedData = + ByteBuffer.allocate(inputSharing.getBuffer().capacity()); + byte[] netBytes = new byte[wrappedData.capacity() / 2]; + Arrays.fill(netBytes, (byte) 0x1F); + wrappedData.put(netBytes); + wrappedData.flip(); + + // create an engine that will transfer bytes from the application buffer to the encrypted + // buffer + TestSSLEngine testEngine = new TestSSLEngine(); + testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0)); + spyNioSslEngine.engine = testEngine; + + assertThatThrownBy(() -> { + try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) { + } + }).isInstanceOf(SSLException.class) + .hasMessageContaining("Error decrypting data"); + } } @Test @@ -338,7 +343,11 @@ public class NioSslEngineTest { when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(CLOSED, FINISHED, 0, 0)); nioSslEngine.close(mockChannel); - assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class) + assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer()) + .isInstanceOf(IOException.class) + .hasMessageContaining("NioSslEngine has been closed"); + assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer()) + .isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); nioSslEngine.close(mockChannel); } @@ -367,10 +376,12 @@ public class NioSslEngineTest { when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> { - // give the NioSslEngine something to write on its socket channel, simulating a TLS close - // message - nioSslEngine.myNetData.put("Goodbye cruel world".getBytes()); - return new SSLEngineResult(CLOSED, FINISHED, 0, 0); + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + // give the NioSslEngine something to write on its socket channel, simulating a TLS close + // message + outputSharing.getBuffer().put("Goodbye cruel world".getBytes()); + return new SSLEngineResult(CLOSED, FINISHED, 0, 0); + } }); when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException()); nioSslEngine.close(mockChannel); @@ -401,37 +412,42 @@ public class NioSslEngineTest { ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); SocketChannel mockChannel = mock(SocketChannel.class); - // force a compaction by making the decoded buffer appear near to being full - ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData; - unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); - unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes); - - // simulate some socket reads - when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - ByteBuffer buffer = invocation.getArgument(0); - buffer.position(buffer.position() + individualRead); - return individualRead; + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + // force a compaction by making the decoded buffer appear near to being full + ByteBuffer unwrappedBuffer = inputSharing.getBuffer(); + unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); + unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes); + + // simulate some socket reads + when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + ByteBuffer buffer = invocation.getArgument(0); + buffer.position(buffer.position() + individualRead); + return individualRead; + } + }); + + TestSSLEngine testSSLEngine = new TestSSLEngine(); + testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); + nioSslEngine.engine = testSSLEngine; + + try (final ByteBufferSharing sharedBuffer = + nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + ByteBuffer data = sharedBuffer.getBuffer(); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); } - }); - - TestSSLEngine testSSLEngine = new TestSSLEngine(); - testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); - nioSslEngine.engine = testSSLEngine; - - ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); + } } /** - * This tests the case where a message header has been read and part of a message has been - * read, but the decoded buffer is too small to hold all of the message. In this case - * the readAtLeast method will have to expand the capacity of the decoded buffer and return - * the new, expanded, buffer as the method result. + * This tests the case where a message header has been read and part of a message has been read, + * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast + * method will have to expand the capacity of the decoded buffer and return the new, expanded, + * buffer as the method result. */ @Test public void readAtLeastUsingSmallAppBuffer() throws Exception { @@ -445,7 +461,11 @@ public class NioSslEngineTest { int initialUnwrappedBufferSize = 100; ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - nioSslEngine.peerAppData = unwrappedBuffer; + + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing; + inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); + } // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -465,22 +485,26 @@ public class NioSslEngineTest { new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190 nioSslEngine.engine = testSSLEngine; - ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); - verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); - // The initial available space in the unwrapped buffer should have doubled - int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; - assertThat(nioSslEngine.peerAppData.capacity()) - .isEqualTo(2 * initialFreeSpace + preexistingBytes); + try (final ByteBufferSharing sharedBuffer = + nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + ByteBuffer data = sharedBuffer.getBuffer(); + verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); + // The initial available space in the unwrapped buffer should have doubled + int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + assertThat(inputSharing.getBuffer().capacity()) + .isEqualTo(2 * initialFreeSpace + preexistingBytes); + } + } } /** - * This tests the case where a message header has been read and part of a message has been - * read, but the decoded buffer is too small to hold all of the message. In this case - * the buffer is completely full and should only take one overflow response to resolve - * the problem. + * This tests the case where a message header has been read and part of a message has been read, + * but the decoded buffer is too small to hold all of the message. In this case the buffer is + * completely full and should only take one overflow response to resolve the problem. */ @Test public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception { @@ -495,7 +519,10 @@ public class NioSslEngineTest { // force buffer expansion by making a small decoded buffer appear near to being full ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - nioSslEngine.peerAppData = unwrappedBuffer; + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing; + inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); + } // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -515,11 +542,14 @@ public class NioSslEngineTest { new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); nioSslEngine.engine = testSSLEngine; - ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); - verify(mockChannel, times(1)).read(isA(ByteBuffer.class)); - assertThat(data.position()).isEqualTo(0); - assertThat(data.limit()) - .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes); + try (final ByteBufferSharing sharedBuffer = + nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) { + ByteBuffer data = sharedBuffer.getBuffer(); + verify(mockChannel, times(1)).read(isA(ByteBuffer.class)); + assertThat(data.position()).isEqualTo(0); + assertThat(data.limit()) + .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes); + } } @@ -657,8 +687,8 @@ public class NioSslEngineTest { } /** - * add an engine operation result to be returned by wrap or unwrap. - * Like Mockito's thenReturn(), the last return result will repeat forever + * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(), + * the last return result will repeat forever */ void addReturnResult(SSLEngineResult... sslEngineResult) { for (SSLEngineResult result : sslEngineResult) {