Repository: incubator-geode Updated Branches: refs/heads/develop 616676e13 -> 20c202d95
GEODE-1174 CI failure: UniversalMembershipListenerAdapterDUnitTest.testSystemClientEventsInServer When a client closes its connection Pool, a CloseConnection request is sent to the server and then the sockets are closed. Unfortunately, this sometimes results in the socket being closed before the operation reaches the server, so the server sees the client as having crashed. This is mostly a problem if the client is sending keepAlive==true, but it is also what is causing this test to fail intermittently. I've changed CloseConnectionOp to wait for a reply and have changed the server to send this reply. It's possible for the client to get an EOFException instead of a reply, but at least this causes the client to wait for the server to terminate the connection, so that we know that the server has received the request and acted on it. Since old GemFire clients might be used with Geode, I've added checks for them and avoid sending a reply in that case. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/20c202d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/20c202d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/20c202d9 Branch: refs/heads/develop Commit: 20c202d95b502d1511f37cea9a77a892fb20834c Parents: 616676e Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Mon Apr 18 08:49:50 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Mon Apr 18 08:49:50 2016 -0700 ---------------------------------------------------------------------- .../client/internal/CloseConnectionOp.java | 26 +++++++++++--------- .../tier/sockets/command/CloseConnection.java | 17 ++++++++++--- .../client/internal/GatewaySenderBatchOp.java | 9 +++++-- 3 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/20c202d9/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java index cbfa3a6..36e08f5 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java @@ -19,6 +19,8 @@ package com.gemstone.gemfire.cache.client.internal; import com.gemstone.gemfire.internal.cache.tier.MessageType; import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import java.io.EOFException; + /** * Tell a server that a connection is being closed * @since 5.7 @@ -33,7 +35,11 @@ public class CloseConnectionOp { throws Exception { AbstractOp op = new CloseConnectionOpImpl(keepAlive); - con.execute(op); + try { + con.execute(op); + } catch (EOFException e) { + // expected + } } private CloseConnectionOp() { @@ -48,13 +54,7 @@ public class CloseConnectionOp { super(MessageType.CLOSE_CONNECTION, 1); getMessage().addRawPart(new byte[]{(byte)(keepAlive?1:0)}, false); } - @Override - protected Message createResponseMessage() { - // no response is sent - return null; - } - - @Override + @Override protected void processSecureBytes(Connection cnx, Message message) throws Exception { } @@ -70,11 +70,15 @@ public class CloseConnectionOp { getMessage().send(false); } - @Override + @Override protected Object processResponse(Message msg) throws Exception { - throw new IllegalStateException("should never be called"); + // CloseConnectionOp doesn't return anything - we wait for a response + // so that we know that the server has processed the request before + // we return from execute(); + return null; } - 
@Override + + @Override protected boolean isErrorResponse(int msgType) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/20c202d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java index 59e1ac4..e487592 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java @@ -19,6 +19,7 @@ */ package com.gemstone.gemfire.internal.cache.tier.sockets.command; +import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.tier.Command; import com.gemstone.gemfire.internal.cache.tier.sockets.*; import com.gemstone.gemfire.distributed.internal.DistributionStats; @@ -40,11 +41,16 @@ public class CloseConnection extends BaseCommand { public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { CacheServerStats stats = servConn.getCacheServerStats(); - { - long oldStart = start; - start = DistributionStats.getStatTime(); - stats.incReadCloseConnectionRequestTime(start - oldStart); + long oldStart = start; + boolean respondToClient = servConn.getClientVersion().compareTo(Version.GFE_90) >= 0; + start = DistributionStats.getStatTime(); + stats.incReadCloseConnectionRequestTime(start - oldStart); + + if (respondToClient) { + // newer clients will wait for a response or EOFException + servConn.setAsTrue(REQUIRES_RESPONSE); } + try { // clientHost = theSocket.getInetAddress().getCanonicalHostName(); servConn.setClientDisconnectCleanly(); @@ -66,6 +72,9 @@ public class CloseConnection extends 
BaseCommand { } } finally { + if (respondToClient) { + writeReply(msg, servConn); + } servConn.setFlagProcessMessagesAsFalse(); stats.incProcessCloseConnectionTime(DistributionStats.getStatTime() http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/20c202d9/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java old mode 100644 new mode 100755 index 73b3fc0..be0a4f7 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java @@ -254,12 +254,17 @@ public class GatewaySenderBatchOp { switch (msg.getMessageType()) { case MessageType.REPLY: // Read the chunk - int batchId = msg.getPart(0).getInt(); + Part part0 = msg.getPart(0); + if (part0.isBytes() && part0.getLength() == 1 && part0.getSerializedForm()[0] == 0) { + // REPLY_OKAY from a CloseConnection + break; + } + int batchId = part0.getInt(); int numEvents = msg.getPart(1).getInt(); ack = new GatewayAck(batchId, numEvents); break; case MessageType.EXCEPTION: - Part part0 = msg.getPart(0); + part0 = msg.getPart(0); Object obj = part0.getObject(); if (obj instanceof List) {