GEODE-3286: Failing to cleanup connections from ConnectionTable. In the previous fix, we were checking "if the thread is stopped, then add the connection to the receiver list". Instead, if the thread is stopped, we should not add the connection to the receiver list. As a fix, we add the connection to the receiver list only if the socket is not closed and the receiver thread is not stopped.
Added unit for it. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2c255933 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2c255933 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2c255933 Branch: refs/heads/feature/GEODE-3299 Commit: 2c255933ec3bf49601321a5cb80baea89541e820 Parents: f52ebca Author: Hitesh Khamesra <hkhame...@pivotal.io> Authored: Thu Aug 3 12:58:37 2017 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Thu Aug 3 16:11:48 2017 -0700 ---------------------------------------------------------------------- .../apache/geode/internal/tcp/Connection.java | 4 ++ .../geode/internal/tcp/ConnectionTable.java | 2 +- .../geode/internal/tcp/ConnectionTableTest.java | 46 ++++++++++++++++---- 3 files changed, 42 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index c3ad596..0ecb3bf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -3990,6 +3990,10 @@ public class Connection implements Runnable { return this.socket.isClosed() || !this.socket.isConnected(); } + public boolean isReceiverStopped() { + return this.stopped; + } + private boolean isSocketInUse() { return this.socketInUse; } http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index 69fb7a2..044ab42 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -298,7 +298,7 @@ public class ConnectionTable { } // If connection.stopped is false, any connection cleanup thread will not yet have acquired // the receiver synchronization to remove the receiver. Therefore we can safely add it here. - if (!connection.isSocketClosed() || connection.stopped) { + if (!(connection.isSocketClosed() || connection.isReceiverStopped())) { this.receivers.add(connection); } } http://git-wip-us.apache.org/repos/asf/geode/blob/2c255933/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java index 312c64d..0f5a7b9 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java @@ -20,6 +20,7 @@ import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -32,9 +33,13 @@ import static org.mockito.Mockito.when; @Category(UnitTest.class) public class ConnectionTableTest { + private ConnectionTable connectionTable; + private Socket socket; + private PeerConnectionFactory factory; + private Connection connection; - @Test - public void 
testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception { + @Before + public void initConnectionTable() throws Exception { InternalDistributedSystem system = mock(InternalDistributedSystem.class); when(system.isShareSockets()).thenReturn(false); @@ -49,18 +54,41 @@ public class ConnectionTableTest { when(tcpConduit.getCancelCriterion()).thenReturn(cancelCriterion); when(tcpConduit.getStats()).thenReturn(dmStats); - Connection connection = mock(Connection.class); + connection = mock(Connection.class); + + socket = mock(Socket.class); + + connectionTable = ConnectionTable.create(tcpConduit); + + factory = mock(PeerConnectionFactory.class); + when(factory.createReceiver(connectionTable, socket)).thenReturn(connection); + } + + @Test + public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception { + when(connection.isReceiverStopped()).thenReturn(false); when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was // created - Socket socket = mock(Socket.class); + connectionTable.acceptConnection(socket, factory); + assertEquals(0, connectionTable.getNumberOfReceivers()); + } + + @Test + public void testThreadStoppedNotAddedAsReceivers() throws Exception { + when(connection.isSocketClosed()).thenReturn(false); // connection is not closed - ConnectionTable table = ConnectionTable.create(tcpConduit); + when(connection.isReceiverStopped()).thenReturn(true);// but receiver is stopped - PeerConnectionFactory factory = mock(PeerConnectionFactory.class); - when(factory.createReceiver(table, socket)).thenReturn(connection); + connectionTable.acceptConnection(socket, factory); + assertEquals(0, connectionTable.getNumberOfReceivers()); + } + + @Test + public void testSocketNotClosedAddedAsReceivers() throws Exception { + when(connection.isSocketClosed()).thenReturn(false);// connection is not closed - table.acceptConnection(socket, factory); - assertEquals(0, table.getNumberOfReceivers()); + 
connectionTable.acceptConnection(socket, factory); + assertEquals(1, connectionTable.getNumberOfReceivers()); } }