Repository: incubator-geode Updated Branches: refs/heads/develop 86ab7cda3 -> 39e94bc8b
GEODE-1178 Unexpected DistributedSystemDisconnectedException caused by RejectedExecutionException. This has been reported to JGroups. While they decide what to do about it, I have coded a workaround in our StatRecorder class. StatRecorder sits in the JGroups stack just above the transport protocol that throws this exception from its down() method. StatRecorder will now catch the exception and, after sleeping for a short time (10ms), retry the send as long as the Manager is not shutting down and no cancellation is in progress. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/39e94bc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/39e94bc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/39e94bc8 Branch: refs/heads/develop Commit: 39e94bc8beb22b62ed727640bbad3511affc9923 Parents: 86ab7cd Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Tue Apr 12 10:43:26 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Tue Apr 12 10:45:40 2016 -0700 ---------------------------------------------------------------------- .../internal/membership/gms/Services.java | 23 +++++-- .../membership/gms/fd/GMSHealthMonitor.java | 1 - .../gms/messenger/JGroupsMessenger.java | 2 +- .../membership/gms/messenger/StatRecorder.java | 30 +++++++-- .../gms/membership/StatRecorderJUnitTest.java | 67 ++++++++++++++------ 5 files changed, 93 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java index 4484c00..4f5a1a4 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java @@ -102,6 +102,19 @@ public class Services { } + /** + * for testing only - create a non-functional Services object with a Stopper + */ + public Services() { + this.cancelCriterion = new Stopper(); + this.stats = null; + this.config = null; + this.manager = null; + this.joinLeave = null; + this.healthMon = null; + this.messenger = null; + this.auth = null; + } public Services( DistributedMembershipListener listener, DistributionConfig config, @@ -348,10 +361,10 @@ public class Services { public boolean isAutoReconnectEnabled() { return !getConfig().getDistributionConfig().getDisableAutoReconnect(); } - + public class Stopper extends CancelCriterion { volatile String reasonForStopping = null; - + public void cancel(String reason) { this.reasonForStopping = reason; } @@ -362,7 +375,7 @@ public class Services { return Services.this.shutdownCause.toString(); return reasonForStopping; } - + public boolean isCancelInProgress() { return cancelInProgress() != null; } @@ -381,7 +394,7 @@ public class Services { } } } - + } - + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 510c5a8..5427d77 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -1190,7 +1190,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * @param initiator * @param sMembers * @param cv - * @param initiateRemoval */ private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest> sMembers, final NetView cv) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 2dfeeaa..a9abea3 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -293,7 +293,7 @@ public class JGroupsMessenger implements Messenger { // give the stats to the jchannel statistics recorder StatRecorder sr = (StatRecorder)myChannel.getProtocolStack().findProtocol(StatRecorder.class); if (sr != null) { - sr.setDMStats(services.getStatistics()); + sr.setServices(services); } Transport transport = (Transport)myChannel.getProtocolStack().getTransport(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java 
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java index e29d71e..e013de6 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java @@ -32,6 +32,8 @@ import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import java.util.concurrent.RejectedExecutionException; + /** * JGroups doesn't capture quite the stats we want so this protocol is * inserted into the stack to gather the missing ones. @@ -46,17 +48,19 @@ public class StatRecorder extends Protocol { private static final int INCOMING = 1; DMStats stats; + Services services; private final short nakackHeaderId = ClassConfigurator.getProtocolId(NAKACK2.class); private final short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class); private final short frag2HeaderId = ClassConfigurator.getProtocolId(FRAG2.class); /** - * set the statistics object to modify when events are detected - * @param stats + * sets the services object of the GMS that is using this recorder + * @param services the Services collective of the GMS */ - public void setDMStats(DMStats stats) { - this.stats = stats; + public void setServices(Services services) { + this.services = services; + this.stats = services.getStatistics(); } @Override @@ -81,7 +85,23 @@ public class StatRecorder extends Protocol { filter(msg, OUTGOING); break; } - return down_prot.down(evt); + do { + try { + return down_prot.down(evt); + } catch (RejectedExecutionException e) { + logger.debug("retrying JGroups message transmission due to rejected execution (GEODE-1178)"); + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + // down() does not throw 
InterruptedException so we can only set the interrupt flag and return + Thread.currentThread().interrupt(); + return null; + } + } + } while (services != null + && !services.getManager().shutdownInProgress() + && !services.getCancelCriterion().isCancelInProgress()); + return null; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java index 91d4a57..b7b80ac 100755 --- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java @@ -16,33 +16,34 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.membership; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; - -import java.util.Properties; - -import org.jgroups.Event; -import org.jgroups.Message; -import org.jgroups.protocols.UNICAST3.Header; -import org.jgroups.protocols.pbcast.NakAckHeader2; -import org.jgroups.stack.Protocol; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; import com.gemstone.gemfire.distributed.internal.DistributionManager; import 
com.gemstone.gemfire.distributed.internal.LonerDistributionManager.DummyDMStats; import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger; import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder; import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; import com.gemstone.gemfire.test.junit.categories.UnitTest; +import org.jgroups.Event; +import org.jgroups.Message; +import org.jgroups.protocols.UNICAST3.Header; +import org.jgroups.protocols.pbcast.NakAckHeader2; +import org.jgroups.stack.Protocol; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Properties; +import java.util.concurrent.RejectedExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.*; /** * This class tests the GMS StatRecorder class, which records JGroups @@ -53,14 +54,17 @@ public class StatRecorderJUnitTest { Protocol mockDownProtocol, mockUpProtocol; StatRecorder recorder; MyStats stats = new MyStats(); + Services services; @Before public void initMocks() throws Exception { // create a StatRecorder that has mock up/down protocols and stats mockDownProtocol = mock(Protocol.class); mockUpProtocol = mock(Protocol.class); + services = mock(Services.class); + when(services.getStatistics()).thenReturn(stats); recorder = new StatRecorder(); - recorder.setDMStats(stats); + recorder.setServices(services); recorder.setUpProtocol(mockUpProtocol); recorder.setDownProtocol(mockDownProtocol); } @@ -94,6 +98,33 @@ public class StatRecorderJUnitTest { 
stats.ucastRetransmits == 1); } + + @Test + public void recorderHandlesRejectedExecution() throws Exception { + Message msg = mock(Message.class); + when(msg.getHeader(any(Short.class))).thenReturn(Header.createDataHeader(1L, (short)1, true)); + when(msg.size()).thenReturn(150L); + + + // GEODE-1178, the TP protocol may throw a RejectedExecutionException & StatRecorder should retry + when(mockDownProtocol.down(any(Event.class))).thenThrow(new RejectedExecutionException()); + + // after the first down() throws an exception we want StatRecorder to retry, so + // we set the Manager to say no shutdown is in progress the first time and then say + // one IS in progress so we can break out of the StatRecorder exception handling loop + when(services.getCancelCriterion()).thenReturn(new Services().getCancelCriterion()); + Manager manager = mock(Manager.class); + when(services.getManager()).thenReturn(manager); + when(manager.shutdownInProgress()).thenReturn(Boolean.FALSE, Boolean.TRUE); + + verify(mockDownProtocol, never()).down(isA(Event.class)); + + Event evt = new Event(Event.MSG, msg); + recorder.down(evt); + + verify(mockDownProtocol, times(2)).down(isA(Event.class)); + } + /** * ensure that multicast events are recorded in DMStats */