This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-5591 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 75a760b62c5d3c245879e829e267a3abc17c7142 Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed Sep 5 15:29:40 2018 -0700 GEODE-5591: fixed 2 issues: 1) handle IO exception 2) try with a random port, then retry with next port --- .../geode/internal/cache/wan/WANTestBase.java | 14 ++-- .../cache/wan/misc/WANConfigurationJUnitTest.java | 16 ++--- .../internal/cache/wan/GatewayReceiverImpl.java | 80 +++++++++++----------- .../cache/wan/GatewayReceiverImplJUnitTest.java | 66 ++++++++++++++++++ 4 files changed, 114 insertions(+), 62 deletions(-) diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 0b1af66..a091942 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -37,6 +37,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATO import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR; import static org.apache.geode.test.dunit.Host.getHost; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -2017,16 +2018,9 @@ public class WANTestBase extends DistributedTestCase { fact.setManualStart(true); fact.setBindAddress("200.112.204.10"); GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - fail("Expected GatewayReceiver Exception"); - } catch (GatewayReceiverException gRE) { - logger.debug("Got the GatewayReceiverException", gRE); - assertTrue(gRE.getMessage().contains("Failed to create server socket on")); - } catch (IOException e) { - e.printStackTrace(); - fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port); - } + assertThatThrownBy(receiver::start) + .isInstanceOf(GatewayReceiverException.class) + .hasMessageContaining("No available free port found in the given range"); } public static int createReceiverWithSSL(int locPort) { diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java index edcd55f..0566698 100644 --- a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java +++ b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java @@ -437,25 +437,19 @@ public class WANConfigurationJUnitTest { public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() { cache = new CacheFactory().set(MCAST_PORT, "0").create(); GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setStartPort(50504); + fact.setStartPort(50505); fact.setMaximumTimeBetweenPings(1000); fact.setSocketBufferSize(4000); - fact.setEndPort(70707); + fact.setEndPort(50505); fact.setManualStart(true); fact.setBindAddress("200.112.204.10"); GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); fact.addGatewayTransportFilter(myStreamFilter1); + long then = System.currentTimeMillis(); GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - fail("Expected GatewayReceiverException"); - } catch (GatewayReceiverException gRE) { - assertTrue(gRE.getMessage().contains("Failed to create server socket on")); - } catch (IOException e) { - e.printStackTrace(); - fail("The test failed with IOException"); - } + assertThatThrownBy(() -> receiver.start()).isInstanceOf(GatewayReceiverException.class) + .hasMessageContaining("No available free port found in the given range"); } @Test diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java index 0f0fc63..bc7d006 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java @@ -15,8 +15,6 @@ package org.apache.geode.internal.cache.wan; import java.io.IOException; -import java.net.BindException; -import java.net.SocketException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; @@ -138,6 +136,32 @@ public class GatewayReceiverImpl implements GatewayReceiver { return receiver; } + private boolean tryToStart(int port) { + if (!AvailablePort.isPortAvailable(port, AvailablePort.SOCKET, + AvailablePort.getAddress(AvailablePort.SOCKET))) { + return false; + } + + receiver.setPort(port); + receiver.setSocketBufferSize(socketBufferSize); + receiver.setMaximumTimeBetweenPings(timeBetPings); + if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) { + receiver.setHostnameForClients(hostnameForSenders); + } + receiver.setBindAddress(bindAdd); + receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP}); + ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters); + try { + receiver.start(); + this.port = port; + return true; + } catch (IOException e) { + logger.info(LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1 + .toLocalizedString(new Object[] {bindAdd, port})); + return false; + } + } + public void start() throws IOException { if (receiver == null) { receiver = this.cache.addCacheServer(true); @@ -146,53 +170,27 @@ public class GatewayReceiverImpl implements GatewayReceiver { logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING)); return; } - boolean started = false; - this.port = getPortToStart(); - while (!started && this.port != -1) { - receiver.setPort(this.port); - receiver.setSocketBufferSize(socketBufferSize); - receiver.setMaximumTimeBetweenPings(timeBetPings); - if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) { - receiver.setHostnameForClients(hostnameForSenders); + + int loopStartPort = getPortToStart(); + int port = loopStartPort; + while (!tryToStart(port)) { + // get next port to try + if (port == endPort && startPort != endPort) { + port = startPort; + } else { + port++; } - receiver.setBindAddress(bindAdd); - receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP}); - ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters); - try { - receiver.start(); - started = true; - } catch (BindException be) { - if (be.getCause() != null - && be.getCause().getMessage().contains("assign requested address")) { - throw new GatewayReceiverException( - LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1 - .toLocalizedString(new Object[] {bindAdd, this.port})); - } - // ignore as this port might have been used by other threads. - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, - this.port)); - this.port = getPortToStart(); - } catch (SocketException se) { - if (se.getMessage().contains("Address already in use")) { - logger.warn(LocalizedMessage - .create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); - this.port = getPortToStart(); - - } else { - throw se; - } + if (port == loopStartPort || port > endPort) { + throw new GatewayReceiverException("No available free port found in the given range (" + + this.startPort + "-" + this.endPort + ")"); } - - } - if (!started) { - throw new IllegalStateException("No available free port found in the given range."); } + logger .info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port)); InternalDistributedSystem system = this.cache.getInternalDistributedSystem(); system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this); - } private int getPortToStart() { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java index 3fd732a..c95c41d 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java @@ -14,16 +14,22 @@ */ package org.apache.geode.internal.cache.wan; +import static org.apache.geode.internal.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.SocketException; import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -110,4 +116,64 @@ public class GatewayReceiverImplJUnitTest { verify(cache, times(1)).removeGatewayReceiver(gateway); } + @Test + public void testFailToStartWith2NextPorts() throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + when(cache.addCacheServer(eq(true))).thenReturn(server); + doThrow(new SocketException("Address already in use")).when(server).start(); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true); + assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class) + .hasMessageContaining("No available free port found in the given range"); + } + + @Test + public void testFailToStartWithSamePort() throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + when(cache.addCacheServer(eq(true))).thenReturn(server); + doThrow(new SocketException("Address already in use")).when(server).start(); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2000, 5, 100, null, null, null, true); + assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class) + .hasMessageContaining("No available free port found in the given range"); + } + + @Test + public void testFailToStartWithARangeOfPorts() throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + when(cache.addCacheServer(eq(true))).thenReturn(server); + doThrow(new SocketException("Address already in use")).when(server).start(); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, true); + assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class) + .hasMessageContaining("No available free port found in the given range"); + assertTrue(gateway.getPort() == 0); + } + + @Test + public void testSuccessToStartAtSpecifiedPort() throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + InternalDistributedSystem system = mock(InternalDistributedSystem.class); + when(cache.getInternalDistributedSystem()).thenReturn(system); + when(cache.addCacheServer(eq(true))).thenReturn(server); + AtomicInteger callCount = new AtomicInteger(); + doAnswer(invocation -> { + // only throw IOException for 10 times + if (callCount.get() < 10) { + callCount.incrementAndGet(); + throw new SocketException("Address already in use"); + } + return 0; + }).when(server).start(); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, true); + gateway.start(); + System.out.println("GGG:" + gateway.getPort() + ":" + callCount.get()); + assertTrue(gateway.getPort() > 2000); + assertEquals(10, callCount.get()); + } }