DonalEvans commented on a change in pull request #7378:
URL: https://github.com/apache/geode/pull/7378#discussion_r819842187
##########
File path:
geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -479,27 +495,33 @@ void removeFromMap(Map<String,
Map<ServerLocationAndMemberId, LoadHolder>> map,
}
}
}
- Map groupMap = map.get(null);
+ Map<ServerLocationAndMemberId, LoadHolder> groupMap = map.get(null);
groupMap.remove(locationAndMemberId);
}
@VisibleForTesting
- void updateMap(Map map, ServerLocation location, float load, float
loadPerConnection) {
- updateMap(map, location, "", load, loadPerConnection);
- }
-
- @VisibleForTesting
- void updateMap(Map map, ServerLocation location, String memberId, float load,
+ void updateConnectionLoadMap(ServerLocation location, String memberId, float
load,
float loadPerConnection) {
- Map groupMap = (Map) map.get(null);
- LoadHolder holder;
- if (memberId.equals("")) {
- holder = (LoadHolder) groupMap.get(location);
- } else {
- ServerLocationAndMemberId locationAndMemberId =
- new ServerLocationAndMemberId(location, memberId);
- holder = (LoadHolder) groupMap.get(locationAndMemberId);
+ ServerLocationAndMemberId locationAndMemberId =
+ new ServerLocationAndMemberId(location, memberId);
+
+ Map<ServerLocationAndMemberId, LoadHolder> groupMap =
connectionLoadMap.get(null);
+ LoadHolder holder = groupMap.get(locationAndMemberId);
+ if (holder == null) {
+ groupMap = connectionLoadMap.get(GatewayReceiver.RECEIVER_GROUP);
+ if (groupMap != null) {
+ holder = groupMap.get(locationAndMemberId);
+ }
}
+
+ if (holder != null) {
+ holder.setLoad(load, loadPerConnection);
+ }
+ }
+
+ void updateQueueLoadMap(ServerLocation location, float load, float
loadPerConnection) {
Review comment:
This method should be annotated with `@VisibleForTesting` as it would be
`private` if it wasn't being used in the unit test for this class.
##########
File path:
geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -413,7 +413,19 @@ public List getServersForQueue(String group,
Set<ServerLocation> excludedServers
new ServerLoad(connectionLoad.getLoad(),
connectionLoad.getLoadPerConnection(),
queueLoad.getLoad(), queueLoad.getLoadPerConnection()));
}
+ return result;
+ }
+
+ public synchronized Map<ServerLocationAndMemberId, LoadHolder>
getGatewayReceiverLoadMap() {
Review comment:
Could this method be annotated with the `@TestOnly` annotation to make
it clear that it's a test-only method please? Also, since it's only used in the
unit test for this class, the visibility can be changed to package-private
rather than public.
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
}
@Test
- public void updateMapWithServerLocationAndMemberId() {
+ public void updateConnectionMapWithServerLocationAndMemberId() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
- final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- LocatorLoadSnapshot.LoadHolder loadHolder =
- new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- groupServers.put(sli, loadHolder);
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
- loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new
ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2,
LOAD_POLL_INTERVAL);
- assertEquals(expectedLoadHolder.getLoad(),
groupServers.get(sli).getLoad(), 0);
+ Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+ }
+
+ @Test
+ public void
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
Review comment:
This test name is a little unclear. Would it be possible to change it so
that it states what behaviour is being tested, under what conditions, and what
the expected result is? Particularly to distinguish it from the above test case.
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
}
@Test
- public void updateMapWithServerLocationAndMemberId() {
+ public void updateConnectionMapWithServerLocationAndMemberId() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
- final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- LocatorLoadSnapshot.LoadHolder loadHolder =
- new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- groupServers.put(sli, loadHolder);
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
- loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new
ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2,
LOAD_POLL_INTERVAL);
- assertEquals(expectedLoadHolder.getLoad(),
groupServers.get(sli).getLoad(), 0);
+ Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+ }
+
+ @Test
+ public void
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+ final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+ final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
+ final ServerLocationAndMemberId servLocAndMemberId =
+ new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[]
{GatewayReceiver.RECEIVER_GROUP},
+ new ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+ new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8,
LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId,
expectedLoadHolder.getLoad(),
+ expectedLoadHolder.getLoadPerConnection());
+
+ Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
serverLoadMap =
+ loadSnapshot.getGatewayReceiverLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+ }
+
+ @Test
+ public void
updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup()
{
+ final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+ final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+ final ServerLocation gatewayReceiverLocation = new
ServerLocation("gatewayReciverHost", 111);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
Review comment:
Extra semicolon here.
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
}
@Test
- public void updateMapWithServerLocationAndMemberId() {
+ public void updateConnectionMapWithServerLocationAndMemberId() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
- final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- LocatorLoadSnapshot.LoadHolder loadHolder =
- new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- groupServers.put(sli, loadHolder);
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
Review comment:
Extra semicolon added here.
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -661,53 +728,48 @@ public void
updateMapWithServerLocationAndMemberIdKeyNotFound() {
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
- loadSnapshot.updateMap(map, serverLocation, uniqueId, 50, 1);
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 50, 1);
- assertNull(groupServers.get(sli));
+ Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+ assertTrue("Expected connection map to be empty", serverLoadMap.isEmpty());
}
@Test
- public void updateMapWithServerLocation() {
+ public void updateQueueMapWithServerLocation() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
LocatorLoadSnapshot.LoadHolder loadHolder =
new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
Review comment:
This variable is no longer used and can be removed.
##########
File path:
geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
##########
@@ -234,11 +229,11 @@ private Object getLocatorListResponse(LocatorListRequest
request) {
}
private Object pickQueueServers(QueueConnectionRequest clientRequest) {
- Set excludedServers = new HashSet(clientRequest.getExcludedServers());
+ HashSet<ServerLocation> excludedServers = new
HashSet<>(clientRequest.getExcludedServers());
Review comment:
Could this stay using the `Set` interface rather than the concrete
`HashSet` implementation please?
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
}
@Test
- public void updateMapWithServerLocationAndMemberId() {
+ public void updateConnectionMapWithServerLocationAndMemberId() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
- final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- LocatorLoadSnapshot.LoadHolder loadHolder =
- new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- groupServers.put(sli, loadHolder);
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
- loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new
ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2,
LOAD_POLL_INTERVAL);
- assertEquals(expectedLoadHolder.getLoad(),
groupServers.get(sli).getLoad(), 0);
+ Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
Review comment:
Could these assertions be changed to use AssertJ, i.e.
`assertThat(X).isEqualTo(Y)` please?
##########
File path:
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
}
@Test
- public void updateMapWithServerLocationAndMemberId() {
+ public void updateConnectionMapWithServerLocationAndMemberId() {
final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
final ServerLocation serverLocation = new ServerLocation("localhost", 1);
- final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
- final ServerLocationAndMemberId sli = new
ServerLocationAndMemberId(serverLocation, uniqueId);
- LocatorLoadSnapshot.LoadHolder loadHolder =
- new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1,
LOAD_POLL_INTERVAL);
- Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
groupServers = new HashMap<>();
- groupServers.put(sli, loadHolder);
- Map<String, Map<ServerLocationAndMemberId,
LocatorLoadSnapshot.LoadHolder>> map =
- new HashMap<>();
- map.put(null, groupServers);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();;
- loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new
ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2,
LOAD_POLL_INTERVAL);
- assertEquals(expectedLoadHolder.getLoad(),
groupServers.get(sli).getLoad(), 0);
+ Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+ }
+
+ @Test
+ public void
updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+ final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+ final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+ final String uniqueId = new InternalDistributedMember("localhost",
1).getUniqueId();
+ final ServerLocationAndMemberId servLocAndMemberId =
+ new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+ loadSnapshot.addServer(serverLocation, uniqueId, new String[]
{GatewayReceiver.RECEIVER_GROUP},
+ new ServerLoad(50, 1, 0, 1),
+ LOAD_POLL_INTERVAL);
+
+ LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+ new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8,
LOAD_POLL_INTERVAL);
+
+ loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId,
expectedLoadHolder.getLoad(),
+ expectedLoadHolder.getLoadPerConnection());
+
+ Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>
serverLoadMap =
+ loadSnapshot.getGatewayReceiverLoadMap();
+ assertEquals(expectedLoadHolder.getLoad(),
+ serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+ assertEquals(expectedLoadHolder.getLoadPerConnection(),
+ serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+ }
+
+ @Test
+ public void
updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup()
{
Review comment:
This test name could also be improved a bit to help make it clearer what
the test is doing.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest
implements Serializable {
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ private MemberVM locator1Site2;
+ private MemberVM locator1Site1;
+ private MemberVM locator2Site1;
+ private MemberVM server1Site1;
+ private MemberVM server2Site1;
+ private MemberVM server3Site1;
+ private MemberVM server1Site2;
+ private MemberVM server2Site2;
+
+ private ClientVM clientSite2;
+
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+ private static final String REGION_NAME = "test1";
+ private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+ private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+ private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+ @Test
+ public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws
Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 500);
Review comment:
Rather than calling these three methods at the start of each test, you
could move the code in those methods to a `@Before` method, or just have a
`@Before` method that calls them.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest
implements Serializable {
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ private MemberVM locator1Site2;
+ private MemberVM locator1Site1;
+ private MemberVM locator2Site1;
+ private MemberVM server1Site1;
+ private MemberVM server2Site1;
+ private MemberVM server3Site1;
+ private MemberVM server1Site2;
+ private MemberVM server2Site2;
+
+ private ClientVM clientSite2;
+
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+ private static final String REGION_NAME = "test1";
+ private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+ private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
Review comment:
This offset seems somewhat arbitrary. How was it determined?
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest
implements Serializable {
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ private MemberVM locator1Site2;
+ private MemberVM locator1Site1;
+ private MemberVM locator2Site1;
+ private MemberVM server1Site1;
+ private MemberVM server2Site1;
+ private MemberVM server3Site1;
+ private MemberVM server1Site2;
+ private MemberVM server2Site2;
+
+ private ClientVM clientSite2;
+
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+ private static final String REGION_NAME = "test1";
+ private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+ private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+ private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+ @Test
+ public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws
Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 500);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ @Test
+ public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown()
throws Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 400);
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
null);
+
+ locator1Site1.stop(true);
+
+ // Wait for default load-poll-interval plus offset to expire, so that all
servers send load
+ // to locator before continuing with the test
+ GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL +
LOAD_POLL_INTERVAL_OFFSET,
+ TimeUnit.MILLISECONDS);
+
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
null);
+
+ doPutsClientSite2(400, 800);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ @Test
+ public void
testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+ throws Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 400);
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+
+
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+ server1Site1);
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+ server1Site2);
+ locator1Site1.stop(true);
+
+ // Wait for default load-poll-interval plus offset to expire, so that all
servers send load
+ // to locator before continuing with the test
+ GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL +
LOAD_POLL_INTERVAL_OFFSET,
+ TimeUnit.MILLISECONDS);
+
+
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+ server1Site1);
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+ server1Site2);
+ doPutsClientSite2(400, 800);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ void executeGatewayReceiverActionCommandAndValidateStateSite1(String
cliString, MemberVM memberVM)
+ throws Exception {
+ connectGfshToSite(locator2Site1);
+ String command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+ if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+ memberVM.invoke(() -> verifyReceiverState(false));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+ } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+ memberVM.invoke(() -> verifyReceiverState(true));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+ }
+ }
+
+ void executeGatewaySenderActionCommandAndValidateStateSite2(String
cliString, MemberVM memberVM)
+ throws Exception {
+ connectGfshToSite(locator1Site2);
+ String command;
+ if (memberVM == null) {
+ command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ verifyGatewaySenderState(server1Site2, isRunning(cliString));
+ verifyGatewaySenderState(server2Site2, isRunning(cliString));
+ } else {
+ command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+ .addOption(CliStrings.MEMBERS,
getMember(memberVM.getVM()).toString())
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ verifyGatewaySenderState(memberVM, isRunning(cliString));
+ }
+ }
+
+ boolean isRunning(String cliString) {
+ return CliStrings.START_GATEWAYSENDER.equals(cliString);
+ }
+
+ void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+ memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+ locator1Site2.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()),
"ln", isRunning,
+ false));
+ }
+
+ int getGatewayReceiverStats() {
+ Set<GatewayReceiver> gatewayReceivers =
ClusterStartupRule.getCache().getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)
receiver.getServer()).getAcceptor().getStats();
+ return stats.getCurrentClientConnections();
+ }
+
+ void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+ int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+ for (MemberVM memberVM : members) {
+ await().untilAsserted(() ->
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+ .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+ await().untilAsserted(() ->
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+ .isGreaterThan(numberOfConnections -
NUM_CONNECTION_PER_SERVER_OFFSET));
+ }
+ }
+
+ void startWAN() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+ locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+ locator2Site1 = clusterStartupRule.startLocatorVM(7, props,
locator1Site1.getPort());
+
+ // start servers for site #1
+ server1Site1 =
+ clusterStartupRule.startServerVM(3, locator1Site1.getPort(),
locator2Site1.getPort());
+ server2Site1 =
+ clusterStartupRule.startServerVM(4, locator1Site1.getPort(),
locator2Site1.getPort());
+ server3Site1 =
+ clusterStartupRule.startServerVM(6, locator1Site1.getPort(),
locator2Site1.getPort());
+
+ connectGfshToSite(locator1Site1);
+
+ // create partition region on site #1
+ CommandStringBuilder regionCmd = new
CommandStringBuilder(CliStrings.CREATE_REGION);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+ regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+
+ gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+ String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+ .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+ .getCommandString();
+
+ gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+ server1Site1.invoke(() -> verifyReceiverState(true));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+ locator1Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+
+ server2Site1.invoke(() -> verifyReceiverState(true));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+ locator1Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+
+ server3Site1.invoke(() -> verifyReceiverState(true));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+ locator1Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+ props.setProperty(REMOTE_LOCATORS,
+ "localhost[" + locator1Site1.getPort() + "],localhost[" +
locator2Site1.getPort() + "]");
+ locator1Site2 = clusterStartupRule.startLocatorVM(2, props);
+
+ // start servers for site #2
+ server1Site2 = clusterStartupRule.startServerVM(5,
locator1Site2.getPort());
+ server2Site2 = clusterStartupRule.startServerVM(9,
locator1Site2.getPort());
+
+ // create parallel gateway-sender on site #2
+ connectGfshToSite(locator1Site2);
+ String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID,
"1")
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true")
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS,
+ "" + NUMBER_OF_DISPATCHER_THREADS)
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, "key")
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+ server1Site2.invoke(() -> verifySenderState("ln", true, false));
+ server2Site2.invoke(() -> verifySenderState("ln", true, false));
+ locator1Site2.invoke(
+ () ->
validateGatewaySenderMXBeanProxy(getMember(server1Site2.getVM()), "ln", true,
false));
+ locator1Site2.invoke(
+ () ->
validateGatewaySenderMXBeanProxy(getMember(server2Site2.getVM()), "ln", true,
false));
+
+ // create partition region on site #2
+ regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+ regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, "ln");
+ regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+ gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+ }
+
+ void startClientSite2(int locatorPort) throws Exception {
+ clientSite2 =
+ clusterStartupRule.startClientVM(8, c ->
c.withLocatorConnection(locatorPort));
+ clientSite2.invoke(() -> {
+ ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+ });
+ }
+
+ void doPutsClientSite2(int starRange, int stopRange) {
Review comment:
Typo here, should be "startRange".
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static
org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest
implements Serializable {
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ private MemberVM locator1Site2;
+ private MemberVM locator1Site1;
+ private MemberVM locator2Site1;
+ private MemberVM server1Site1;
+ private MemberVM server2Site1;
+ private MemberVM server3Site1;
+ private MemberVM server1Site2;
+ private MemberVM server2Site2;
+
+ private ClientVM clientSite2;
+
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+ private static final String REGION_NAME = "test1";
+ private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+ private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+ private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+ @Test
+ public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws
Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 500);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ @Test
+ public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown()
throws Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 400);
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
null);
+
+ locator1Site1.stop(true);
+
+ // Wait for default load-poll-interval plus offset to expire, so that all
servers send load
+ // to locator before continuing with the test
+ GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL +
LOAD_POLL_INTERVAL_OFFSET,
+ TimeUnit.MILLISECONDS);
+
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
null);
+
+ doPutsClientSite2(400, 800);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ @Test
+ public void
testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+ throws Exception {
+ startWAN();
+ // Do put operations to initialize gateway-sender connections
+ startClientSite2(locator1Site2.getPort());
+ doPutsClientSite2(0, 400);
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+
+
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+ server1Site1);
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+ server1Site2);
+ locator1Site1.stop(true);
+
+ // Wait for default load-poll-interval plus offset to expire, so that all
servers send load
+ // to locator before continuing with the test
+ GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL +
LOAD_POLL_INTERVAL_OFFSET,
+ TimeUnit.MILLISECONDS);
+
+
executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+ server1Site1);
+
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+ server1Site2);
+ doPutsClientSite2(400, 800);
+
+ checkConnectionLoadBalancedOnServers(server1Site1, server2Site1,
server3Site1);
+ }
+
+ void executeGatewayReceiverActionCommandAndValidateStateSite1(String
cliString, MemberVM memberVM)
+ throws Exception {
+ connectGfshToSite(locator2Site1);
+ String command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+ if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+ memberVM.invoke(() -> verifyReceiverState(false));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+ } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+ memberVM.invoke(() -> verifyReceiverState(true));
+ locator2Site1.invoke(
+ () ->
validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+ }
+ }
+
+ void executeGatewaySenderActionCommandAndValidateStateSite2(String
cliString, MemberVM memberVM)
+ throws Exception {
+ connectGfshToSite(locator1Site2);
+ String command;
+ if (memberVM == null) {
+ command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ verifyGatewaySenderState(server1Site2, isRunning(cliString));
+ verifyGatewaySenderState(server2Site2, isRunning(cliString));
+ } else {
+ command = new CommandStringBuilder(cliString)
+ .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+ .addOption(CliStrings.MEMBERS,
getMember(memberVM.getVM()).toString())
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+ verifyGatewaySenderState(memberVM, isRunning(cliString));
+ }
+ }
+
+ boolean isRunning(String cliString) {
+ return CliStrings.START_GATEWAYSENDER.equals(cliString);
+ }
+
+ void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+ memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+ locator1Site2.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()),
"ln", isRunning,
+ false));
+ }
+
+ int getGatewayReceiverStats() {
+ Set<GatewayReceiver> gatewayReceivers =
ClusterStartupRule.getCache().getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)
receiver.getServer()).getAcceptor().getStats();
+ return stats.getCurrentClientConnections();
+ }
+
+ void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+ int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+ for (MemberVM memberVM : members) {
+ await().untilAsserted(() ->
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+ .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+ await().untilAsserted(() ->
assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+ .isGreaterThan(numberOfConnections -
NUM_CONNECTION_PER_SERVER_OFFSET));
+ }
+ }
+
+ void startWAN() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+ locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+ locator2Site1 = clusterStartupRule.startLocatorVM(7, props,
locator1Site1.getPort());
+
+ // start servers for site #1
+ server1Site1 =
+ clusterStartupRule.startServerVM(3, locator1Site1.getPort(),
locator2Site1.getPort());
+ server2Site1 =
+ clusterStartupRule.startServerVM(4, locator1Site1.getPort(),
locator2Site1.getPort());
+ server3Site1 =
+ clusterStartupRule.startServerVM(6, locator1Site1.getPort(),
locator2Site1.getPort());
Review comment:
Small nitpick, but the VM numbers for the two sites seem kind of
arbitrary. Would it be possible to have site 1 use 0 -> 4 and site 2 use 5 -> 8?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]