This is an automated email from the ASF dual-hosted git repository. mcmellawatt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 41dbe2e GEODE-5747: Handling SocketException consistently (#2504) 41dbe2e is described below commit 41dbe2e5804939df0b33ae47e57da50f6c7d9152 Author: Ryan McMahon <rmcma...@pivotal.io> AuthorDate: Tue Oct 2 12:48:11 2018 -0700 GEODE-5747: Handling SocketException consistently (#2504) --- .../internal/tcpserver/TcpServerJUnitTest.java | 73 ++++++++++++++++++++-- .../distributed/internal/tcpserver/TcpServer.java | 3 +- .../geode/internal/InternalDataSerializer.java | 44 ++++--------- .../internal/InternalDataSerializerJUnitTest.java | 31 +++++++++ 4 files changed, 111 insertions(+), 40 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java index 71a1eba..4f83560 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java @@ -18,12 +18,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.net.ConnectException; import java.net.InetAddress; +import java.net.SocketException; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,6 +40,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.geode.DataSerializable; import org.apache.geode.cache.GemFireCache; @@ -79,18 +87,20 @@ public class TcpServerJUnitTest { EchoHandler handler = new EchoHandler(); start(handler); + TcpClient tcpClient = new TcpClient(); + TestObject test = new TestObject(); test.id = 5; TestObject result = - (TestObject) new TcpClient().requestToServer(localhost, port, test, 60 * 1000); + (TestObject) tcpClient.requestToServer(localhost, port, test, 60 * 1000); assertEquals(test.id, result.id); - String[] info = new TcpClient().getInfo(localhost, port); + String[] info = tcpClient.getInfo(localhost, port); assertNotNull(info); assertTrue(info.length > 1); try { - new TcpClient().stop(localhost, port); + tcpClient.stop(localhost, port); } catch (ConnectException ignore) { // must not be running } @@ -109,12 +119,14 @@ public class TcpServerJUnitTest { DelayHandler handler = new DelayHandler(latch); start(handler); + TcpClient tcpClient = new TcpClient(); + final AtomicBoolean done = new AtomicBoolean(); Thread delayedThread = new Thread() { public void run() { Boolean delay = Boolean.valueOf(true); try { - new TcpClient().requestToServer(localhost, port, delay, 60 * 1000); + tcpClient.requestToServer(localhost, port, delay, 60 * 1000); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { @@ -127,7 +139,7 @@ public class TcpServerJUnitTest { try { Thread.sleep(500); assertFalse(done.get()); - new TcpClient().requestToServer(localhost, port, Boolean.valueOf(false), 60 * 1000); + tcpClient.requestToServer(localhost, port, Boolean.valueOf(false), 60 * 1000); assertFalse(done.get()); latch.countDown(); @@ -138,7 +150,7 @@ public class TcpServerJUnitTest { delayedThread.join(60 * 1000); assertTrue(!delayedThread.isAlive()); // GemStoneAddition try { - new TcpClient().stop(localhost, port); + tcpClient.stop(localhost, port); } catch (ConnectException ignore) { // must not be running } @@ -146,6 +158,55 @@ public class TcpServerJUnitTest { } } + @Test + public void testNewConnectionsAcceptedAfterSocketException() throws IOException, + ClassNotFoundException, InterruptedException { + // Initially mock the handler to throw a SocketException. We want to verify that the server + // can recover and serve new client requests after a SocketException is thrown. + TcpHandler mockTcpHandler = mock(TcpHandler.class); + doThrow(SocketException.class).when(mockTcpHandler).processRequest(any(Object.class)); + start(mockTcpHandler); + + TcpClient tcpClient = new TcpClient(); + + // Due to the mocked handler, an EOFException will be thrown on the client. This is expected, + // so we just catch it. + try { + tcpClient.requestToServer(localhost, port, new TestObject(), 60 * 1000); + } catch (EOFException eofEx) { + } + + // Change the mock handler behavior to echo the request back + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.getArgument(0); + } + }).when(mockTcpHandler).processRequest(any(Object.class)); + + // Perform another request and validate that it was served successfully + TestObject test = new TestObject(); + test.id = 5; + TestObject result = + (TestObject) tcpClient.requestToServer(localhost, port, test, 60 * 1000); + + assertEquals(test.id, result.id); + String[] info = tcpClient.getInfo(localhost, port); + assertNotNull(info); + assertTrue(info.length > 1); + + try { + tcpClient.stop(localhost, port); + } catch (ConnectException ignore) { + // must not be running + } + server.join(60 * 1000); + assertFalse(server.isAlive()); + + assertEquals(5, stats.started.get()); + assertEquals(5, stats.ended.get()); + } + private static class TestObject implements DataSerializable { int id; diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 266e6a1..eaffc03 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; +import java.net.SocketException; import java.net.URL; import java.util.Date; import java.util.HashMap; @@ -386,7 +387,7 @@ public class TcpServer { } else { rejectUnknownProtocolConnection(socket, firstByte); } - } catch (EOFException ignore) { + } catch (EOFException | SocketException ignore) { // client went away - ignore } catch (CancelException ignore) { // ignore diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java index 90921b9..dfa5f07 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java @@ -2480,6 +2480,13 @@ public abstract class InternalDataSerializer extends DataSerializer { } else { ((DataSerializable) ds).fromData(in); } + + if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) { + logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read {} {}", + ds instanceof DataSerializableFixedID ? "DataSerializableFixedID" + : "DataSerializable", + ds); + } } } catch (EOFException | ClassNotFoundException | CacheClosedException | SocketException ex) { // client went away - ignore @@ -2499,16 +2506,11 @@ public abstract class InternalDataSerializer extends DataSerializer { Constructor init = c.getConstructor(new Class[0]); init.setAccessible(true); Object o = init.newInstance(new Object[0]); - Assert.assertTrue(o instanceof DataSerializable); - invokeFromData(o, in); - if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) { - logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read DataSerializable {}", o); - } + invokeFromData(o, in); return o; - - } catch (EOFException ex) { + } catch (EOFException | SocketException ex) { // client went away - ignore throw ex; } catch (Exception ex) { @@ -2519,30 +2521,6 @@ public abstract class InternalDataSerializer extends DataSerializer { } } - private static Object readDataSerializableFixedID(final DataInput in) - throws IOException, ClassNotFoundException { - Class c = readClass(in); - try { - Constructor init = c.getConstructor(new Class[0]); - init.setAccessible(true); - Object o = init.newInstance(new Object[0]); - - invokeFromData(o, in); - - if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) { - logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read DataSerializableFixedID {}", o); - } - - return o; - - } catch (Exception ex) { - throw new SerializationException( - LocalizedStrings.DataSerializer_COULD_NOT_CREATE_AN_INSTANCE_OF_0 - .toLocalizedString(c.getName()), - ex); - } - } - /** * Get the {@link Version} of the peer or disk store that created this {@link DataInput}. */ @@ -2643,7 +2621,7 @@ public abstract class InternalDataSerializer extends DataSerializer { case DS_FIXED_ID_SHORT: return DSFIDFactory.create(in.readShort(), in); case DS_NO_FIXED_ID: - return readDataSerializableFixedID(in); + return readDataSerializable(in); case DS_FIXED_ID_INT: return DSFIDFactory.create(in.readInt(), in); default: @@ -2784,7 +2762,7 @@ public abstract class InternalDataSerializer extends DataSerializer { case DS_FIXED_ID_INT: return DSFIDFactory.create(in.readInt(), in); case DS_NO_FIXED_ID: - return readDataSerializableFixedID(in); + return readDataSerializable(in); case NULL: return null; case NULL_STRING: diff --git a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java index 2ef8e73..f787141 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java @@ -17,10 +17,12 @@ package org.apache.geode.internal; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.net.SocketException; import java.util.Properties; @@ -70,6 +72,20 @@ public class InternalDataSerializerJUnitTest { .isInstanceOf(SocketException.class); } + @Test + public void testBasicReadObject_SocketExceptionReThrown() + throws IOException, ClassNotFoundException { + DataInput in = mock(DataInput.class); + doReturn(DSCODE.DS_NO_FIXED_ID.toByte()).doReturn(DSCODE.CLASS.toByte()) + .doReturn(DSCODE.STRING.toByte()).when(in).readByte(); + doReturn( + "org.apache.geode.internal.InternalDataSerializerJUnitTest$SocketExceptionThrowingDataSerializable") + .when(in).readUTF(); + + assertThatThrownBy(() -> InternalDataSerializer.basicReadObject(in)) + .isInstanceOf(SocketException.class); + } + class TestFunction implements Function { @Override public void execute(FunctionContext context) { @@ -79,4 +95,19 @@ public class InternalDataSerializerJUnitTest { class TestPdxSerializerObject implements PdxSerializerObject { } + + // Class must be static in order to call the constructor via reflection in the serializer + public static class SocketExceptionThrowingDataSerializable implements DataSerializable { + public SocketExceptionThrowingDataSerializable() {} + + @Override + public void toData(DataOutput out) throws IOException { + // Not needed for test + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + throw new SocketException(); + } + } }