This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5d2181d6a259035aaa77a73daa0dcbacefb56038
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Jun 26 07:47:23 2020 -0700

    GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306)
    
    I've replaced the "for" loop that used an implicit Iterator with one that
    uses an explicit Iterator, so that the Iterator's safe "remove()" method
    can be used.  Iterator.remove() is documented as the only safe way to
    modify a collection while iterating over its contents.
    
    I've also modified a test to validate the fix.  The test forces a
    failure to send two messages to an address.  The failures are then
    handled in the code that was throwing the
    ConcurrentModificationException and, since there are two failures,
    it causes two removals to be performed on the failedMessages collection.
    
    (cherry picked from commit 3cda1b1a213f2195ff0b97361883f6a6c3972b14)
---
 .../locator/wan/LocatorMembershipListenerImpl.java | 14 ++--
 .../locator/wan/LocatorMembershipListenerTest.java | 88 ++++++++++++++++------
 2 files changed, 72 insertions(+), 30 deletions(-)

diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
index 31315ed..434231d 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal.locator.wan;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -291,15 +292,15 @@ public class LocatorMembershipListenerImpl implements 
LocatorMembershipListener
     public void run() {
       Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new 
HashMap<>();
       for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : 
remoteLocators.entrySet()) {
-        for (DistributionLocatorId value : entry.getValue()) {
+        for (DistributionLocatorId remoteLocator : entry.getValue()) {
           // Notify known remote locator about the advertised locator.
           LocatorJoinMessage advertiseNewLocatorMessage = new 
LocatorJoinMessage(
               joiningLocatorDistributedSystemId, joiningLocator, 
localLocatorId, "");
-          sendMessage(value, advertiseNewLocatorMessage, failedMessages);
+          sendMessage(remoteLocator, advertiseNewLocatorMessage, 
failedMessages);
 
           // Notify the advertised locator about remote known locator.
           LocatorJoinMessage advertiseKnownLocatorMessage =
-              new LocatorJoinMessage(entry.getKey(), value, localLocatorId, 
"");
+              new LocatorJoinMessage(entry.getKey(), remoteLocator, 
localLocatorId, "");
           sendMessage(joiningLocator, advertiseKnownLocatorMessage, 
failedMessages);
         }
       }
@@ -313,9 +314,11 @@ public class LocatorMembershipListenerImpl implements 
LocatorMembershipListener
             DistributionLocatorId targetLocator = entry.getKey();
             Set<LocatorJoinMessage> joinMessages = entry.getValue();
 
-            for (LocatorJoinMessage locatorJoinMessage : joinMessages) {
+            for (Iterator<LocatorJoinMessage> iterator = 
joinMessages.iterator(); iterator
+                .hasNext();) {
+              LocatorJoinMessage locatorJoinMessage = iterator.next();
               if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
-                joinMessages.remove(locatorJoinMessage);
+                iterator.remove();
               } else {
                 // Sleep between retries.
                 try {
@@ -324,6 +327,7 @@ public class LocatorMembershipListenerImpl implements 
LocatorMembershipListener
                   Thread.currentThread().interrupt();
                   logger.warn(
                       "Locator Membership listener permanently failed to 
exchange locator information due to interruption.");
+                  return;
                 }
               }
             }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
 
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
index f3770a6..b2b3c14 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -47,7 +48,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.SystemOutRule;
 
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.tcpserver.TcpClient;
@@ -55,9 +58,13 @@ import 
org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.test.junit.ResultCaptor;
 
 public class LocatorMembershipListenerTest {
+  public static final int TIMEOUT = 500;
   private TcpClient tcpClient;
   private LocatorMembershipListenerImpl locatorMembershipListener;
 
+  @Rule
+  public SystemOutRule systemOutRule = new SystemOutRule();
+
   private DistributionLocatorId buildDistributionLocatorId(int port) {
     return new DistributionLocatorId("localhost[" + port + "]");
   }
@@ -86,10 +93,10 @@ public class LocatorMembershipListenerTest {
       throws IOException, ClassNotFoundException {
     verify(tcpClient).requestToServer(initialTargetLocator.getHost(),
         new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator, 
sourceLocator, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(advertisedLocator.getHost(),
         new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator, 
sourceLocator, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   private void joinLocatorsDistributorThread(ResultCaptor<Thread> 
resultCaptor) {
@@ -103,7 +110,7 @@ public class LocatorMembershipListenerTest {
     DistributionConfig distributionConfig = mock(DistributionConfig.class);
     
when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR);
     when(distributionConfig.getMemberTimeout())
-        .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT);
+        .thenReturn(TIMEOUT);
 
     tcpClient = mock(TcpClient.class);
     locatorMembershipListener = spy(new 
LocatorMembershipListenerImpl(tcpClient));
@@ -275,7 +282,7 @@ public class LocatorMembershipListenerTest {
     
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
     when(tcpClient.requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"));
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -287,39 +294,70 @@ public class LocatorMembershipListenerTest {
     verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 
1)).requestToServer(
         locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   @Test
   public void 
locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures()
       throws IOException, ClassNotFoundException {
+    systemOutRule.enableLog();
     ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new 
ConcurrentHashMap<>();
+    DistributionLocatorId localLocatorID = buildDistributionLocatorId(10101);
     DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
-    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
-    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
-    allLocatorsInfo.put(1, new 
HashSet<>(Collections.singletonList(locator3Site1)));
+    DistributionLocatorId remoteLocator1 = buildDistributionLocatorId(10103);
+    DistributionLocatorId remoteLocator2 = buildDistributionLocatorId(10104);
+    final HashSet<DistributionLocatorId> remoteLocators =
+        new HashSet<>(Arrays.asList(new DistributionLocatorId[] 
{remoteLocator1, remoteLocator2}));
+    allLocatorsInfo.put(1, remoteLocators);
     
when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
-    when(tcpClient.requestToServer(locator3Site1.getHost(),
-        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+    // have messaging fail twice so that LocatorMembershipListenerImpl's 
retryMessage logic is
+    // exercised
+    when(tcpClient.requestToServer(remoteLocator1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    when(tcpClient.requestToServer(remoteLocator2.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    when(tcpClient.requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, remoteLocator1, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    // also have the joining locator fail to receive messages so we can test 
that code path.
+    // It will fail to receive messages informing it of remoteLocator1 and 
remoteLocator2, so it
+    // will have
+    // two failed messages to retry. The others will each have one message to 
retry, informing
+    // them about the joiningLocator.
+    when(tcpClient.requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, remoteLocator2, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"))
             .thenThrow(new EOFException("Mock Exception"))
             .thenReturn(null);
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
     
doAnswer(resultCaptor).when(locatorMembershipListener).buildLocatorsDistributorThread(
         any(DistributionLocatorId.class), anyMap(), 
any(DistributionLocatorId.class), anyInt());
-    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    locatorMembershipListener.locatorJoined(1, joiningLocator, localLocatorID);
     joinLocatorsDistributorThread(resultCaptor);
 
-    verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(),
-        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
-    verify(tcpClient).requestToServer(joiningLocator.getHost(),
-        new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    
assertThat(systemOutRule.getLog()).doesNotContain("ConcurrentModificationException");
+
+    // The sendMessage loop in the listener will try to send 4 messages. Two 
to the remoteLocators
+    // and two to the joiningLocator. The retry loop will try to send the 
messages again and
+    // fail (4 more messages) and then it will succeed (4 more messages, for a 
total of 12).
+    verify(tcpClient, times(12)).requestToServer(isA(HostAndPort.class),
+        isA(LocatorJoinMessage.class), isA(Integer.class), isA(Boolean.class));
   }
 
   @Test
@@ -339,14 +377,14 @@ public class LocatorMembershipListenerTest {
     // Fail on first 2 attempts and succeed on third attempt.
     when(tcpClient.requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"))
             .thenThrow(new EOFException("Mock Exception")).thenReturn(null);
 
     // Fail always.
     when(tcpClient.requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"));
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -358,17 +396,17 @@ public class LocatorMembershipListenerTest {
     verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, 
locator1Site2);
     verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(locator1Site3.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 
1)).requestToServer(
         joiningLocator.getHost(),
         new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   private static class HandlerCallable implements Callable<Object> {

Reply via email to