This is an automated email from the ASF dual-hosted git repository. hiteshkhamesra pushed a commit to branch feature/GEODE-3082 in repository https://gitbox.apache.org/repos/asf/geode.git
commit e7ca79659a80134a780b426aedc843551b276a84 Author: Alexander Murmann <amurm...@pivotal.io> AuthorDate: Wed Sep 6 17:03:40 2017 -0700 GEODE-3082 Integrated GenericProtocolServerConnection with ClientHealthMonitor. 1. Now GenericProtocolServerConnection creates ClientProxyMembershipId. 2. added test where CHM closes the connection. 3. added test where CHM doesn't close the connection Signed-off-by: Hitesh Khamesra <hkhame...@pivotal.io> --- .../membership/InternalDistributedMember.java | 2 +- .../cache/tier/sockets/ClientHealthMonitor.java | 2 +- .../sockets/GenericProtocolServerConnection.java | 31 ++++++++++++- .../cache/tier/sockets/ServerConnection.java | 2 +- .../RoundTripCacheConnectionJUnitTest.java | 52 ++++++++++++++++++++++ 5 files changed, 85 insertions(+), 4 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java index e152756..3deb13c 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java @@ -117,7 +117,7 @@ public class InternalDistributedMember implements DistributedMember, Externaliza * member for use in the P2P cache. Use of other constructors can break * network-partition-detection. 
* - * @param i + * @param i the inet address * @param p the membership port * @param splitBrainEnabled whether this feature is enabled for the member * @param canBeCoordinator whether the member is eligible to be the membership coordinator diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index 226da8a..c3f9529 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -100,7 +100,7 @@ public class ClientHealthMonitor { /** * The interval between client monitor iterations */ - final protected static long CLIENT_MONITOR_INTERVAL = 1000; + final public static long CLIENT_MONITOR_INTERVAL = 1000; final private CacheClientNotifierStats stats; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java index 6c81028..f4c4c3e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java @@ -15,9 +15,13 @@ package org.apache.geode.internal.cache.tier.sockets; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.cache.tier.CommunicationMode; import
org.apache.geode.internal.security.SecurityService; import org.apache.geode.security.SecurityManager; import org.apache.geode.security.server.Authenticator; @@ -26,6 +30,8 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; /** @@ -36,6 +42,7 @@ public class GenericProtocolServerConnection extends ServerConnection { private final ClientProtocolMessageHandler messageHandler; private final SecurityManager securityManager; private final Authenticator authenticator; + private ClientProxyMembershipID clientProxyMembershipID; /** * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received @@ -50,6 +57,10 @@ public class GenericProtocolServerConnection extends ServerConnection { securityManager = securityService.getSecurityManager(); this.messageHandler = newClientProtocol; this.authenticator = authenticator; + + setClientProxyMembershipId(); + + doHandShake(CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 0); } @Override @@ -72,16 +83,34 @@ public class GenericProtocolServerConnection extends ServerConnection { logger.warn(e); this.setFlagProcessMessagesAsFalse(); setClientDisconnectedException(e); + } finally { + acceptor.getClientHealthMonitor().receivedPing(this.clientProxyMembershipID); } } + private void setClientProxyMembershipId() { + ServerLocation serverLocation = new ServerLocation( + ((InetSocketAddress) this.getSocket().getRemoteSocketAddress()).getHostName(), + this.getSocketPort()); + DistributedMember distributedMember = new InternalDistributedMember(serverLocation); + // no handshake for new client protocol. + clientProxyMembershipID = new ClientProxyMembershipID(distributedMember); + } + @Override protected boolean doHandShake(byte epType, int qSize) { - // no handshake for new client protocol. 
+ getAcceptor().getClientHealthMonitor().registerClient(clientProxyMembershipID); + getAcceptor().getClientHealthMonitor().addConnection(clientProxyMembershipID, this); + return true; } @Override + protected int getClientReadTimeout() { + return 10000; + } + + @Override public boolean isClientServerConnection() { return true; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index b243d8e..3aa6cd6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -1265,7 +1265,7 @@ public abstract class ServerConnection implements Runnable { this.requestSpecificTimeout = -1; } - int getClientReadTimeout() { + protected int getClientReadTimeout() { if (this.requestSpecificTimeout == -1) return this.handshake.getClientReadTimeout(); else diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java index 9819a4d..21a7db7 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java @@ -24,6 +24,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTOR import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -57,6 +58,7 @@ import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.admin.SSLConfig; import 
org.apache.geode.internal.cache.CacheServerImpl; import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; +import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor; import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.net.SocketCreatorFactory; @@ -117,6 +119,7 @@ public class RoundTripCacheConnectionJUnitTest { boolean useSSL = testName.getMethodName().startsWith("useSSL_"); Properties properties = new Properties(); + properties.put("log-level", "debug"); if (useSSL) { updatePropertiesForSSLCache(properties); } @@ -125,13 +128,17 @@ public class RoundTripCacheConnectionJUnitTest { cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); + + cache = cacheFactory.create(); CacheServer cacheServer = cache.addCacheServer(); cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); cacheServer.setPort(cacheServerPort); + cacheServer.setMaximumTimeBetweenPings(200); cacheServer.start(); + RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); regionFactory.create(TEST_REGION); @@ -151,9 +158,54 @@ public class RoundTripCacheConnectionJUnitTest { @After public void cleanup() throws IOException { + System.out.println("beginning of after"); cache.close(); socket.close(); SocketCreatorFactory.close(); + System.out.println("done with after"); + } + + @Test + public void testUnresponsiveClientsGetDisconnected() throws Exception { + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + ClientProtocol.Message putMessage = + MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION, + ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID)); + + + Runnable runnable = new 
Runnable() { + @Override + public void run() { + try { + // send a PUT message + protobufProtocolSerializer.serialize(putMessage, outputStream); + assertEquals(-1, socket.getInputStream().read()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + + Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .pollDelay(ClientHealthMonitor.CLIENT_MONITOR_INTERVAL + 1, TimeUnit.MILLISECONDS) + .until(runnable); + } + + @Test + public void testResponsiveClientsStaysConnected() throws Exception { + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + ClientProtocol.Message putMessage = + MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION, + ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID)); + + int timeout = 1500; + int interval = 100; + for (int i = 0; i < timeout; i += interval) { + // send a PUT message + protobufProtocolSerializer.serialize(putMessage, outputStream); + assertNotEquals(-1, socket.getInputStream().read()); + Thread.sleep(interval); + } } @Test -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" <commits@geode.apache.org>.