This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1a8eb5aec580eb75871060793ea65d62f5f2d959
Author: Bill Burcham <bill.burc...@gmail.com>
AuthorDate: Sat Apr 17 09:12:13 2021 -0700

    GEODE-9141: (2 of 2) Handle in-buffer concurrency
    * Connection uses a ByteBufferVendor to mediate access to inputBuffer
    * Prevent return to pool before socket closer is finished
    
    (cherry picked from commit 9d0d4d1d33794d0f6a21c3bcae71e965cbbd7fbd)
    (cherry picked from commit 9e8b3972fcf449eed4d41c254cf3f553e517eaa1)
    (cherry picked from commit c4730deed48bb4513bd04486d4e8c09cdd3bb5a9)
---
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 +
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   5 +
 .../geode/internal/net/ByteBufferVendor.java       | 144 ++++++---
 .../apache/geode/internal/net/NioSslEngine.java    |  50 ++-
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 +++++++++++----------
 .../geode/internal/net/ByteBufferVendorTest.java   |  36 ++-
 .../geode/internal/net/NioSslEngineTest.java       |  41 +--
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 +
 12 files changed, 361 insertions(+), 285 deletions(-)

diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index a70f3b1..e86bfea 100755
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -103,6 +103,9 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
   @Before
   public void setUp() throws Exception {
+
+    SocketCreatorFactory.close(); // to clear socket creators made in previous 
tests
+
     IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read 
timed out");
 
     this.localHost = InetAddress.getLoopbackAddress();
@@ -172,7 +175,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
     try {
       this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-          sslEngine, 0, true,
+          sslEngine, 0,
           ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
           new BufferPool(mock(DMStats.class)));
 
@@ -205,7 +208,6 @@ public class SSLSocketHostNameVerificationIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(),
                 sslEngine,
                 timeoutMillis,
-                false,
                 
ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
                 new BufferPool(mock(DMStats.class)));
       } catch (Throwable throwable) {
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index e7ac191..13e9d5b 100755
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest {
     clientSocket = clientChannel.socket();
     NioSslEngine engine =
         
clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, 
true,
+            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0,
             ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
@@ -267,7 +267,6 @@ public class SSLSocketIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(), 
sc.createSSLEngine("localhost", 1234,
                 false),
                 timeoutMillis,
-                false,
                 ByteBuffer.allocate(65535),
                 new BufferPool(mock(DMStats.class)));
 
diff --git 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index af3bd1e..cd1af3a 100644
--- 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
+org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
\ No newline at end of file
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
index cdfa897..c8a94ce 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -42,12 +42,27 @@ public interface ByteBufferSharing extends AutoCloseable {
    *
    * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
    *
+   * This variant is for use when the buffer is being written to.
+   *
    * @return the same buffer or a different (bigger) buffer
    * @throws IOException if the buffer is no longer accessible
    */
   ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws 
IOException;
 
   /**
+   * Expand the buffer if needed. This may return a different object so be 
sure to pay attention to
+   * the return value if you need access to the potentially-expanded buffer.
+   *
+   * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
+   *
+   * This variant is for use when the buffer is being read from.
+   *
+   * @return the same buffer or a different (bigger) buffer
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws 
IOException;
+
+  /**
    * Override {@link AutoCloseable#close()} without throws clause since we 
don't need one.
    */
   @Override
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4a8bc49..4f36e5b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -48,5 +48,10 @@ class ByteBufferSharingNoOp implements ByteBufferSharing {
   }
 
   @Override
+  public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws 
IOException {
+    throw new UnsupportedOperationException("Can't expand buffer when using 
NioPlainEngine");
+  }
+
+  @Override
   public void close() {}
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index 4933247..1dc74f0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,49 +28,49 @@ import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 
 /**
- * An {@link AutoCloseable} meant to be acquired in a try-with-resources 
statement. The resource (a
- * {@link ByteBuffer}) is available (for reading and modification) in the 
scope of the
+ * Produces (via {@link #open()}) a {@link ByteBufferSharing} meant to be used 
only within a
+ * try-with-resources block. The resource controls access to a secondary 
resource
+ * (via {@link ByteBufferSharing#getBuffer()}) within the scope of 
try-with-resources.
+ * Neither the object returned by {@link #open()}, nor the object returned by 
invoking
+ * {@link ByteBufferSharing#getBuffer()} on that object may be used outside 
the scope of
  * try-with-resources.
  */
-class ByteBufferVendor implements ByteBufferSharing {
+public class ByteBufferVendor {
 
   static class OpenAttemptTimedOut extends Exception {
   }
 
-  private final Lock lock;
-  private final AtomicBoolean isDestructed;
-  // mutable because in general our ByteBuffer may need to be resized (grown 
or compacted)
-  private volatile ByteBuffer buffer;
-  private final BufferType bufferType;
-  private final AtomicInteger counter;
-  private final BufferPool bufferPool;
+  private interface ByteBufferSharingInternal extends ByteBufferSharing {
+    void releaseBuffer();
+  }
 
-  /**
-   * This constructor is for use only by the owner of the shared resource (a 
{@link ByteBuffer}).
+  private final Lock lock = new ReentrantLock();
+  private final AtomicBoolean isDestructed = new AtomicBoolean(false);
+  private final AtomicInteger counter = new AtomicInteger(1);
+  // the object referenced by sharing is guarded by lock
+  private final ByteBufferSharingInternal sharing;
+
+  /*
+   * These constructors are for use only by the owner of the shared resource.
    *
    * A resource owner must invoke {@link #open()} once for each reference that 
escapes (is passed
    * to an external object or is returned to an external caller.)
    *
-   * This constructor acquires no lock. The reference count will be 1 after 
this constructor
+   * Constructors acquire no locks. The reference count will be 1 after a 
constructor
    * completes.
    */
-  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
-                   final BufferPool bufferPool) {
-    this.buffer = buffer;
-    this.bufferType = bufferType;
-    this.bufferPool = bufferPool;
-    lock = new ReentrantLock();
-    counter = new AtomicInteger(1);
-    isDestructed = new AtomicBoolean(false);
-  }
 
   /**
-   * The destructor. Called by the resource owner to undo the work of the 
constructor.
+   * When you have a ByteBuffer available before construction, use this 
constructor.
+   *
+   * @param bufferArg is the ByteBuffer
+   * @param bufferType needed for freeing the buffer later
+   * @param bufferPool needed for freeing the buffer later
    */
-  void destruct() {
-    if (isDestructed.compareAndSet(false, true)) {
-      dropReference();
-    }
+  public ByteBufferVendor(final ByteBuffer bufferArg,
+      final BufferType bufferType,
+      final BufferPool bufferPool) {
+    sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, 
bufferPool);
   }
 
   /**
@@ -78,18 +79,19 @@ class ByteBufferVendor implements ByteBufferSharing {
    *
    * Resource owners call this method as the last thing before returning a 
reference to the caller.
    * That caller binds that reference to a variable in a try-with-resources 
statement and relies on
-   * the AutoCloseable protocol to invoke {@link #close()} on the object at 
the end of the block.
+   * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the 
object at
+   * the end of the block.
    */
-  ByteBufferSharing open() throws IOException {
+  public ByteBufferSharing open() throws IOException {
     lock.lock();
     addReferenceAfterLock();
-    return this;
+    return sharing;
   }
 
   /**
    * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the 
lock in time.
    */
-  ByteBufferSharing open(final long time, final TimeUnit unit)
+  public ByteBufferSharing open(final long time, final TimeUnit unit)
       throws OpenAttemptTimedOut, IOException {
     try {
       if (!lock.tryLock(time, unit)) {
@@ -100,24 +102,25 @@ class ByteBufferVendor implements ByteBufferSharing {
       throw new OpenAttemptTimedOut();
     }
     addReferenceAfterLock();
-    return this;
+    return sharing;
   }
 
-  @Override
-  public ByteBuffer getBuffer() throws IOException {
-    if (isDestructed.get()) {
-      throwClosed();
+  /**
+   * The destructor. Called by the resource owner to undo the work of the 
constructor.
+   */
+  public void destruct() {
+    if (isDestructed.compareAndSet(false, true)) {
+      dropReference();
     }
-    return buffer;
   }
 
-  @Override
-  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws 
IOException {
-    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, 
getBuffer(), newCapacity);
+  private void exposingResource() throws IOException {
+    if (isDestructed.get()) {
+      throwClosed();
+    }
   }
 
-  @Override
-  public void close() {
+  private void close() {
     /*
      * We are counting on our ReentrantLock throwing an exception if the 
current thread
      * does not hold the lock. In that case dropReference() will not be 
called. This
@@ -142,16 +145,11 @@ class ByteBufferVendor implements ByteBufferSharing {
   private int dropReference() {
     final int usages = counter.decrementAndGet();
     if (usages == 0) {
-      bufferPool.releaseBuffer(bufferType, buffer);
+      sharing.releaseBuffer();
     }
     return usages;
   }
 
-  @VisibleForTesting
-  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
-    buffer = newBufferForTesting;
-  }
-
   private void addReferenceAfterLock() throws IOException {
     try {
       addReference();
@@ -165,4 +163,54 @@ class ByteBufferVendor implements ByteBufferSharing {
     throw new IOException("NioSslEngine has been closed");
   }
 
+  private class ByteBufferSharingInternalImpl implements 
ByteBufferSharingInternal {
+
+    /*
+     * mutable because in general our ByteBuffer may need to be resized (grown 
or compacted)
+     * no concurrency concerns since ByteBufferSharingInternalImpl is guarded by 
ByteBufferVendor.lock
+     */
+    private ByteBuffer buffer;
+    private final BufferType bufferType;
+    private final BufferPool bufferPool;
+
+    public ByteBufferSharingInternalImpl(final ByteBuffer buffer,
+        final BufferType bufferType,
+        final BufferPool bufferPool) {
+      Objects.requireNonNull(buffer);
+      this.buffer = buffer;
+      this.bufferType = bufferType;
+      this.bufferPool = bufferPool;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() throws IOException {
+      exposingResource();
+      return buffer;
+    }
+
+    @Override
+    public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws 
IOException {
+      return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, 
getBuffer(), newCapacity);
+    }
+
+    @Override
+    public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws 
IOException {
+      return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, 
getBuffer(), newCapacity);
+    }
+
+    @Override
+    public void close() {
+      ByteBufferVendor.this.close();
+    }
+
+    @Override
+    public void releaseBuffer() {
+      bufferPool.releaseBuffer(bufferType, buffer);
+    }
+  }
+
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting;
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index fc91a31..4c603a0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferVendor outputSharing;
+  private final ByteBufferVendor outputBufferVendor;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferVendor inputSharing;
+  private final ByteBufferVendor inputBufferVendor;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter {
     closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    outputSharing =
+    outputBufferVendor =
         new 
ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
-    inputSharing =
+    inputBufferVendor =
         new 
ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
@@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter {
           peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
     }
 
-    ByteBuffer handshakeBuffer = peerNetData;
+    final ByteBuffer handshakeBuffer = peerNetData;
     handshakeBuffer.clear();
 
     ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
@@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+          try (final ByteBufferSharing inputSharing = 
inputBufferVendor.open()) {
             final ByteBuffer peerAppData = inputSharing.getBuffer();
 
             // Receive handshaking data from peer
@@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter {
           }
 
         case NEED_WRAP:
-          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+          try (final ByteBufferSharing outputSharing = 
outputBufferVendor.open()) {
             final ByteBuffer myNetData = outputSharing.getBuffer();
 
             // Empty the local network packet buffer.
@@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
-    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
 
@@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter {
 
       myNetData.flip();
 
-      return shareOutputBuffer();
+      return outputBufferVendor.open();
     }
   }
 
   @Override
   public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException 
{
-    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter {
             // partial data - need to read more. When this happens the 
SSLEngine will not have
             // changed the buffer position
             wrappedBuffer.compact();
-            return shareInputBuffer();
+            return inputBufferVendor.open();
           case OK:
             break;
           default:// if there is data in the decrypted buffer return it. 
Otherwise signal that we're
@@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter {
         }
       }
       wrappedBuffer.clear();
-      return shareInputBuffer();
+      return inputBufferVendor.open();
     }
   }
 
@@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter {
           }
         }
       }
-      return shareInputBuffer();
+      return inputBufferVendor.open();
     }
   }
 
   @Override
   public ByteBufferSharing getUnwrappedBuffer() throws IOException {
-    return shareInputBuffer();
+    return inputBufferVendor.open();
   }
 
   @Override
@@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter {
       return;
     }
     closed = true;
-    inputSharing.destruct();
-    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, 
TimeUnit.MINUTES)) {
+    inputBufferVendor.destruct();
+    try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, 
TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
@@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter {
         engine.closeOutbound();
       }
     } finally {
-      outputSharing.destruct();
+      outputBufferVendor.destruct();
     }
   }
 
@@ -422,16 +422,12 @@ public class NioSslEngine implements NioFilter {
   }
 
   @VisibleForTesting
-  public ByteBufferSharing shareOutputBuffer() throws IOException {
-    return outputSharing.open();
+  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws 
IOException {
+    return outputBufferVendor;
   }
 
-  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit 
unit)
-      throws OpenAttemptTimedOut, IOException {
-    return outputSharing.open(time, unit);
-  }
-
-  public ByteBufferSharing shareInputBuffer() throws IOException {
-    return inputSharing.open();
+  @VisibleForTesting
+  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws 
IOException {
+    return inputBufferVendor;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index a232fca..a31bbb2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -803,21 +803,16 @@ public class SocketCreator extends TcpSocketCreatorImpl {
    * @param socketChannel the socket's NIO channel
    * @param engine the sslEngine (see createSSLEngine)
    * @param timeout handshake timeout in milliseconds. No timeout if <= 0
-   * @param clientSocket set to true if you initiated the connect(), false if 
you accepted it
    * @param peerNetBuffer the buffer to use in reading data fron 
socketChannel. This should also be
    *        used in subsequent I/O operations
    * @return The SSLEngine to be used in processing data for sending/receiving 
from the channel
    */
-  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, 
SSLEngine engine,
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel,
+      SSLEngine engine,
       int timeout,
-      boolean clientSocket,
       ByteBuffer peerNetBuffer,
       BufferPool bufferPool)
       throws IOException {
-    engine.setUseClientMode(clientSocket);
-    if (!clientSocket) {
-      engine.setNeedClientAuth(sslConfig.isRequireAuth());
-    }
     while (!socketChannel.finishConnect()) {
       try {
         Thread.sleep(50);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f5a1886..107aa9f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,6 +18,7 @@ import static java.lang.Boolean.FALSE;
 import static java.lang.ThreadLocal.withInitial;
 import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 import static 
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static 
org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
 
 import java.io.DataInput;
@@ -79,6 +80,7 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.ByteBufferSharing;
+import org.apache.geode.internal.net.ByteBufferVendor;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -117,7 +119,7 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and 
receive buffer on sender
    * connections.
    */
-  static final int SMALL_BUFFER_SIZE =
+  public static final int SMALL_BUFFER_SIZE =
       Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /**
@@ -309,11 +311,14 @@ public class Connection implements Runnable {
   /** name of thread that we're currently performing an operation in (may be 
null) */
   private String ackThreadName;
 
-  /** the buffer used for message receipt */
-  private ByteBuffer inputBuffer;
-
-  /** Lock used to protect the input buffer */
-  public final Object inputBufferLock = new Object();
+  /*
+   * This object mediates access to the input ByteBuffer and ensures its 
return to
+   * pool after last use. This reference couldn't be final since it is 
initialized
+   * in createIoFilter() not in the constructors. It had to be initialized 
there
+   * because in general we have to construct an SSLEngine before we know the 
buffer
+   * size and createIoFilter() is where we create that object.
+   */
+  private ByteBufferVendor inputBufferVendor;
 
   /** the length of the next message to be dispatched */
   private int messageLength;
@@ -889,27 +894,28 @@ public class Connection implements Runnable {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    final MsgOutputStream connectHandshake = new 
MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
-    /*
-     * Note a byte of zero is always written because old products serialized a 
member id with always
-     * sends an ip address. My reading of the ip-address specs indicated that 
the first byte of a
-     * valid address would never be 0.
-     */
-    connectHandshake.writeByte(0);
-    connectHandshake.writeByte(HANDSHAKE_VERSION);
-    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-    InternalDataSerializer.invokeToData(myAddr, connectHandshake);
-    connectHandshake.writeBoolean(sharedResource);
-    connectHandshake.writeBoolean(preserveOrder);
-    connectHandshake.writeLong(uniqueId);
-    // write the product version ordinal
-    Version.CURRENT.writeOrdinal(connectHandshake, true);
-    connectHandshake.writeInt(dominoCount.get() + 1);
-    // this writes the sending member + thread name that is stored in 
senderName
-    // on the receiver to show the cause of reader thread creation
-    connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, 
OperationExecutors.STANDARD_EXECUTOR,
-        MsgIdGenerator.NO_MSG_ID);
-    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), 
false, null);
+    try (final MsgOutputStream connectHandshake = new 
MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) {
+      /*
+       * Note a byte of zero is always written because old products serialized 
a member id with
+       * always sends an ip address. My reading of the ip-address specs 
indicated that the first
+       * byte of a valid address would never be 0.
+       */
+      connectHandshake.writeByte(0);
+      connectHandshake.writeByte(HANDSHAKE_VERSION);
+      // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+      InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+      connectHandshake.writeBoolean(sharedResource);
+      connectHandshake.writeBoolean(preserveOrder);
+      connectHandshake.writeLong(uniqueId);
+      // write the product version ordinal
+      Version.CURRENT.writeOrdinal(connectHandshake, true);
+      connectHandshake.writeInt(dominoCount.get() + 1);
+      // this writes the sending member + thread name that is stored in 
senderName
+      // on the receiver to show the cause of reader thread creation
+      connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, 
OperationExecutors.STANDARD_EXECUTOR,
+          MsgIdGenerator.NO_MSG_ID);
+      writeFully(getSocket().getChannel(), 
connectHandshake.getContentBuffer(), false, null);
+    }
   }
 
   /**
@@ -1353,7 +1359,7 @@ public class Connection implements Runnable {
         if (!isReceiver && !hasResidualReaderThread()) {
           // receivers release the input buffer when exiting run(). Senders 
use the
           // inputBuffer for reading direct-reply responses
-          releaseInputBuffer();
+          inputBufferVendor.destruct();
         }
         lengthSet = false;
       }
@@ -1494,7 +1500,7 @@ public class Connection implements Runnable {
         }
       }
 
-      releaseInputBuffer();
+      inputBufferVendor.destruct();
 
       // make sure that if the reader thread exits we notify a thread waiting 
for the handshake.
       notifyHandshakeWaiter(false);
@@ -1506,16 +1512,6 @@ public class Connection implements Runnable {
     }
   }
 
-  private void releaseInputBuffer() {
-    synchronized (inputBufferLock) {
-      ByteBuffer tmp = inputBuffer;
-      if (tmp != null) {
-        inputBuffer = null;
-        getBufferPool().releaseReceiveBuffer(tmp);
-      }
-    }
-  }
-
   BufferPool getBufferPool() {
     return owner.getBufferPool();
   }
@@ -1537,9 +1533,6 @@ public class Connection implements Runnable {
   }
 
   private void readMessages() {
-    if (closing.get()) {
-      return;
-    }
     // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
@@ -1585,8 +1578,6 @@ public class Connection implements Runnable {
     // we should not change the state of the connection if we are a handshake 
reader thread
     // as there is a race between this thread and the application thread doing 
direct ack
     boolean handshakeHasBeenRead = false;
-    // if we're using SSL/TLS the input buffer may already have data to process
-    boolean skipInitialRead = getInputBuffer().position() > 0;
     try {
       for (boolean isInitialRead = true;;) {
         if (stopped) {
@@ -1606,8 +1597,9 @@ public class Connection implements Runnable {
           break;
         }
 
-        try {
-          ByteBuffer buff = getInputBuffer();
+        try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+          ByteBuffer buff = inputSharing.getBuffer();
+
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
@@ -1616,6 +1608,8 @@ public class Connection implements Runnable {
             amountRead = channel.read(buff);
           } else {
             isInitialRead = false;
+            // if we're using SSL/TLS the input buffer may already have data 
to process
+            final boolean skipInitialRead = buff.position() > 0;
             if (!skipInitialRead) {
               amountRead = channel.read(buff);
             } else {
@@ -1680,11 +1674,11 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
-              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+              logger.info("{} io exception for {}", p2pReaderName(), this, e);
             }
-            if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
-              if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
+              if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
                 logger.debug(
                     "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                     p2pReaderName());
@@ -1728,28 +1722,55 @@ public class Connection implements Runnable {
   private void createIoFilter(SocketChannel channel, boolean clientSocket) 
throws IOException {
     if (getConduit().useSSL() && channel != null) {
       InetSocketAddress address = (InetSocketAddress) 
channel.getRemoteAddress();
+      String hostName;
+      if (remoteAddr != null) {
+        hostName = remoteAddr.getHostName();
+      } else {
+        hostName = SocketCreator.getHostName(address.getAddress());
+      }
       SSLEngine engine =
-          
getConduit().getSocketCreator().createSSLEngine(address.getHostName(), 
address.getPort(),
-              clientSocket);
+          getConduit().getSocketCreator().createSSLEngine(hostName,
+              address.getPort(), clientSocket);
+
+      final int packetBufferSize = engine.getSession().getPacketBufferSize();
+
+      inputBufferVendor =
+          new ByteBufferVendor(
+              getBufferPool().acquireDirectReceiveBuffer(packetBufferSize),
+              TRACKED_RECEIVER,
+              getBufferPool());
 
-      int packetBufferSize = engine.getSession().getPacketBufferSize();
-      if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
-        // TLS has a minimum input buffer size constraint
-        if (inputBuffer != null) {
-          getBufferPool().releaseReceiveBuffer(inputBuffer);
-        }
-        inputBuffer = 
getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
-      }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
       }
       if (channel.socket().getSendBufferSize() < packetBufferSize) {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
-      ioFilter = 
getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
-          getBufferPool());
+      try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+        final ByteBuffer inputBuffer = inputSharing.getBuffer();
+        /*
+         * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method
+         * accesses the referenced buffer for the handshake which completes before returning
+         * control here. The NioSslEngine retains no reference to the buffer.
+         */
+        ioFilter = 
getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+            getConduit().idleConnectionTimeout, inputBuffer,
+            getBufferPool());
+      }
     } else {
+      final int allocSize;
+      if (recvBufferSize == -1) {
+        allocSize = owner.getConduit().tcpBufferSize;
+      } else {
+        allocSize = recvBufferSize;
+      }
+
+      inputBufferVendor =
+          new ByteBufferVendor(
+              getBufferPool().acquireDirectReceiveBuffer(allocSize),
+              TRACKED_RECEIVER,
+              getBufferPool());
+
       ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
@@ -1781,9 +1802,13 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return msg.contains("forcibly closed")
-        || msg.contains("reset by peer")
-        || msg.contains("connection reset");
+
+    if (e instanceof SSLException && msg.contains("status = closed")) {
+      return true; // engine has been closed - this is normal
+    }
+
+    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+        || msg.contains("connection reset") || msg.contains("socket is closed"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2627,20 +2652,6 @@ public class Connection implements Runnable {
   }
 
   /**
-   * gets the buffer for receiving message length bytes
-   */
-  private ByteBuffer getInputBuffer() {
-    if (inputBuffer == null) {
-      int allocSize = recvBufferSize;
-      if (allocSize == -1) {
-        allocSize = owner.getConduit().tcpBufferSize;
-      }
-      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
-    }
-    return inputBuffer;
-  }
-
-  /**
    * @throws SocketTimeoutException if wait expires.
    * @throws ConnectionException if ack is not received
    */
@@ -2747,72 +2758,93 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
-    inputBuffer.flip();
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+      // can't be final because in some cases we expand the buffer (resulting in a new object)
+      ByteBuffer inputBuffer = inputSharing.getBuffer();
+      inputBuffer.flip();
 
-    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
-      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+      try (final ByteBufferSharing sharedBuffer = 
ioFilter.unwrap(inputBuffer)) {
+        final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
 
-      peerDataBuffer.flip();
+        peerDataBuffer.flip();
 
-      boolean done = false;
+        boolean done = false;
 
-      while (!done && connected) {
-        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-        int remaining = peerDataBuffer.remaining();
-        if (lengthSet || remaining >= MSG_HEADER_BYTES) {
-          if (!lengthSet) {
-            if (readMessageHeader(peerDataBuffer)) {
-              break;
+        while (!done && connected) {
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+          int remaining = peerDataBuffer.remaining();
+          if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+            if (!lengthSet) {
+              if (readMessageHeader(peerDataBuffer)) {
+                break;
+              }
             }
-          }
-          if (remaining >= messageLength + MSG_HEADER_BYTES) {
-            lengthSet = false;
-            peerDataBuffer.position(peerDataBuffer.position() + 
MSG_HEADER_BYTES);
-            // don't trust the message deserialization to leave the position in
-            // the correct spot. Some of the serialization uses buffered
-            // streams that can leave the position at the wrong spot
-            int startPos = peerDataBuffer.position();
-            int oldLimit = peerDataBuffer.limit();
-            peerDataBuffer.limit(startPos + messageLength);
-
-            if (handshakeRead) {
-              try {
-                readMessage(peerDataBuffer);
-              } catch (SerializationException e) {
-                logger.info("input buffer startPos {} oldLimit {}", startPos, 
oldLimit);
-                throw e;
+            if (remaining >= messageLength + MSG_HEADER_BYTES) {
+              lengthSet = false;
+              peerDataBuffer.position(peerDataBuffer.position() + 
MSG_HEADER_BYTES);
+              // don't trust the message deserialization to leave the position in
+              // the correct spot. Some of the serialization uses buffered
+              // streams that can leave the position at the wrong spot
+              int startPos = peerDataBuffer.position();
+              int oldLimit = peerDataBuffer.limit();
+              peerDataBuffer.limit(startPos + messageLength);
+
+              if (handshakeRead) {
+                try {
+                  readMessage(peerDataBuffer);
+                } catch (SerializationException e) {
+                  logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+                  throw e;
+                }
+              } else {
+                try (ByteBufferInputStream bbis = new 
ByteBufferInputStream(peerDataBuffer);
+                    DataInputStream dis = new DataInputStream(bbis)) {
+                  if (!isReceiver) {
+                    // we read the handshake and then stop processing since we don't want
+                    // to process the input buffer anymore in a handshake thread
+                    readHandshakeForSender(dis, peerDataBuffer);
+                    return;
+                  }
+                  if (readHandshakeForReceiver(dis)) {
+                    ioFilter.doneReading(peerDataBuffer);
+                    return;
+                  }
+                }
               }
-            } else {
-              ByteBufferInputStream bbis = new 
ByteBufferInputStream(peerDataBuffer);
-              DataInputStream dis = new DataInputStream(bbis);
-              if (!isReceiver) {
-                // we read the handshake and then stop processing since we don't want
-                // to process the input buffer anymore in a handshake thread
-                readHandshakeForSender(dis, peerDataBuffer);
-                return;
+              if (!connected) {
+                continue;
               }
-              if (readHandshakeForReceiver(dis)) {
+              accessed();
+              peerDataBuffer.limit(oldLimit);
+              peerDataBuffer.position(startPos + messageLength);
+            } else {
+              done = true;
+              if (getConduit().useSSL()) {
                 ioFilter.doneReading(peerDataBuffer);
-                return;
+              } else {
+                // compact or resize the buffer
+                final int oldBufferSize = inputBuffer.capacity();
+                final int allocSize = messageLength + MSG_HEADER_BYTES;
+                if (oldBufferSize < allocSize) {
+                  // need a bigger buffer
+                  logger.info(
+                      "Allocating larger network read buffer, new size is {} old size was {}.",
+                      allocSize, oldBufferSize);
+                  inputBuffer = 
inputSharing.expandReadBufferIfNeeded(allocSize);
+                } else {
+                  if (inputBuffer.position() != 0) {
+                    inputBuffer.compact();
+                  } else {
+                    inputBuffer.position(inputBuffer.limit());
+                    inputBuffer.limit(inputBuffer.capacity());
+                  }
+                }
               }
             }
-            if (!connected) {
-              continue;
-            }
-            accessed();
-            peerDataBuffer.limit(oldLimit);
-            peerDataBuffer.position(startPos + messageLength);
           } else {
+            ioFilter.doneReading(peerDataBuffer);
             done = true;
-            if (getConduit().useSSL()) {
-              ioFilter.doneReading(peerDataBuffer);
-            } else {
-              compactOrResizeBuffer(messageLength);
-            }
           }
-        } else {
-          ioFilter.doneReading(peerDataBuffer);
-          done = true;
         }
       }
     }
@@ -2947,10 +2979,9 @@ public class Connection implements Runnable {
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, 
messageLength);
-      ByteBufferInputStream bbis =
+      try (ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
-              : new VersionedByteBufferInputStream(peerDataBuffer, 
remoteVersion);
-      try {
+              : new VersionedByteBufferInputStream(peerDataBuffer, 
remoteVersion)) {
         ReplyProcessor21.initMessageRPId();
         // add serialization stats
         long startSer = 
owner.getConduit().getStats().startMsgDeserialization();
@@ -3203,34 +3234,8 @@ public class Connection implements Runnable {
   private void setThreadName(int dominoNumber) {
     Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + 
remoteAddr + " "
         + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" 
: "un")
-        + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + 
dominoNumber : "")
-        + " port=" + socket.getPort());
-  }
-
-  private void compactOrResizeBuffer(int messageLength) {
-    final int oldBufferSize = inputBuffer.capacity();
-    int allocSize = messageLength + MSG_HEADER_BYTES;
-    if (oldBufferSize < allocSize) {
-      // need a bigger buffer
-      logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
-          allocSize, oldBufferSize);
-      ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
-
-      if (oldBuffer != null) {
-        int oldByteCount = oldBuffer.remaining();
-        inputBuffer.put(oldBuffer);
-        inputBuffer.position(oldByteCount);
-        getBufferPool().releaseReceiveBuffer(oldBuffer);
-      }
-    } else {
-      if (inputBuffer.position() != 0) {
-        inputBuffer.compact();
-      } else {
-        inputBuffer.position(inputBuffer.limit());
-        inputBuffer.limit(inputBuffer.capacity());
-      }
-    }
+        + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+        + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort());
   }
 
   private boolean dispatchMessage(DistributionMessage msg, int bytesRead, 
boolean directAck)
@@ -3315,6 +3320,7 @@ public class Connection implements Runnable {
    * socket is properly closed at this end. When that is the case isResidualReaderThread
    * will return true.
    */
+  @VisibleForTesting
   public boolean hasResidualReaderThread() {
     return hasResidualReaderThread;
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index 359d8aa..fe4f08b 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -32,11 +32,11 @@ import org.junit.Test;
 public class ByteBufferVendorTest {
 
   @FunctionalInterface
-  private static interface Foo {
+  private interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferVendor sharing;
+  private ByteBufferVendor sharingVendor;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -44,7 +44,7 @@ public class ByteBufferVendorTest {
   @Before
   public void before() {
     poolMock = mock(BufferPool.class);
-    sharing =
+    sharingVendor =
         new ByteBufferVendor(mock(ByteBuffer.class), 
BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
@@ -54,7 +54,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseOwnerIsLastReferenceHolder() throws 
InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with balanced close calls", () 
-> {
-      try (final ByteBufferSharing _unused = sharing.open()) {
+      try (final ByteBufferSharing _unused = sharingVendor.open()) {
       }
     });
   }
@@ -62,7 +62,7 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseOwnerIsLastReferenceHolder() throws 
InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharing.open();
+      final ByteBufferSharing sharing2 = sharingVendor.open();
       sharing2.close();
       verify(poolMock, times(0)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> 
sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
@@ -73,7 +73,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseClientIsLastReferenceHolder() throws 
InterruptedException {
     clientIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharing.open()) {
+      try (final ByteBufferSharing _unused = sharingVendor.open()) {
         clientHasOpenedResource.countDown();
         blockClient();
       }
@@ -83,36 +83,42 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseClientIsLastReferenceHolder() throws 
InterruptedException {
     clientIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharing.open();
+      final ByteBufferSharing sharing2 = sharingVendor.open();
       clientHasOpenedResource.countDown();
       blockClient();
       sharing2.close();
       verify(poolMock, times(1)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> 
sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-      System.out.println("here");
     });
   }
 
   @Test
   public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws 
IOException {
-    final ByteBufferSharing sharing2 = sharing.open();
+    final ByteBufferSharing sharing2 = sharingVendor.open();
     sharing2.close();
     assertThatThrownBy(() -> 
sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
     verify(poolMock, times(0)).releaseBuffer(any(), any());
-    sharing.destruct();
+    sharingVendor.destruct();
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
 
   @Test
   public void extraCloseDoesNotDecrementRefCount() throws IOException {
-    final ByteBufferSharing sharing2 = sharing.open();
+    final ByteBufferSharing sharing2 = sharingVendor.open();
     sharing2.close();
     assertThatThrownBy(() -> 
sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-    final ByteBufferSharing sharing3 = this.sharing.open();
-    sharing.destruct();
+    final ByteBufferSharing sharing3 = this.sharingVendor.open();
+    sharingVendor.destruct();
     verify(poolMock, times(0)).releaseBuffer(any(), any());
   }
 
+  @Test
+  public void destructIsIdempotent() {
+    sharingVendor.destruct();
+    sharingVendor.destruct();
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
   private void resourceOwnerIsLastReferenceHolder(final String name, final Foo 
client)
       throws InterruptedException {
     /*
@@ -128,7 +134,7 @@ public class ByteBufferVendorTest {
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
-    sharing.destruct();
+    sharingVendor.destruct();
 
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
@@ -147,7 +153,7 @@ public class ByteBufferVendorTest {
 
     clientHasOpenedResource.await();
 
-    sharing.destruct();
+    sharingVendor.destruct();
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 62a858c..d6b9aa6 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -90,7 +90,8 @@ public class NioSslEngineTest {
 
   @Test
   public void engineUsesDirectBuffers() throws IOException {
-    try (final ByteBufferSharing outputSharing = 
nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       assertThat(outputSharing.getBuffer().isDirect()).isTrue();
     }
   }
@@ -190,7 +191,8 @@ public class NioSslEngineTest {
 
   @Test
   public void wrap() throws Exception {
-    try (final ByteBufferSharing outputSharing = 
nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
 
       // make the application data too big to fit into the engine's encryption 
buffer
       ByteBuffer appData =
@@ -221,7 +223,8 @@ public class NioSslEngineTest {
 
   @Test
   public void wrapFails() throws IOException {
-    try (final ByteBufferSharing outputSharing = 
nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption 
buffer
       ByteBuffer appData =
           ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
@@ -244,7 +247,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption 
buffer
       final ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -284,7 +288,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
       byte[] netBytes = new byte[wrappedData.capacity() / 2];
@@ -309,7 +314,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithDecryptionError() throws IOException {
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption 
buffer
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
@@ -368,10 +374,10 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), 
any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
+    assertThatThrownBy(() -> 
nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
-    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
+    assertThatThrownBy(() -> 
nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
@@ -401,7 +407,8 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), 
any(ByteBuffer.class))).thenAnswer((x) -> {
-      try (final ByteBufferSharing outputSharing = 
nioSslEngine.shareOutputBuffer()) {
+      try (final ByteBufferSharing outputSharing =
+          nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
         // give the NioSslEngine something to write on its socket channel, 
simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
@@ -437,7 +444,8 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // force a compaction by making the decoded buffer appear near to being 
full
       ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
       unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
@@ -487,10 +495,7 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = 
ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of 
message header - ignored
 
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) 
inputSharing;
-      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
-    }
+    
nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new 
Answer<Integer>() {
@@ -518,7 +523,8 @@ public class NioSslEngineTest {
       assertThat(data.limit()).isEqualTo(individualRead * 3 + 
preexistingBytes);
       // The initial available space in the unwrapped buffer should have 
doubled
       int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-      try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
+      try (final ByteBufferSharing inputSharing =
+          nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
         assertThat(inputSharing.getBuffer().capacity())
             .isEqualTo(2 * initialFreeSpace + preexistingBytes);
       }
@@ -544,10 +550,7 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to 
being full
     ByteBuffer unwrappedBuffer = 
ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of 
message header - ignored
-    try (final ByteBufferSharing inputSharing = 
nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) 
inputSharing;
-      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
-    }
+    
nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new 
Answer<Integer>() {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index c064afb..40f1ed3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -103,6 +103,7 @@ public class ConnectionTest {
     TCPConduit tcpConduit = mock(TCPConduit.class);
 
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class));
     when(distributionConfig.getMemberTimeout()).thenReturn(100);
     when(tcpConduit.getSocketId()).thenReturn(new 
InetSocketAddress(getLocalHost(), 12345));
 

Reply via email to