DonalEvans commented on a change in pull request #7378:
URL: https://github.com/apache/geode/pull/7378#discussion_r819842187



##########
File path: 
geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -479,27 +495,33 @@ void removeFromMap(Map<String, 
Map<ServerLocationAndMemberId, LoadHolder>> map,
         }
       }
     }
-    Map groupMap = map.get(null);
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = map.get(null);
     groupMap.remove(locationAndMemberId);
   }
 
   @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, float load, float 
loadPerConnection) {
-    updateMap(map, location, "", load, loadPerConnection);
-  }
-
-  @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, String memberId, float load,
+  void updateConnectionLoadMap(ServerLocation location, String memberId, float 
load,
       float loadPerConnection) {
-    Map groupMap = (Map) map.get(null);
-    LoadHolder holder;
-    if (memberId.equals("")) {
-      holder = (LoadHolder) groupMap.get(location);
-    } else {
-      ServerLocationAndMemberId locationAndMemberId =
-          new ServerLocationAndMemberId(location, memberId);
-      holder = (LoadHolder) groupMap.get(locationAndMemberId);
+    ServerLocationAndMemberId locationAndMemberId =
+        new ServerLocationAndMemberId(location, memberId);
+
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = 
connectionLoadMap.get(null);
+    LoadHolder holder = groupMap.get(locationAndMemberId);
+    if (holder == null) {
+      groupMap = connectionLoadMap.get(GatewayReceiver.RECEIVER_GROUP);
+      if (groupMap != null) {
+        holder = groupMap.get(locationAndMemberId);
+      }
     }
+
+    if (holder != null) {
+      holder.setLoad(load, loadPerConnection);
+    }
+  }
+
+  void updateQueueLoadMap(ServerLocation location, float load, float 
loadPerConnection) {

Review comment:
       This method should be annotated with `@VisibleForTesting` as it would be 
`private` if it wasn't being used in the unit test for this class.

##########
File path: 
geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -413,7 +413,19 @@ public List getServersForQueue(String group, 
Set<ServerLocation> excludedServers
           new ServerLoad(connectionLoad.getLoad(), 
connectionLoad.getLoadPerConnection(),
               queueLoad.getLoad(), queueLoad.getLoadPerConnection()));
     }
+    return result;
+  }
+
+  public synchronized Map<ServerLocationAndMemberId, LoadHolder> 
getGatewayReceiverLoadMap() {

Review comment:
       Could this method be annotated with the `@TestOnly` annotation to make 
it clear that it's a test-only method please? Also, since it's only used in the 
unit test for this class, the visibility can be changed to package-private 
rather than public.

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void 
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {

Review comment:
       This test name is a little unclear. Would it be possible to change it so 
that it states what behaviour is being tested, under what conditions, and what 
the expected result is? Particularly to distinguish it from the above test case.

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void 
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[] 
{GatewayReceiver.RECEIVER_GROUP},
+        new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, 
LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 
expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
serverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void 
updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup()
 {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final ServerLocation gatewayReceiverLocation = new 
ServerLocation("gatewayReciverHost", 111);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;

Review comment:
       Extra semicolon here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;

Review comment:
       Extra semicolon added here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -661,53 +728,48 @@ public void 
updateMapWithServerLocationAndMemberIdKeyNotFound() {
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 50, 1);
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 50, 1);
 
-    assertNull(groupServers.get(sli));
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertTrue("Expected connection map to be empty", serverLoadMap.isEmpty());
   }
 
   @Test
-  public void updateMapWithServerLocation() {
+  public void updateQueueMapWithServerLocation() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     LocatorLoadSnapshot.LoadHolder loadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);

Review comment:
       This variable is no longer used and can be removed.

##########
File path: 
geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
##########
@@ -234,11 +229,11 @@ private Object getLocatorListResponse(LocatorListRequest 
request) {
   }
 
   private Object pickQueueServers(QueueConnectionRequest clientRequest) {
-    Set excludedServers = new HashSet(clientRequest.getExcludedServers());
+    HashSet<ServerLocation> excludedServers = new 
HashSet<>(clientRequest.getExcludedServers());

Review comment:
       Could this stay using the `Set` interface rather than the concrete 
`HashSet` implementation please?

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);

Review comment:
       Could these assertions be changed to use AssertJ, i.e. 
`assertThat(X).isEqualTo(Y)` please?

##########
File path: 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void 
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[] 
{GatewayReceiver.RECEIVER_GROUP},
+        new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, 
LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 
expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
serverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void 
updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup()
 {

Review comment:
       This test name could also be improved a bit to help make it clearer what 
the test is doing.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest 
implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws 
Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);

Review comment:
       Rather than calling these three methods at the start of each test, you 
could move the code in those methods to a `@Before` method, or just have a 
`@Before` method that calls them.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest 
implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;

Review comment:
       This offset seems somewhat arbitrary. How was it determined?

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest 
implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws 
Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown() 
throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
 null);
+
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all 
servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + 
LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
 null);
+
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  @Test
+  public void 
testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+      throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+
+    
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+        server1Site1);
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+        server1Site2);
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all 
servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + 
LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+        server1Site1);
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+        server1Site2);
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  void executeGatewayReceiverActionCommandAndValidateStateSite1(String 
cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator2Site1);
+    String command = new CommandStringBuilder(cliString)
+        .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(false));
+      locator2Site1.invoke(
+          () -> 
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+    } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(true));
+      locator2Site1.invoke(
+          () -> 
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+    }
+  }
+
+  void executeGatewaySenderActionCommandAndValidateStateSite2(String 
cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator1Site2);
+    String command;
+    if (memberVM == null) {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(server1Site2, isRunning(cliString));
+      verifyGatewaySenderState(server2Site2, isRunning(cliString));
+    } else {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .addOption(CliStrings.MEMBERS, 
getMember(memberVM.getVM()).toString())
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(memberVM, isRunning(cliString));
+    }
+  }
+
+  boolean isRunning(String cliString) {
+    return CliStrings.START_GATEWAYSENDER.equals(cliString);
+  }
+
+  void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+    memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), 
"ln", isRunning,
+            false));
+  }
+
+  int getGatewayReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = 
ClusterStartupRule.getCache().getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) 
receiver.getServer()).getAcceptor().getStats();
+    return stats.getCurrentClientConnections();
+  }
+
+  void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+    int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+    for (MemberVM memberVM : members) {
+      await().untilAsserted(() -> 
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+      await().untilAsserted(() -> 
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isGreaterThan(numberOfConnections - 
NUM_CONNECTION_PER_SERVER_OFFSET));
+    }
+  }
+
+  void startWAN() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+    locator2Site1 = clusterStartupRule.startLocatorVM(7, props, 
locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(4, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server3Site1 =
+        clusterStartupRule.startServerVM(6, locator1Site1.getPort(), 
locator2Site1.getPort());
+
+    connectGfshToSite(locator1Site1);
+
+    // create partition region on site #1
+    CommandStringBuilder regionCmd = new 
CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+    String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+        .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+        .getCommandString();
+
+    gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+    server1Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+
+    server2Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+
+    server3Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> 
validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+    props.setProperty(REMOTE_LOCATORS,
+        "localhost[" + locator1Site1.getPort() + "],localhost[" + 
locator2Site1.getPort() + "]");
+    locator1Site2 = clusterStartupRule.startLocatorVM(2, props);
+
+    // start servers for site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, 
locator1Site2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(9, 
locator1Site2.getPort());
+
+    // create parallel gateway-sender on site #2
+    connectGfshToSite(locator1Site2);
+    String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, 
"1")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS,
+            "" + NUMBER_OF_DISPATCHER_THREADS)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, "key")
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    server1Site2.invoke(() -> verifySenderState("ln", true, false));
+    server2Site2.invoke(() -> verifySenderState("ln", true, false));
+    locator1Site2.invoke(
+        () -> 
validateGatewaySenderMXBeanProxy(getMember(server1Site2.getVM()), "ln", true, 
false));
+    locator1Site2.invoke(
+        () -> 
validateGatewaySenderMXBeanProxy(getMember(server2Site2.getVM()), "ln", true, 
false));
+
+    // create partition region on site #2
+    regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, "ln");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+  }
+
+  void startClientSite2(int locatorPort) throws Exception {
+    clientSite2 =
+        clusterStartupRule.startClientVM(8, c -> 
c.withLocatorConnection(locatorPort));
+    clientSite2.invoke(() -> {
+      ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+    });
+  }
+
+  void doPutsClientSite2(int starRange, int stopRange) {

Review comment:
       Typo here, should be "startRange".

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest 
implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws 
Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown() 
throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
 null);
+
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all 
servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + 
LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
 null);
+
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  @Test
+  public void 
testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+      throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+
+    
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+        server1Site1);
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+        server1Site2);
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all 
servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + 
LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+        server1Site1);
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+        server1Site2);
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, 
server3Site1);
+  }
+
+  void executeGatewayReceiverActionCommandAndValidateStateSite1(String 
cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator2Site1);
+    String command = new CommandStringBuilder(cliString)
+        .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(false));
+      locator2Site1.invoke(
+          () -> 
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+    } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(true));
+      locator2Site1.invoke(
+          () -> 
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+    }
+  }
+
+  void executeGatewaySenderActionCommandAndValidateStateSite2(String 
cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator1Site2);
+    String command;
+    if (memberVM == null) {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(server1Site2, isRunning(cliString));
+      verifyGatewaySenderState(server2Site2, isRunning(cliString));
+    } else {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .addOption(CliStrings.MEMBERS, 
getMember(memberVM.getVM()).toString())
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(memberVM, isRunning(cliString));
+    }
+  }
+
+  boolean isRunning(String cliString) {
+    return CliStrings.START_GATEWAYSENDER.equals(cliString);
+  }
+
+  void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+    memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), 
"ln", isRunning,
+            false));
+  }
+
+  int getGatewayReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = 
ClusterStartupRule.getCache().getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) 
receiver.getServer()).getAcceptor().getStats();
+    return stats.getCurrentClientConnections();
+  }
+
+  void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+    int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+    for (MemberVM memberVM : members) {
+      await().untilAsserted(() -> 
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+      await().untilAsserted(() -> 
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isGreaterThan(numberOfConnections - 
NUM_CONNECTION_PER_SERVER_OFFSET));
+    }
+  }
+
+  void startWAN() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+    locator2Site1 = clusterStartupRule.startLocatorVM(7, props, 
locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(4, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server3Site1 =
+        clusterStartupRule.startServerVM(6, locator1Site1.getPort(), 
locator2Site1.getPort());

Review comment:
       Small nitpick, but the VM numbers for the two sites seem kind of 
arbitrary. Would it be possible to have site 1 use 0 -> 4 and site 2 use 5 -> 8?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to