Repository: incubator-geode
Updated Branches:
  refs/heads/develop 86ab7cda3 -> 39e94bc8b


GEODE-1178 Unexpected DistributedSystemDisconnectedException caused by RejectedExecutionException

This has been reported to JGroups.  While they're deciding what to do about
it, I have coded a workaround in our StatRecorder class.  StatRecorder sits
in the JGroups stack just above the transport protocol that is throwing this
exception from its down() method.  StatRecorder will now catch the exception
and, after sleeping for a short time (10ms), retry for as long as the
Manager is not shutting down and no cancellation is in progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/39e94bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/39e94bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/39e94bc8

Branch: refs/heads/develop
Commit: 39e94bc8beb22b62ed727640bbad3511affc9923
Parents: 86ab7cd
Author: Bruce Schuchardt <bschucha...@pivotal.io>
Authored: Tue Apr 12 10:43:26 2016 -0700
Committer: Bruce Schuchardt <bschucha...@pivotal.io>
Committed: Tue Apr 12 10:45:40 2016 -0700

----------------------------------------------------------------------
 .../internal/membership/gms/Services.java       | 23 +++++--
 .../membership/gms/fd/GMSHealthMonitor.java     |  1 -
 .../gms/messenger/JGroupsMessenger.java         |  2 +-
 .../membership/gms/messenger/StatRecorder.java  | 30 +++++++--
 .../gms/membership/StatRecorderJUnitTest.java   | 67 ++++++++++++++------
 5 files changed, 93 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 4484c00..4f5a1a4 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -102,6 +102,19 @@ public class Services {
   }
   
 
+  /**
+   * for testing only - create a non-functional Services object with a Stopper
+   */
+  public Services() {
+    this.cancelCriterion = new Stopper();
+    this.stats = null;
+    this.config = null;
+    this.manager = null;
+    this.joinLeave = null;
+    this.healthMon = null;
+    this.messenger = null;
+    this.auth = null;
+  }
 
   public Services(
       DistributedMembershipListener listener, DistributionConfig config,
@@ -348,10 +361,10 @@ public class Services {
   public boolean isAutoReconnectEnabled() {
     return !getConfig().getDistributionConfig().getDisableAutoReconnect();
   }
-  
+
   public class Stopper extends CancelCriterion {
     volatile String reasonForStopping = null;
-    
+
     public void cancel(String reason) {
       this.reasonForStopping = reason;
     }
@@ -362,7 +375,7 @@ public class Services {
         return Services.this.shutdownCause.toString();
       return reasonForStopping;
     }
-    
+
     public boolean isCancelInProgress() {
       return cancelInProgress() != null;
     }
@@ -381,7 +394,7 @@ public class Services {
         }
       }
     }
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 510c5a8..5427d77 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1190,7 +1190,6 @@ public class GMSHealthMonitor implements HealthMonitor, 
MessageHandler {
    * @param initiator
    * @param sMembers
    * @param cv
-   * @param initiateRemoval
    */
   private void checkIfAvailable(final InternalDistributedMember initiator,
       List<SuspectRequest> sMembers, final NetView cv) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 2dfeeaa..a9abea3 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -293,7 +293,7 @@ public class JGroupsMessenger implements Messenger {
     // give the stats to the jchannel statistics recorder
     StatRecorder sr = 
(StatRecorder)myChannel.getProtocolStack().findProtocol(StatRecorder.class);
     if (sr != null) {
-      sr.setDMStats(services.getStatistics());
+      sr.setServices(services);
     }
     
     Transport transport = 
(Transport)myChannel.getProtocolStack().getTransport();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
index e29d71e..e013de6 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
@@ -32,6 +32,8 @@ import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 
+import java.util.concurrent.RejectedExecutionException;
+
 /**
  * JGroups doesn't capture quite the stats we want so this protocol is
  * inserted into the stack to gather the missing ones.
@@ -46,17 +48,19 @@ public class StatRecorder extends Protocol {
   private static final int INCOMING = 1;
   
   DMStats stats;
+  Services services;
   
   private final short nakackHeaderId = 
ClassConfigurator.getProtocolId(NAKACK2.class);
   private final short unicastHeaderId = 
ClassConfigurator.getProtocolId(UNICAST3.class);
   private final short frag2HeaderId = 
ClassConfigurator.getProtocolId(FRAG2.class);
   
   /**
-   * set the statistics object to modify when events are detected
-   * @param stats
+   * sets the services object of the GMS that is using this recorder
+   * @param services the Services collective of the GMS
    */
-  public void setDMStats(DMStats stats) {
-    this.stats = stats;
+  public void setServices(Services services) {
+    this.services = services;
+    this.stats = services.getStatistics();
   }
   
   @Override
@@ -81,7 +85,23 @@ public class StatRecorder extends Protocol {
       filter(msg, OUTGOING);
       break;
     }
-    return down_prot.down(evt);
+    do {
+      try {
+        return down_prot.down(evt);
+      } catch (RejectedExecutionException e) {
+        logger.debug("retrying JGroups message transmission due to rejected 
execution (GEODE-1178)");
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ie) {
+          // down() does not throw InterruptedException so we can only set the 
interrupt flag and return
+          Thread.currentThread().interrupt();
+          return null;
+        }
+      }
+    } while (services != null
+      && !services.getManager().shutdownInProgress()
+      && !services.getCancelCriterion().isCancelInProgress());
+    return null;
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
 
b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
index 91d4a57..b7b80ac 100755
--- 
a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
+++ 
b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
@@ -16,33 +16,34 @@
  */
 package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Properties;
-
-import org.jgroups.Event;
-import org.jgroups.Message;
-import org.jgroups.protocols.UNICAST3.Header;
-import org.jgroups.protocols.pbcast.NakAckHeader2;
-import org.jgroups.stack.Protocol;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import 
com.gemstone.gemfire.distributed.internal.LonerDistributionManager.DummyDMStats;
 import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import 
com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import 
com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger;
 import 
com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder;
 import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.protocols.UNICAST3.Header;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.stack.Protocol;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
 
 /**
  * This class tests the GMS StatRecorder class, which records JGroups
@@ -53,14 +54,17 @@ public class StatRecorderJUnitTest {
   Protocol mockDownProtocol, mockUpProtocol;
   StatRecorder recorder;
   MyStats stats = new MyStats();
+  Services services;
   
   @Before
   public void initMocks() throws Exception {
     // create a StatRecorder that has mock up/down protocols and stats
     mockDownProtocol = mock(Protocol.class);
     mockUpProtocol = mock(Protocol.class);
+    services = mock(Services.class);
+    when(services.getStatistics()).thenReturn(stats);
     recorder = new StatRecorder();
-    recorder.setDMStats(stats);
+    recorder.setServices(services);
     recorder.setUpProtocol(mockUpProtocol);
     recorder.setDownProtocol(mockDownProtocol);
   }
@@ -94,6 +98,33 @@ public class StatRecorderJUnitTest {
         stats.ucastRetransmits == 1);
   }
 
+
+  @Test
+  public void recorderHandlesRejectedExecution() throws Exception {
+    Message msg = mock(Message.class);
+    
when(msg.getHeader(any(Short.class))).thenReturn(Header.createDataHeader(1L, 
(short)1, true));
+    when(msg.size()).thenReturn(150L);
+
+
+    // GEODE-1178, the TP protocol may throw a RejectedExecutionException & 
StatRecorder should retry
+    when(mockDownProtocol.down(any(Event.class))).thenThrow(new 
RejectedExecutionException());
+
+    // after the first down() throws an exception we want StatRecorder to 
retry, so
+    // we set the Manager to say no shutdown is in progress the first time and 
then say
+    // one IS in progress so we can break out of the StatRecorder exception 
handling loop
+    when(services.getCancelCriterion()).thenReturn(new 
Services().getCancelCriterion());
+    Manager manager = mock(Manager.class);
+    when(services.getManager()).thenReturn(manager);
+    when(manager.shutdownInProgress()).thenReturn(Boolean.FALSE, Boolean.TRUE);
+
+    verify(mockDownProtocol, never()).down(isA(Event.class));
+
+    Event evt = new Event(Event.MSG, msg);
+    recorder.down(evt);
+
+    verify(mockDownProtocol, times(2)).down(isA(Event.class));
+  }
+
   /**
    * ensure that multicast events are recorded in DMStats
    */

Reply via email to