This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5591
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 75a760b62c5d3c245879e829e267a3abc17c7142
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Sep 5 15:29:40 2018 -0700

    GEODE-5591: fixed 2 issues:
                1) handle the IOException thrown when starting the receiver
                2) try a random port first, then retry with the next port in the range
---
 .../geode/internal/cache/wan/WANTestBase.java      | 14 ++--
 .../cache/wan/misc/WANConfigurationJUnitTest.java  | 16 ++---
 .../internal/cache/wan/GatewayReceiverImpl.java    | 80 +++++++++++-----------
 .../cache/wan/GatewayReceiverImplJUnitTest.java    | 66 ++++++++++++++++++
 4 files changed, 114 insertions(+), 62 deletions(-)

diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0b1af66..a091942 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -37,6 +37,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATO
 import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
 import static org.apache.geode.test.dunit.Host.getHost;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -2017,16 +2018,9 @@ public class WANTestBase extends DistributedTestCase {
     fact.setManualStart(true);
     fact.setBindAddress("200.112.204.10");
     GatewayReceiver receiver = fact.create();
-    try {
-      receiver.start();
-      fail("Expected GatewayReceiver Exception");
-    } catch (GatewayReceiverException gRE) {
-      logger.debug("Got the GatewayReceiverException", gRE);
-      assertTrue(gRE.getMessage().contains("Failed to create server socket 
on"));
-    } catch (IOException e) {
-      e.printStackTrace();
-      fail("Test " + test.getName() + " failed to start GatewayReceiver on 
port " + port);
-    }
+    assertThatThrownBy(receiver::start)
+        .isInstanceOf(GatewayReceiverException.class)
+        .hasMessageContaining("No available free port found in the given 
range");
   }
 
   public static int createReceiverWithSSL(int locPort) {
diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
index edcd55f..0566698 100644
--- a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
+++ b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
@@ -437,25 +437,19 @@ public class WANConfigurationJUnitTest {
   public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() {
     cache = new CacheFactory().set(MCAST_PORT, "0").create();
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
-    fact.setStartPort(50504);
+    fact.setStartPort(50505);
     fact.setMaximumTimeBetweenPings(1000);
     fact.setSocketBufferSize(4000);
-    fact.setEndPort(70707);
+    fact.setEndPort(50505);
     fact.setManualStart(true);
     fact.setBindAddress("200.112.204.10");
     GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
     fact.addGatewayTransportFilter(myStreamFilter1);
 
+    long then = System.currentTimeMillis();
     GatewayReceiver receiver = fact.create();
-    try {
-      receiver.start();
-      fail("Expected GatewayReceiverException");
-    } catch (GatewayReceiverException gRE) {
-      assertTrue(gRE.getMessage().contains("Failed to create server socket 
on"));
-    } catch (IOException e) {
-      e.printStackTrace();
-      fail("The test failed with IOException");
-    }
+    assertThatThrownBy(() -> 
receiver.start()).isInstanceOf(GatewayReceiverException.class)
+        .hasMessageContaining("No available free port found in the given 
range");
   }
 
   @Test
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index 0f0fc63..bc7d006 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -15,8 +15,6 @@
 package org.apache.geode.internal.cache.wan;
 
 import java.io.IOException;
-import java.net.BindException;
-import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
@@ -138,6 +136,32 @@ public class GatewayReceiverImpl implements 
GatewayReceiver {
     return receiver;
   }
 
+  private boolean tryToStart(int port) {
+    if (!AvailablePort.isPortAvailable(port, AvailablePort.SOCKET,
+        AvailablePort.getAddress(AvailablePort.SOCKET))) {
+      return false;
+    }
+
+    receiver.setPort(port);
+    receiver.setSocketBufferSize(socketBufferSize);
+    receiver.setMaximumTimeBetweenPings(timeBetPings);
+    if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
+      receiver.setHostnameForClients(hostnameForSenders);
+    }
+    receiver.setBindAddress(bindAdd);
+    receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});
+    ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters);
+    try {
+      receiver.start();
+      this.port = port;
+      return true;
+    } catch (IOException e) {
+      
logger.info(LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
+          .toLocalizedString(new Object[] {bindAdd, port}));
+      return false;
+    }
+  }
+
   public void start() throws IOException {
     if (receiver == null) {
       receiver = this.cache.addCacheServer(true);
@@ -146,53 +170,27 @@ public class GatewayReceiverImpl implements 
GatewayReceiver {
       
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
       return;
     }
-    boolean started = false;
-    this.port = getPortToStart();
-    while (!started && this.port != -1) {
-      receiver.setPort(this.port);
-      receiver.setSocketBufferSize(socketBufferSize);
-      receiver.setMaximumTimeBetweenPings(timeBetPings);
-      if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
-        receiver.setHostnameForClients(hostnameForSenders);
+
+    int loopStartPort = getPortToStart();
+    int port = loopStartPort;
+    while (!tryToStart(port)) {
+      // get next port to try
+      if (port == endPort && startPort != endPort) {
+        port = startPort;
+      } else {
+        port++;
       }
-      receiver.setBindAddress(bindAdd);
-      receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});
-      ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters);
-      try {
-        receiver.start();
-        started = true;
-      } catch (BindException be) {
-        if (be.getCause() != null
-            && be.getCause().getMessage().contains("assign requested 
address")) {
-          throw new GatewayReceiverException(
-              
LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
-                  .toLocalizedString(new Object[] {bindAdd, this.port}));
-        }
-        // ignore as this port might have been used by other threads.
-        
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use,
-            this.port));
-        this.port = getPortToStart();
-      } catch (SocketException se) {
-        if (se.getMessage().contains("Address already in use")) {
-          logger.warn(LocalizedMessage
-              .create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, 
this.port));
-          this.port = getPortToStart();
-
-        } else {
-          throw se;
-        }
+      if (port == loopStartPort || port > endPort) {
+        throw new GatewayReceiverException("No available free port found in 
the given range (" +
+            this.startPort + "-" + this.endPort + ")");
       }
-
-    }
-    if (!started) {
-      throw new IllegalStateException("No available free port found in the 
given range.");
     }
+
     logger
         
.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, 
this.port));
 
     InternalDistributedSystem system = 
this.cache.getInternalDistributedSystem();
     system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
-
   }
 
   private int getPortToStart() {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
index 3fd732a..c95c41d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
@@ -14,16 +14,22 @@
  */
 package org.apache.geode.internal.cache.wan;
 
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Test;
 
@@ -110,4 +116,64 @@ public class GatewayReceiverImplJUnitTest {
     verify(cache, times(1)).removeGatewayReceiver(gateway);
   }
 
+  @Test
+  public void testFailToStartWith2NextPorts() throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    doThrow(new SocketException("Address already in 
use")).when(server).start();
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, 
true);
+    assertThatThrownBy(() -> 
gateway.start()).isInstanceOf(GatewayReceiverException.class)
+        .hasMessageContaining("No available free port found in the given 
range");
+  }
+
+  @Test
+  public void testFailToStartWithSamePort() throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    doThrow(new SocketException("Address already in 
use")).when(server).start();
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2000, 5, 100, null, null, null, 
true);
+    assertThatThrownBy(() -> 
gateway.start()).isInstanceOf(GatewayReceiverException.class)
+        .hasMessageContaining("No available free port found in the given 
range");
+  }
+
+  @Test
+  public void testFailToStartWithARangeOfPorts() throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    doThrow(new SocketException("Address already in 
use")).when(server).start();
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, 
true);
+    assertThatThrownBy(() -> 
gateway.start()).isInstanceOf(GatewayReceiverException.class)
+        .hasMessageContaining("No available free port found in the given 
range");
+    assertTrue(gateway.getPort() == 0);
+  }
+
+  @Test
+  public void testSuccessToStartAtSpecifiedPort() throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    when(cache.getInternalDistributedSystem()).thenReturn(system);
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    AtomicInteger callCount = new AtomicInteger();
+    doAnswer(invocation -> {
+      // only throw IOException for 10 times
+      if (callCount.get() < 10) {
+        callCount.incrementAndGet();
+        throw new SocketException("Address already in use");
+      }
+      return 0;
+    }).when(server).start();
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, 
true);
+    gateway.start();
+    System.out.println("GGG:" + gateway.getPort() + ":" + callCount.get());
+    assertTrue(gateway.getPort() > 2000);
+    assertEquals(10, callCount.get());
+  }
 }

Reply via email to