GEODE-3286: Failing to cleanup connections from ConnectionTable

In the previous fix, we were checking "if thread is stopped then add
connection to receiver list". Instead, if the thread is stopped we
should not add the connection to the receiver list. As a fix, we add
the connection to the receiver list only if the socket is not closed
and the receiver thread is not stopped.

Added unit tests for it.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2c255933
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2c255933
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2c255933

Branch: refs/heads/feature/GEODE-3299
Commit: 2c255933ec3bf49601321a5cb80baea89541e820
Parents: f52ebca
Author: Hitesh Khamesra <hkhame...@pivotal.io>
Authored: Thu Aug 3 12:58:37 2017 -0700
Committer: Hitesh Khamesra <hkhame...@pivotal.io>
Committed: Thu Aug 3 16:11:48 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/tcp/Connection.java   |  4 ++
 .../geode/internal/tcp/ConnectionTable.java     |  2 +-
 .../geode/internal/tcp/ConnectionTableTest.java | 46 ++++++++++++++++----
 3 files changed, 42 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index c3ad596..0ecb3bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -3990,6 +3990,10 @@ public class Connection implements Runnable {
     return this.socket.isClosed() || !this.socket.isConnected();
   }
 
+  public boolean isReceiverStopped() {
+    return this.stopped;
+  }
+
   private boolean isSocketInUse() {
     return this.socketInUse;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 69fb7a2..044ab42 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -298,7 +298,7 @@ public class ConnectionTable {
         }
         // If connection.stopped is false, any connection cleanup thread will 
not yet have acquired
         // the receiver synchronization to remove the receiver. Therefore we 
can safely add it here.
-        if (!connection.isSocketClosed() || connection.stopped) {
+        if (!(connection.isSocketClosed() || connection.isReceiverStopped())) {
           this.receivers.add(connection);
         }
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
index 312c64d..0f5a7b9 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -20,6 +20,7 @@ import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -32,9 +33,13 @@ import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class ConnectionTableTest {
+  private ConnectionTable connectionTable;
+  private Socket socket;
+  private PeerConnectionFactory factory;
+  private Connection connection;
 
-  @Test
-  public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws 
Exception {
+  @Before
+  public void initConnectionTable() throws Exception {
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);
     when(system.isShareSockets()).thenReturn(false);
 
@@ -49,18 +54,41 @@ public class ConnectionTableTest {
     when(tcpConduit.getCancelCriterion()).thenReturn(cancelCriterion);
     when(tcpConduit.getStats()).thenReturn(dmStats);
 
-    Connection connection = mock(Connection.class);
+    connection = mock(Connection.class);
+
+    socket = mock(Socket.class);
+
+    connectionTable = ConnectionTable.create(tcpConduit);
+
+    factory = mock(PeerConnectionFactory.class);
+    when(factory.createReceiver(connectionTable, 
socket)).thenReturn(connection);
+  }
+
+  @Test
+  public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws 
Exception {
+    when(connection.isReceiverStopped()).thenReturn(false);
     when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed 
as soon at it was
                                                         // created
 
-    Socket socket = mock(Socket.class);
+    connectionTable.acceptConnection(socket, factory);
+    assertEquals(0, connectionTable.getNumberOfReceivers());
+  }
+
+  @Test
+  public void testThreadStoppedNotAddedAsReceivers() throws Exception {
+    when(connection.isSocketClosed()).thenReturn(false); // connection is not 
closed
 
-    ConnectionTable table = ConnectionTable.create(tcpConduit);
+    when(connection.isReceiverStopped()).thenReturn(true);// but receiver is 
stopped
 
-    PeerConnectionFactory factory = mock(PeerConnectionFactory.class);
-    when(factory.createReceiver(table, socket)).thenReturn(connection);
+    connectionTable.acceptConnection(socket, factory);
+    assertEquals(0, connectionTable.getNumberOfReceivers());
+  }
+
+  @Test
+  public void testSocketNotClosedAddedAsReceivers() throws Exception {
+    when(connection.isSocketClosed()).thenReturn(false);// connection is not 
closed
 
-    table.acceptConnection(socket, factory);
-    assertEquals(0, table.getNumberOfReceivers());
+    connectionTable.acceptConnection(socket, factory);
+    assertEquals(1, connectionTable.getNumberOfReceivers());
   }
 }

Reply via email to