This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 1a8eb5aec580eb75871060793ea65d62f5f2d959 Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Sat Apr 17 09:12:13 2021 -0700 GEODE-9141: (2 of 2) Handle in-buffer concurrency * Connection uses a ByteBufferVendor to mediate access to inputBuffer * Prevent return to pool before socket closer is finished (cherry picked from commit 9d0d4d1d33794d0f6a21c3bcae71e965cbbd7fbd) (cherry picked from commit 9e8b3972fcf449eed4d41c254cf3f553e517eaa1) (cherry picked from commit c4730deed48bb4513bd04486d4e8c09cdd3bb5a9) --- ...LSocketHostNameVerificationIntegrationTest.java | 6 +- .../internal/net/SSLSocketIntegrationTest.java | 3 +- .../apache/geode/codeAnalysis/excludedClasses.txt | 2 +- .../geode/internal/net/ByteBufferSharing.java | 15 + .../geode/internal/net/ByteBufferSharingNoOp.java | 5 + .../geode/internal/net/ByteBufferVendor.java | 144 ++++++--- .../apache/geode/internal/net/NioSslEngine.java | 50 ++- .../apache/geode/internal/net/SocketCreator.java | 9 +- .../org/apache/geode/internal/tcp/Connection.java | 334 +++++++++++---------- .../geode/internal/net/ByteBufferVendorTest.java | 36 ++- .../geode/internal/net/NioSslEngineTest.java | 41 +-- .../apache/geode/internal/tcp/ConnectionTest.java | 1 + 12 files changed, 361 insertions(+), 285 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java index a70f3b1..e86bfea 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java @@ -103,6 +103,9 @@ public class SSLSocketHostNameVerificationIntegrationTest { @Before public void setUp() throws Exception { + + SocketCreatorFactory.close(); // to clear socket creators made in previous tests + IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out"); this.localHost = InetAddress.getLoopbackAddress(); @@ -172,7 +175,7 @@ public class SSLSocketHostNameVerificationIntegrationTest { try { this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), - sslEngine, 0, true, + sslEngine, 0, ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()), new BufferPool(mock(DMStats.class))); @@ -205,7 +208,6 @@ public class SSLSocketHostNameVerificationIntegrationTest { sc.handshakeSSLSocketChannel(socket.getChannel(), sslEngine, timeoutMillis, - false, ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()), new BufferPool(mock(DMStats.class))); } catch (Throwable throwable) { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java index e7ac191..13e9d5b 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java @@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest { clientSocket = clientChannel.socket(); NioSslEngine engine = clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), - clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true, + clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class))); clientChannel.configureBlocking(true); @@ -267,7 +267,6 @@ public class SSLSocketIntegrationTest { sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234, false), timeoutMillis, - false, ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class))); diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index af3bd1e..cd1af3a 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType org/apache/geode/cache/query/internal/xml/ElementType$1 org/apache/geode/cache/query/internal/xml/ElementType$2 org/apache/geode/cache/query/internal/xml/ElementType$3 -org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut \ No newline at end of file +org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java index cdfa897..c8a94ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java @@ -42,12 +42,27 @@ public interface ByteBufferSharing extends AutoCloseable { * * Subsequent calls to {@link #getBuffer()} will return that new buffer too. * + * This variant is for use when the buffer is being written to. + * * @return the same buffer or a different (bigger) buffer * @throws IOException if the buffer is no longer accessible */ ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException; /** + * Expand the buffer if needed. This may return a different object so be sure to pay attention to + * the return value if you need access to the potentially- expanded buffer. + * + * Subsequent calls to {@link #getBuffer()} will return that new buffer too. + * + * This variant is for use when the buffer is being read from. + * + * @return the same buffer or a different (bigger) buffer + * @throws IOException if the buffer is no longer accessible + */ + ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException; + + /** * Override {@link AutoCloseable#close()} without throws clause since we don't need one. */ @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java index 4a8bc49..4f36e5b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java @@ -48,5 +48,10 @@ class ByteBufferSharingNoOp implements ByteBufferSharing { } @Override + public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException { + throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine"); + } + + @Override public void close() {} } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java index 4933247..1dc74f0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.net; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -27,49 +28,49 @@ import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.internal.net.BufferPool.BufferType; /** - * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a - * {@link ByteBuffer}) is available (for reading and modification) in the scope of the + * Produces (via {@link #open()}) an {@link ByteBufferSharing} meant to used only within a + * try-with-resources block. The resource controls access to a secondary resource + * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources. + * Neither the object returned by {@link #open()}, nor the object returned by invoking + * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of * try-with-resources. */ -class ByteBufferVendor implements ByteBufferSharing { +public class ByteBufferVendor { static class OpenAttemptTimedOut extends Exception { } - private final Lock lock; - private final AtomicBoolean isDestructed; - // mutable because in general our ByteBuffer may need to be resized (grown or compacted) - private volatile ByteBuffer buffer; - private final BufferType bufferType; - private final AtomicInteger counter; - private final BufferPool bufferPool; + private interface ByteBufferSharingInternal extends ByteBufferSharing { + void releaseBuffer(); + } - /** - * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}). + private final Lock lock = new ReentrantLock(); + private final AtomicBoolean isDestructed = new AtomicBoolean(false); + private final AtomicInteger counter = new AtomicInteger(1); + // the object referenced by sharing is guarded by lock + private final ByteBufferSharingInternal sharing; + + /* + * These constructors are for use only by the owner of the shared resource. * * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed * to an external object or is returned to an external caller.) * - * This constructor acquires no lock. The reference count will be 1 after this constructor + * Constructors acquire no locks. The reference count will be 1 after a constructor * completes. */ - ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType, - final BufferPool bufferPool) { - this.buffer = buffer; - this.bufferType = bufferType; - this.bufferPool = bufferPool; - lock = new ReentrantLock(); - counter = new AtomicInteger(1); - isDestructed = new AtomicBoolean(false); - } /** - * The destructor. Called by the resource owner to undo the work of the constructor. + * When you have a ByteBuffer available before construction, use this constructor. + * + * @param bufferArg is the ByteBuffer + * @param bufferType needed for freeing the buffer later + * @param bufferPool needed for freeing the buffer later */ - void destruct() { - if (isDestructed.compareAndSet(false, true)) { - dropReference(); - } + public ByteBufferVendor(final ByteBuffer bufferArg, + final BufferType bufferType, + final BufferPool bufferPool) { + sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool); } /** @@ -78,18 +79,19 @@ class ByteBufferVendor implements ByteBufferSharing { * * Resource owners call this method as the last thing before returning a reference to the caller. * That caller binds that reference to a variable in a try-with-resources statement and relies on - * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block. + * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at + * the end of the block. */ - ByteBufferSharing open() throws IOException { + public ByteBufferSharing open() throws IOException { lock.lock(); addReferenceAfterLock(); - return this; + return sharing; } /** * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time. */ - ByteBufferSharing open(final long time, final TimeUnit unit) + public ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut, IOException { try { if (!lock.tryLock(time, unit)) { @@ -100,24 +102,25 @@ class ByteBufferVendor implements ByteBufferSharing { throw new OpenAttemptTimedOut(); } addReferenceAfterLock(); - return this; + return sharing; } - @Override - public ByteBuffer getBuffer() throws IOException { - if (isDestructed.get()) { - throwClosed(); + /** + * The destructor. Called by the resource owner to undo the work of the constructor. + */ + public void destruct() { + if (isDestructed.compareAndSet(false, true)) { + dropReference(); } - return buffer; } - @Override - public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { - return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); + private void exposingResource() throws IOException { + if (isDestructed.get()) { + throwClosed(); + } } - @Override - public void close() { + private void close() { /* * We are counting on our ReentrantLock throwing an exception if the current thread * does not hold the lock. In that case dropReference() will not be called. This @@ -142,16 +145,11 @@ class ByteBufferVendor implements ByteBufferSharing { private int dropReference() { final int usages = counter.decrementAndGet(); if (usages == 0) { - bufferPool.releaseBuffer(bufferType, buffer); + sharing.releaseBuffer(); } return usages; } - @VisibleForTesting - public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { - buffer = newBufferForTesting; - } - private void addReferenceAfterLock() throws IOException { try { addReference(); @@ -165,4 +163,54 @@ class ByteBufferVendor implements ByteBufferSharing { throw new IOException("NioSslEngine has been closed"); } + private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal { + + /* + * mutable because in general our ByteBuffer may need to be resized (grown or compacted) + * no concurrency concerns since ByteBufferSharingNotNull is guarded by ByteBufferVendor.lock + */ + private ByteBuffer buffer; + private final BufferType bufferType; + private final BufferPool bufferPool; + + public ByteBufferSharingInternalImpl(final ByteBuffer buffer, + final BufferType bufferType, + final BufferPool bufferPool) { + Objects.requireNonNull(buffer); + this.buffer = buffer; + this.bufferType = bufferType; + this.bufferPool = bufferPool; + } + + @Override + public ByteBuffer getBuffer() throws IOException { + exposingResource(); + return buffer; + } + + @Override + public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { + return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); + } + + @Override + public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException { + return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity); + } + + @Override + public void close() { + ByteBufferVendor.this.close(); + } + + @Override + public void releaseBuffer() { + bufferPool.releaseBuffer(bufferType, buffer); + } + } + + @VisibleForTesting + public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { + ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index fc91a31..4c603a0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter { /** * holds bytes wrapped by the SSLEngine; a.k.a. myNetData */ - private final ByteBufferVendor outputSharing; + private final ByteBufferVendor outputBufferVendor; /** * holds the last unwrapped data from a peer; a.k.a. peerAppData */ - private final ByteBufferVendor inputSharing; + private final ByteBufferVendor inputBufferVendor; NioSslEngine(SSLEngine engine, BufferPool bufferPool) { SSLSession session = engine.getSession(); @@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter { closed = false; this.engine = engine; this.bufferPool = bufferPool; - outputSharing = + outputBufferVendor = new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize), TRACKED_SENDER, bufferPool); - inputSharing = + inputBufferVendor = new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize), TRACKED_RECEIVER, bufferPool); } @@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter { peerNetData.capacity(), engine.getSession().getPacketBufferSize())); } - ByteBuffer handshakeBuffer = peerNetData; + final ByteBuffer handshakeBuffer = peerNetData; handshakeBuffer.clear(); ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]); @@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter { switch (status) { case NEED_UNWRAP: - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { final ByteBuffer peerAppData = inputSharing.getBuffer(); // Receive handshaking data from peer @@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter { } case NEED_WRAP: - try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) { final ByteBuffer myNetData = outputSharing.getBuffer(); // Empty the local network packet buffer. @@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter { @Override public ByteBufferSharing wrap(ByteBuffer appData) throws IOException { - try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) { ByteBuffer myNetData = outputSharing.getBuffer(); @@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter { myNetData.flip(); - return shareOutputBuffer(); + return outputBufferVendor.open(); } } @Override public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter { // partial data - need to read more. When this happens the SSLEngine will not have // changed the buffer position wrappedBuffer.compact(); - return shareInputBuffer(); + return inputBufferVendor.open(); case OK: break; default:// if there is data in the decrypted buffer return it. Otherwise signal that we're @@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter { } } wrappedBuffer.clear(); - return shareInputBuffer(); + return inputBufferVendor.open(); } } @@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter { @Override public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter { } } } - return shareInputBuffer(); + return inputBufferVendor.open(); } } @Override public ByteBufferSharing getUnwrappedBuffer() throws IOException { - return shareInputBuffer(); + return inputBufferVendor.open(); } @Override @@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter { return; } closed = true; - inputSharing.destruct(); - try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) { + inputBufferVendor.destruct(); + try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) { final ByteBuffer myNetData = outputSharing.getBuffer(); if (!engine.isOutboundDone()) { @@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter { engine.closeOutbound(); } } finally { - outputSharing.destruct(); + outputBufferVendor.destruct(); } } @@ -422,16 +422,12 @@ public class NioSslEngine implements NioFilter { } @VisibleForTesting - public ByteBufferSharing shareOutputBuffer() throws IOException { - return outputSharing.open(); + public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException { + return outputBufferVendor; } - private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit) - throws OpenAttemptTimedOut, IOException { - return outputSharing.open(time, unit); - } - - public ByteBufferSharing shareInputBuffer() throws IOException { - return inputSharing.open(); + @VisibleForTesting + public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException { + return inputBufferVendor; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java index a232fca..a31bbb2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java @@ -803,21 +803,16 @@ public class SocketCreator extends TcpSocketCreatorImpl { * @param socketChannel the socket's NIO channel * @param engine the sslEngine (see createSSLEngine) * @param timeout handshake timeout in milliseconds. No timeout if <= 0 - * @param clientSocket set to true if you initiated the connect(), false if you accepted it * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be * used in subsequent I/O operations * @return The SSLEngine to be used in processing data for sending/receiving from the channel */ - public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine, + public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, + SSLEngine engine, int timeout, - boolean clientSocket, ByteBuffer peerNetBuffer, BufferPool bufferPool) throws IOException { - engine.setUseClientMode(clientSocket); - if (!clientSocket) { - engine.setNeedClientAuth(sslConfig.isRequireAuth()); - } while (!socketChannel.finishConnect()) { try { Thread.sleep(50); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index f5a1886..107aa9f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -18,6 +18,7 @@ import static java.lang.Boolean.FALSE; import static java.lang.ThreadLocal.withInitial; import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT; import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX; +import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER; import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX; import java.io.DataInput; @@ -79,6 +80,7 @@ import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.ByteBufferSharing; +import org.apache.geode.internal.net.ByteBufferVendor; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; @@ -117,7 +119,7 @@ public class Connection implements Runnable { * Small buffer used for send socket buffer on receiver connections and receive buffer on sender * connections. */ - static final int SMALL_BUFFER_SIZE = + public static final int SMALL_BUFFER_SIZE = Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096); /** @@ -309,11 +311,14 @@ public class Connection implements Runnable { /** name of thread that we're currently performing an operation in (may be null) */ private String ackThreadName; - /** the buffer used for message receipt */ - private ByteBuffer inputBuffer; - - /** Lock used to protect the input buffer */ - public final Object inputBufferLock = new Object(); + /* + * This object mediates access to the input ByteBuffer and ensures its return to + * pool after last use. This reference couldn't be final since it is initialized + * in createIoFilter() not in the constructors. It had to be initialized there + * because in general we have to construct an SSLEngine before we know the buffer + * size and createIoFilter() is where we create that object. + */ + private ByteBufferVendor inputBufferVendor; /** the length of the next message to be dispatched */ private int messageLength; @@ -889,27 +894,28 @@ public class Connection implements Runnable { waitForAddressCompletion(); InternalDistributedMember myAddr = owner.getConduit().getMemberId(); - final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE); - /* - * Note a byte of zero is always written because old products serialized a member id with always - * sends an ip address. My reading of the ip-address specs indicated that the first byte of a - * valid address would never be 0. - */ - connectHandshake.writeByte(0); - connectHandshake.writeByte(HANDSHAKE_VERSION); - // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION - InternalDataSerializer.invokeToData(myAddr, connectHandshake); - connectHandshake.writeBoolean(sharedResource); - connectHandshake.writeBoolean(preserveOrder); - connectHandshake.writeLong(uniqueId); - // write the product version ordinal - Version.CURRENT.writeOrdinal(connectHandshake, true); - connectHandshake.writeInt(dominoCount.get() + 1); - // this writes the sending member + thread name that is stored in senderName - // on the receiver to show the cause of reader thread creation - connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR, - MsgIdGenerator.NO_MSG_ID); - writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null); + try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) { + /* + * Note a byte of zero is always written because old products serialized a member id with + * always sends an ip address. My reading of the ip-address specs indicated that the first + * byte of a valid address would never be 0. + */ + connectHandshake.writeByte(0); + connectHandshake.writeByte(HANDSHAKE_VERSION); + // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION + InternalDataSerializer.invokeToData(myAddr, connectHandshake); + connectHandshake.writeBoolean(sharedResource); + connectHandshake.writeBoolean(preserveOrder); + connectHandshake.writeLong(uniqueId); + // write the product version ordinal + Version.CURRENT.writeOrdinal(connectHandshake, true); + connectHandshake.writeInt(dominoCount.get() + 1); + // this writes the sending member + thread name that is stored in senderName + // on the receiver to show the cause of reader thread creation + connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR, + MsgIdGenerator.NO_MSG_ID); + writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null); + } } /** @@ -1353,7 +1359,7 @@ public class Connection implements Runnable { if (!isReceiver && !hasResidualReaderThread()) { // receivers release the input buffer when exiting run(). Senders use the // inputBuffer for reading direct-reply responses - releaseInputBuffer(); + inputBufferVendor.destruct(); } lengthSet = false; } @@ -1494,7 +1500,7 @@ public class Connection implements Runnable { } } - releaseInputBuffer(); + inputBufferVendor.destruct(); // make sure that if the reader thread exits we notify a thread waiting for the handshake. notifyHandshakeWaiter(false); @@ -1506,16 +1512,6 @@ public class Connection implements Runnable { } } - private void releaseInputBuffer() { - synchronized (inputBufferLock) { - ByteBuffer tmp = inputBuffer; - if (tmp != null) { - inputBuffer = null; - getBufferPool().releaseReceiveBuffer(tmp); - } - } - } - BufferPool getBufferPool() { return owner.getBufferPool(); } @@ -1537,9 +1533,6 @@ public class Connection implements Runnable { } private void readMessages() { - if (closing.get()) { - return; - } // take a snapshot of uniqueId to detect reconnect attempts SocketChannel channel; try { @@ -1585,8 +1578,6 @@ public class Connection implements Runnable { // we should not change the state of the connection if we are a handshake reader thread // as there is a race between this thread and the application thread doing direct ack boolean handshakeHasBeenRead = false; - // if we're using SSL/TLS the input buffer may already have data to process - boolean skipInitialRead = getInputBuffer().position() > 0; try { for (boolean isInitialRead = true;;) { if (stopped) { @@ -1606,8 +1597,9 @@ public class Connection implements Runnable { break; } - try { - ByteBuffer buff = getInputBuffer(); + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + ByteBuffer buff = inputSharing.getBuffer(); + synchronized (stateLock) { connectionState = STATE_READING; } @@ -1616,6 +1608,8 @@ public class Connection implements Runnable { amountRead = channel.read(buff); } else { isInitialRead = false; + // if we're using SSL/TLS the input buffer may already have data to process + final boolean skipInitialRead = buff.position() > 0; if (!skipInitialRead) { amountRead = channel.read(buff); } else { @@ -1680,11 +1674,11 @@ public class Connection implements Runnable { } catch (IOException e) { // "Socket closed" check needed for Solaris jdk 1.4.2_08 if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) { - if (logger.isDebugEnabled() && !isIgnorableIOException(e)) { - logger.debug("{} io exception for {}", p2pReaderName(), this, e); + if (logger.isInfoEnabled() && !isIgnorableIOException(e)) { + logger.info("{} io exception for {}", p2pReaderName(), this, e); } - if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) { - if (logger.isDebugEnabled()) { + if (logger.isDebugEnabled()) { + if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) { logger.debug( "{} received unexpected WSACancelBlockingCall exception, which may result in a hang", p2pReaderName()); @@ -1728,28 +1722,55 @@ public class Connection implements Runnable { private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException { if (getConduit().useSSL() && channel != null) { InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); + String hostName; + if (remoteAddr != null) { + hostName = remoteAddr.getHostName(); + } else { + hostName = SocketCreator.getHostName(address.getAddress()); + } SSLEngine engine = - getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(), - clientSocket); + getConduit().getSocketCreator().createSSLEngine(hostName, + address.getPort(), clientSocket); + + final int packetBufferSize = engine.getSession().getPacketBufferSize(); + + inputBufferVendor = + new ByteBufferVendor( + getBufferPool().acquireDirectReceiveBuffer(packetBufferSize), + TRACKED_RECEIVER, + getBufferPool()); - int packetBufferSize = engine.getSession().getPacketBufferSize(); - if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) { - // TLS has a minimum input buffer size constraint - if (inputBuffer != null) { - getBufferPool().releaseReceiveBuffer(inputBuffer); - } - inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize); - } if (channel.socket().getReceiveBufferSize() < packetBufferSize) { channel.socket().setReceiveBufferSize(packetBufferSize); } if (channel.socket().getSendBufferSize() < packetBufferSize) { channel.socket().setSendBufferSize(packetBufferSize); } - ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine, - getConduit().idleConnectionTimeout, clientSocket, inputBuffer, - getBufferPool()); + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + final ByteBuffer inputBuffer = inputSharing.getBuffer(); + /* + * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method + * accesses the referenced buffer for the handshake which completes before returning + * control here. The NioSslEngine retains no reference to the buffer. + */ + ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine, + getConduit().idleConnectionTimeout, inputBuffer, + getBufferPool()); + } } else { + final int allocSize; + if (recvBufferSize == -1) { + allocSize = owner.getConduit().tcpBufferSize; + } else { + allocSize = recvBufferSize; + } + + inputBufferVendor = + new ByteBufferVendor( + getBufferPool().acquireDirectReceiveBuffer(allocSize), + TRACKED_RECEIVER, + getBufferPool()); + ioFilter = new NioPlainEngine(getBufferPool()); } } @@ -1781,9 +1802,13 @@ public class Connection implements Runnable { } msg = msg.toLowerCase(); - return msg.contains("forcibly closed") - || msg.contains("reset by peer") - || msg.contains("connection reset"); + + if (e instanceof SSLException && msg.contains("status = closed")) { + return true; // engine has been closed - this is normal + } + + return (msg.contains("forcibly closed") || msg.contains("reset by peer") + || msg.contains("connection reset") || msg.contains("socket is closed")); } private static boolean validMsgType(int msgType) { @@ -2627,20 +2652,6 @@ public class Connection implements Runnable { } /** - * gets the buffer for receiving message length bytes - */ - private ByteBuffer getInputBuffer() { - if (inputBuffer == null) { - int allocSize = recvBufferSize; - if (allocSize == -1) { - allocSize = owner.getConduit().tcpBufferSize; - } - inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize); - } - return inputBuffer; - } - - /** * @throws SocketTimeoutException if wait expires. * @throws ConnectionException if ack is not received */ @@ -2747,72 +2758,93 @@ public class Connection implements Runnable { * deserialized and passed to TCPConduit for further processing */ private void processInputBuffer() throws ConnectionException, IOException { - inputBuffer.flip(); + try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + // can't be final because in some cases we expand the buffer (resulting in a new object) + ByteBuffer inputBuffer = inputSharing.getBuffer(); + inputBuffer.flip(); - try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { - final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); + try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { + final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); - peerDataBuffer.flip(); + peerDataBuffer.flip(); - boolean done = false; + boolean done = false; - while (!done && connected) { - owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - int remaining = peerDataBuffer.remaining(); - if (lengthSet || remaining >= MSG_HEADER_BYTES) { - if (!lengthSet) { - if (readMessageHeader(peerDataBuffer)) { - break; + while (!done && connected) { + owner.getConduit().getCancelCriterion().checkCancelInProgress(null); + int remaining = peerDataBuffer.remaining(); + if (lengthSet || remaining >= MSG_HEADER_BYTES) { + if (!lengthSet) { + if (readMessageHeader(peerDataBuffer)) { + break; + } } - } - if (remaining >= messageLength + MSG_HEADER_BYTES) { - lengthSet = false; - peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES); - // don't trust the message deserialization to leave the position in - // the correct spot. Some of the serialization uses buffered - // streams that can leave the position at the wrong spot - int startPos = peerDataBuffer.position(); - int oldLimit = peerDataBuffer.limit(); - peerDataBuffer.limit(startPos + messageLength); - - if (handshakeRead) { - try { - readMessage(peerDataBuffer); - } catch (SerializationException e) { - logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit); - throw e; + if (remaining >= messageLength + MSG_HEADER_BYTES) { + lengthSet = false; + peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES); + // don't trust the message deserialization to leave the position in + // the correct spot. Some of the serialization uses buffered + // streams that can leave the position at the wrong spot + int startPos = peerDataBuffer.position(); + int oldLimit = peerDataBuffer.limit(); + peerDataBuffer.limit(startPos + messageLength); + + if (handshakeRead) { + try { + readMessage(peerDataBuffer); + } catch (SerializationException e) { + logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit); + throw e; + } + } else { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer); + DataInputStream dis = new DataInputStream(bbis)) { + if (!isReceiver) { + // we read the handshake and then stop processing since we don't want + // to process the input buffer anymore in a handshake thread + readHandshakeForSender(dis, peerDataBuffer); + return; + } + if (readHandshakeForReceiver(dis)) { + ioFilter.doneReading(peerDataBuffer); + return; + } + } } - } else { - ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer); - DataInputStream dis = new DataInputStream(bbis); - if (!isReceiver) { - // we read the handshake and then stop processing since we don't want - // to process the input buffer anymore in a handshake thread - readHandshakeForSender(dis, peerDataBuffer); - return; + if (!connected) { + continue; } - if (readHandshakeForReceiver(dis)) { + accessed(); + peerDataBuffer.limit(oldLimit); + peerDataBuffer.position(startPos + messageLength); + } else { + done = true; + if (getConduit().useSSL()) { ioFilter.doneReading(peerDataBuffer); - return; + } else { + // compact or resize the buffer + final int oldBufferSize = inputBuffer.capacity(); + final int allocSize = messageLength + MSG_HEADER_BYTES; + if (oldBufferSize < allocSize) { + // need a bigger buffer + logger.info( + "Allocating larger network read buffer, new size is {} old size was {}.", + allocSize, oldBufferSize); + inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize); + } else { + if (inputBuffer.position() != 0) { + inputBuffer.compact(); + } else { + inputBuffer.position(inputBuffer.limit()); + inputBuffer.limit(inputBuffer.capacity()); + } + } } } - if (!connected) { - continue; - } - accessed(); - peerDataBuffer.limit(oldLimit); - peerDataBuffer.position(startPos + messageLength); } else { + ioFilter.doneReading(peerDataBuffer); done = true; - if (getConduit().useSSL()) { - ioFilter.doneReading(peerDataBuffer); - } else { - compactOrResizeBuffer(messageLength); - } } - } else { - ioFilter.doneReading(peerDataBuffer); - done = true; } } } @@ -2947,10 +2979,9 @@ public class Connection implements Runnable { private void readMessage(ByteBuffer peerDataBuffer) { if (messageType == NORMAL_MSG_TYPE) { owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength); - ByteBufferInputStream bbis = + try (ByteBufferInputStream bbis = remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer) - : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion); - try { + : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) { ReplyProcessor21.initMessageRPId(); // add serialization stats long startSer = owner.getConduit().getStats().startMsgDeserialization(); @@ -3203,34 +3234,8 @@ public class Connection implements Runnable { private void setThreadName(int dominoNumber) { Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " " + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un") - + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "") - + " port=" + socket.getPort()); - } - - private void compactOrResizeBuffer(int messageLength) { - final int oldBufferSize = inputBuffer.capacity(); - int allocSize = messageLength + MSG_HEADER_BYTES; - if (oldBufferSize < allocSize) { - // need a bigger buffer - logger.info("Allocating larger network read buffer, new size is {} old size was {}.", - allocSize, oldBufferSize); - ByteBuffer oldBuffer = inputBuffer; - inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize); - - if (oldBuffer != null) { - int oldByteCount = oldBuffer.remaining(); - inputBuffer.put(oldBuffer); - inputBuffer.position(oldByteCount); - getBufferPool().releaseReceiveBuffer(oldBuffer); - } - } else { - if (inputBuffer.position() != 0) { - inputBuffer.compact(); - } else { - inputBuffer.position(inputBuffer.limit()); - inputBuffer.limit(inputBuffer.capacity()); - } - } + + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "") + + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort()); } private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck) @@ -3315,6 +3320,7 @@ public class Connection implements Runnable { * socket is properly closed at this end. When that is the case isResidualReaderThread * will return true. */ + @VisibleForTesting public boolean hasResidualReaderThread() { return hasResidualReaderThread; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java index 359d8aa..fe4f08b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java @@ -32,11 +32,11 @@ import org.junit.Test; public class ByteBufferVendorTest { @FunctionalInterface - private static interface Foo { + private interface Foo { void run() throws IOException; } - private ByteBufferVendor sharing; + private ByteBufferVendor sharingVendor; private BufferPool poolMock; private CountDownLatch clientHasOpenedResource; private CountDownLatch clientMayComplete; @@ -44,7 +44,7 @@ public class ByteBufferVendorTest { @Before public void before() { poolMock = mock(BufferPool.class); - sharing = + sharingVendor = new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER, poolMock); clientHasOpenedResource = new CountDownLatch(1); @@ -54,7 +54,7 @@ public class ByteBufferVendorTest { @Test public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException { resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharing.open()) { + try (final ByteBufferSharing _unused = sharingVendor.open()) { } }); } @@ -62,7 +62,7 @@ public class ByteBufferVendorTest { @Test public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException { resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharing.open(); + final ByteBufferSharing sharing2 = sharingVendor.open(); sharing2.close(); verify(poolMock, times(0)).releaseBuffer(any(), any()); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); @@ -73,7 +73,7 @@ public class ByteBufferVendorTest { @Test public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException { clientIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharing.open()) { + try (final ByteBufferSharing _unused = sharingVendor.open()) { clientHasOpenedResource.countDown(); blockClient(); } @@ -83,36 +83,42 @@ public class ByteBufferVendorTest { @Test public void extraCloseClientIsLastReferenceHolder() throws InterruptedException { clientIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharing.open(); + final ByteBufferSharing sharing2 = sharingVendor.open(); clientHasOpenedResource.countDown(); blockClient(); sharing2.close(); verify(poolMock, times(1)).releaseBuffer(any(), any()); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - System.out.println("here"); }); } @Test public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException { - final ByteBufferSharing sharing2 = sharing.open(); + final ByteBufferSharing sharing2 = sharingVendor.open(); sharing2.close(); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); verify(poolMock, times(0)).releaseBuffer(any(), any()); - sharing.destruct(); + sharingVendor.destruct(); verify(poolMock, times(1)).releaseBuffer(any(), any()); } @Test public void extraCloseDoesNotDecrementRefCount() throws IOException { - final ByteBufferSharing sharing2 = sharing.open(); + final ByteBufferSharing sharing2 = sharingVendor.open(); sharing2.close(); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - final ByteBufferSharing sharing3 = this.sharing.open(); - sharing.destruct(); + final ByteBufferSharing sharing3 = this.sharingVendor.open(); + sharingVendor.destruct(); verify(poolMock, times(0)).releaseBuffer(any(), any()); } + @Test + public void destructIsIdempotent() { + sharingVendor.destruct(); + sharingVendor.destruct(); + verify(poolMock, times(1)).releaseBuffer(any(), any()); + } + private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client) throws InterruptedException { /* @@ -128,7 +134,7 @@ public class ByteBufferVendorTest { verify(poolMock, times(0)).releaseBuffer(any(), any()); - sharing.destruct(); + sharingVendor.destruct(); verify(poolMock, times(1)).releaseBuffer(any(), any()); } @@ -147,7 +153,7 @@ public class ByteBufferVendorTest { clientHasOpenedResource.await(); - sharing.destruct(); + sharingVendor.destruct(); verify(poolMock, times(0)).releaseBuffer(any(), any()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index 62a858c..d6b9aa6 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -90,7 +90,8 @@ public class NioSslEngineTest { @Test public void engineUsesDirectBuffers() throws IOException { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = + nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { assertThat(outputSharing.getBuffer().isDirect()).isTrue(); } } @@ -190,7 +191,8 @@ public class NioSslEngineTest { @Test public void wrap() throws Exception { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = + nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer appData = @@ -221,7 +223,8 @@ public class NioSslEngineTest { @Test public void wrapFails() throws IOException { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = + nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer appData = ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); @@ -244,7 +247,8 @@ public class NioSslEngineTest { @Test public void unwrapWithBufferOverflow() throws Exception { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = + nioSslEngine.getInputBufferVendorForTestingOnly().open()) { // make the application data too big to fit into the engine's encryption buffer final ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -284,7 +288,8 @@ public class NioSslEngineTest { @Test public void unwrapWithBufferUnderflow() throws Exception { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = + nioSslEngine.getInputBufferVendorForTestingOnly().open()) { ByteBuffer wrappedData = ByteBuffer.allocate(inputSharing.getBuffer().capacity()); byte[] netBytes = new byte[wrappedData.capacity() / 2]; @@ -309,7 +314,8 @@ public class NioSslEngineTest { @Test public void unwrapWithDecryptionError() throws IOException { - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = + nioSslEngine.getInputBufferVendorForTestingOnly().open()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer wrappedData = ByteBuffer.allocate(inputSharing.getBuffer().capacity()); @@ -368,10 +374,10 @@ public class NioSslEngineTest { when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(CLOSED, FINISHED, 0, 0)); nioSslEngine.close(mockChannel); - assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer()) + assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer()) .isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); - assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer()) + assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer()) .isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); nioSslEngine.close(mockChannel); @@ -401,7 +407,8 @@ public class NioSslEngineTest { when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> { - try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { + try (final ByteBufferSharing outputSharing = + nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { // give the NioSslEngine something to write on its socket channel, simulating a TLS close // message outputSharing.getBuffer().put("Goodbye cruel world".getBytes()); @@ -437,7 +444,8 @@ public class NioSslEngineTest { ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); SocketChannel mockChannel = mock(SocketChannel.class); - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = + nioSslEngine.getInputBufferVendorForTestingOnly().open()) { // force a compaction by making the decoded buffer appear near to being full ByteBuffer unwrappedBuffer = inputSharing.getBuffer(); unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); @@ -487,10 +495,7 @@ public class NioSslEngineTest { ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing; - inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); - } + nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer); // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -518,7 +523,8 @@ public class NioSslEngineTest { assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); // The initial available space in the unwrapped buffer should have doubled int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + try (final ByteBufferSharing inputSharing = + nioSslEngine.getInputBufferVendorForTestingOnly().open()) { assertThat(inputSharing.getBuffer().capacity()) .isEqualTo(2 * initialFreeSpace + preexistingBytes); } @@ -544,10 +550,7 @@ public class NioSslEngineTest { // force buffer expansion by making a small decoded buffer appear near to being full ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { - final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing; - inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); - } + nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer); // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java index c064afb..40f1ed3 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java @@ -103,6 +103,7 @@ public class ConnectionTest { TCPConduit tcpConduit = mock(TCPConduit.class); when(connectionTable.getConduit()).thenReturn(tcpConduit); + when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class)); when(distributionConfig.getMemberTimeout()).thenReturn(100); when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));