pivotal-jbarrett commented on code in PR #7517:
URL: https://github.com/apache/geode/pull/7517#discussion_r858023952


##########
geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java:
##########
@@ -41,26 +43,46 @@ public class LiveServerPinger extends EndpointListenerAdapter {
   protected final InternalPool pool;
   protected final long pingIntervalNanos;
 
+  /**
+   * Initial delay offset time between LiveServerPinger tasks. Time in milliseconds.
+   */
+  public static final int OFFSET =
+      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", 0);
+
+  private final AtomicInteger offsetIndex = new AtomicInteger(0);
+
+
   public LiveServerPinger(InternalPool pool) {
     this.pool = pool;
     pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS;

Review Comment:
   Replace time math with `TimeUnit.MILLISECONDS.toNanos(...)`.
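
   A minimal sketch of that change, assuming `pool.getPingInterval()` returns milliseconds (which the `NANOS_PER_MS` factor implies). The class and method names are hypothetical stand-ins, not code from the PR:

```java
import java.util.concurrent.TimeUnit;

class PingIntervalSketch {
  // Hypothetical helper: half of the ping interval (rounded up), converted to nanoseconds.
  static long halfIntervalNanos(long pingIntervalMillis) {
    return TimeUnit.MILLISECONDS.toNanos((pingIntervalMillis + 1) / 2);
  }

  public static void main(String[] args) {
    System.out.println(halfIntervalNanos(10_000));
  }
}
```

   The conversion names its source unit, so the reader does not have to check which constant the value is multiplied by.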



##########
geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java:
##########
@@ -41,26 +43,46 @@ public class LiveServerPinger extends EndpointListenerAdapter {
   protected final InternalPool pool;
   protected final long pingIntervalNanos;
 
+  /**
+   * Initial delay offset time between LiveServerPinger tasks. Time in milliseconds.
+   */
+  public static final int OFFSET =
+      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", 0);
+
+  private final AtomicInteger offsetIndex = new AtomicInteger(0);
+
+
   public LiveServerPinger(InternalPool pool) {
     this.pool = pool;
     pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS;
   }
 
   @Override
   public void endpointCrashed(Endpoint endpoint) {
+    offsetIndex.set(0); // Reset counter
     cancelFuture(endpoint);
   }
 
   @Override
   public void endpointNoLongerInUse(Endpoint endpoint) {
+    offsetIndex.set(0); // Reset counter
     cancelFuture(endpoint);
   }
 
   @Override
   public void endpointNowInUse(Endpoint endpoint) {
     try {
+      // At each registration of new endpoint increase counter for calculation of initial delay
+      // offset
+      long initDelay = offsetIndex.getAndIncrement();
+      initDelay = (initDelay * OFFSET * NANOS_PER_MS) + pingIntervalNanos;

Review Comment:
   Replace time math here too.



##########
geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java:
##########
@@ -41,26 +43,46 @@ public class LiveServerPinger extends EndpointListenerAdapter {
   protected final InternalPool pool;
   protected final long pingIntervalNanos;
 
+  /**
+   * Initial delay offset time between LiveServerPinger tasks. Time in milliseconds.
+   */
+  public static final int OFFSET =
+      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", 0);

Review Comment:
   I feel like we are lacking consistency in property naming conventions here.
   Maybe it is out of scope, but it would be nice to get this under control. If
   the intention was `ClassName.CONSTANT`, should this not be
   `LiveServerPinger.OFFSET`? And offset of what, in what units? Something like
   `LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS` would answer both.
   I say multiplier because below we calculate `initDelay` by multiplying it
   with `OFFSET`.
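
   One possible shape for the renamed property, as a sketch only. `PREFIX` is an assumed stand-in for `GeodeGlossary.GEMFIRE_PREFIX` so the snippet compiles on its own, and the class and method names are hypothetical:

```java
class PingerPropertySketch {
  // Assumption: stands in for GeodeGlossary.GEMFIRE_PREFIX.
  static final String PREFIX = "gemfire.";

  // Hypothetical property name following the ClassName.CONSTANT convention,
  // with the role (multiplier) and the unit (milliseconds) spelled out.
  static final String INITIAL_DELAY_MULTIPLIER_PROPERTY =
      PREFIX + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS";

  // Reads the multiplier from the system properties; defaults to 0 (no staggering).
  static int initialDelayMultiplierMillis() {
    return Integer.getInteger(INITIAL_DELAY_MULTIPLIER_PROPERTY, 0);
  }

  public static void main(String[] args) {
    System.out.println(INITIAL_DELAY_MULTIPLIER_PROPERTY + " = " + initialDelayMultiplierMillis());
  }
}
```

   Unlike the `static final` field in the PR, this sketch reads the property on each call, which is a different trade-off: a test can set the property without depending on class-initialization order.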



##########
geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java:
##########
@@ -214,6 +215,62 @@ public void testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectTo
 
   }
 
+
+  /**
+   * The aim of this test is verify that when several gateway receivers in a remote site share the
+   * same port and hostname-for-senders, the pings sent from the gateway senders reach the right
+   * gateway receiver and not just any of the receivers. Check that only one additional connection
+   * is used.
+   */
+  @Test
+  public void testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection()
+      throws InterruptedException {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = docker.getExternalPortForService("haproxy", 20334);
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+
+    vm1.invoke(() -> {
+      System.setProperty(
+          GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", "500");
+
+      Properties props = new Properties();
+      props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+      CacheFactory cacheFactory = new CacheFactory(props);
+      cache = cacheFactory.create();
+    });
+
+    createGatewaySender(vm1, senderId, 2, true, 5,
+        2, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    int NUM_PUTS = 10;
+
+    putKeyValues(vm1, NUM_PUTS, regionName);
+
+    await().untilAsserted(() -> assertThat(getQueuedEvents(vm1, senderId)).isEqualTo(0));
+
+
+    // Wait longer than the value set in the receivers for
+    // maximum-time-between-pings: 10000 (see geode-starter-create.gfsh)
+    // to verify that connections are not closed
+    // by the receivers because each has received the pings timely.
+    int maxTimeBetweenPingsInReceiver = 10000;
+    Thread.sleep(maxTimeBetweenPingsInReceiver);
+
+    int senderPoolDisconnects = getSenderPoolDisconnects(vm1, senderId);
+    assertEquals(0, senderPoolDisconnects);

Review Comment:
   Replace with AssertJ.



##########
geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java:
##########
@@ -41,26 +43,46 @@ public class LiveServerPinger extends EndpointListenerAdapter {
   protected final InternalPool pool;
   protected final long pingIntervalNanos;
 
+  /**
+   * Initial delay offset time between LiveServerPinger tasks. Time in milliseconds.
+   */
+  public static final int OFFSET =
+      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", 0);
+
+  private final AtomicInteger offsetIndex = new AtomicInteger(0);
+
+
   public LiveServerPinger(InternalPool pool) {
     this.pool = pool;
     pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS;
   }
 
   @Override
   public void endpointCrashed(Endpoint endpoint) {
+    offsetIndex.set(0); // Reset counter
     cancelFuture(endpoint);
   }
 
   @Override
   public void endpointNoLongerInUse(Endpoint endpoint) {
+    offsetIndex.set(0); // Reset counter
     cancelFuture(endpoint);
   }
 
   @Override
   public void endpointNowInUse(Endpoint endpoint) {
     try {
+      // At each registration of new endpoint increase counter for calculation of initial delay
+      // offset
+      long initDelay = offsetIndex.getAndIncrement();
+      initDelay = (initDelay * OFFSET * NANOS_PER_MS) + pingIntervalNanos;

Review Comment:
   Also, this should be unit testable, and all I see is a very high-level
   acceptance test. Extract a method here that we can unit test.
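
   A sketch of what such an extracted method could look like, assuming `OFFSET` is in milliseconds as its Javadoc states. The class name, method name, and parameter list are hypothetical and not taken from the PR:

```java
import java.util.concurrent.TimeUnit;

class InitialDelaySketch {
  // Hypothetical extracted method: initial delay, in nanoseconds, for the endpoint
  // registered at position `index`. Each later endpoint starts `offsetMillis` after
  // the previous one, on top of the regular ping interval.
  static long calculateInitialDelayNanos(int index, int offsetMillis, long pingIntervalNanos) {
    // Widen to long before multiplying so large index/offset values cannot overflow int.
    long staggerMillis = (long) index * offsetMillis;
    return TimeUnit.MILLISECONDS.toNanos(staggerMillis) + pingIntervalNanos;
  }

  public static void main(String[] args) {
    long pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos(5_000);
    for (int index = 0; index < 3; index++) {
      System.out.println(calculateInitialDelayNanos(index, 500, pingIntervalNanos));
    }
  }
}
```

   Because the method takes the index and offset as plain arguments, a unit test can check the arithmetic directly without a pool, an endpoint, or a system property.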



##########
geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java:
##########
@@ -214,6 +215,62 @@ public void testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectTo
 
   }
 
+
+  /**
+   * The aim of this test is verify that when several gateway receivers in a remote site share the
+   * same port and hostname-for-senders, the pings sent from the gateway senders reach the right
+   * gateway receiver and not just any of the receivers. Check that only one additional connection
+   * is used.
+   */
+  @Test
+  public void testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection()
+      throws InterruptedException {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = docker.getExternalPortForService("haproxy", 20334);
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+
+    vm1.invoke(() -> {
+      System.setProperty(
+          GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", "500");
+
+      Properties props = new Properties();
+      props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+      CacheFactory cacheFactory = new CacheFactory(props);
+      cache = cacheFactory.create();
+    });
+
+    createGatewaySender(vm1, senderId, 2, true, 5,
+        2, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    int NUM_PUTS = 10;
+
+    putKeyValues(vm1, NUM_PUTS, regionName);
+
+    await().untilAsserted(() -> assertThat(getQueuedEvents(vm1, senderId)).isEqualTo(0));
+
+
+    // Wait longer than the value set in the receivers for
+    // maximum-time-between-pings: 10000 (see geode-starter-create.gfsh)
+    // to verify that connections are not closed
+    // by the receivers because each has received the pings timely.
+    int maxTimeBetweenPingsInReceiver = 10000;
+    Thread.sleep(maxTimeBetweenPingsInReceiver);
+
+    int senderPoolDisconnects = getSenderPoolDisconnects(vm1, senderId);
+    assertEquals(0, senderPoolDisconnects);
+
+    int poolEndPointSize = getSenderPoolConnects(vm1, senderId);

Review Comment:
   When asserting on stats from the `Statistics` types, you should `await` on
   them. They aren't thread safe and may take some time to arrive at the
   expected result across all threads.
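
   The test already does this for `getQueuedEvents` with `await().untilAsserted(...)`, and the same form would apply to the stat reads. To show the idea without any test-library dependency, here is a standard-library sketch of polling until a value settles. `awaitTrue` is a hypothetical helper, not Awaitility's API:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class AwaitStatSketch {
  // Polls `condition` until it holds or `timeout` elapses. Returns true if it held.
  static boolean awaitTrue(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    // Stand-in for a stat that another thread publishes a little later.
    AtomicInteger disconnects = new AtomicInteger(-1);
    new Thread(() -> disconnects.set(0)).start();
    boolean settled =
        awaitTrue(() -> disconnects.get() == 0, Duration.ofSeconds(5), Duration.ofMillis(50));
    System.out.println(settled);
  }
}
```

   A single read straight after the workload can observe a stale value; polling gives the updating threads time to publish before the assertion is judged.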



##########
geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java:
##########
@@ -214,6 +215,62 @@ public void testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectTo
 
   }
 
+
+  /**
+   * The aim of this test is verify that when several gateway receivers in a remote site share the
+   * same port and hostname-for-senders, the pings sent from the gateway senders reach the right
+   * gateway receiver and not just any of the receivers. Check that only one additional connection
+   * is used.
+   */
+  @Test
+  public void testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection()
+      throws InterruptedException {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = docker.getExternalPortForService("haproxy", 20334);
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+
+    vm1.invoke(() -> {
+      System.setProperty(
+          GeodeGlossary.GEMFIRE_PREFIX + "InitialServerPinger.OFFSET", "500");
+
+      Properties props = new Properties();
+      props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+      CacheFactory cacheFactory = new CacheFactory(props);
+      cache = cacheFactory.create();
+    });
+
+    createGatewaySender(vm1, senderId, 2, true, 5,
+        2, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    int NUM_PUTS = 10;
+
+    putKeyValues(vm1, NUM_PUTS, regionName);
+
+    await().untilAsserted(() -> assertThat(getQueuedEvents(vm1, senderId)).isEqualTo(0));
+
+
+    // Wait longer than the value set in the receivers for
+    // maximum-time-between-pings: 10000 (see geode-starter-create.gfsh)
+    // to verify that connections are not closed
+    // by the receivers because each has received the pings timely.
+    int maxTimeBetweenPingsInReceiver = 10000;
+    Thread.sleep(maxTimeBetweenPingsInReceiver);

Review Comment:
   Is there a way to use an `await` here to avoid this arbitrary sleep time?
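
   One caveat: this check is a negative one ("no connection was closed"), so the test still has to observe the whole window. What polling can add is failing fast as soon as the invariant breaks. A standard-library sketch of that idea follows; `holdsFor` is a hypothetical helper, not an existing Geode or Awaitility method:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class HoldsForSketch {
  // Returns true only if `invariant` holds on every poll for the whole `window`.
  // Returns false as soon as one poll sees it violated.
  static boolean holdsFor(BooleanSupplier invariant, Duration window, Duration pollInterval) {
    long deadline = System.nanoTime() + window.toNanos();
    while (System.nanoTime() - deadline < 0) {
      if (!invariant.getAsBoolean()) {
        return false; // fail fast instead of sleeping out the rest of the window
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return invariant.getAsBoolean(); // final check at the end of the window
  }

  public static void main(String[] args) {
    // Stand-in invariant; in the test it would compare the disconnect count to 0.
    System.out.println(holdsFor(() -> true, Duration.ofMillis(200), Duration.ofMillis(20)));
  }
}
```

   With the window set to the receivers' maximum-time-between-pings, a passing run takes as long as the sleep did, but a failing run stops at the first observed disconnect.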



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

