This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 2ac3de7 GEODE-7921: NullPointerExceptions logged during auto-reconnect (#4898) 2ac3de7 is described below commit 2ac3de788502d1a40189bed06ce86779ee47bdf8 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Apr 3 08:24:34 2020 -0700 GEODE-7921: NullPointerExceptions logged during auto-reconnect (#4898) Don't deliver cache-level messages that were queued while disconnected for auto-reconnect. During auto-reconnect there is a QuorumChecker that receives messages on the jgroups channel and tries to establish communications with a quorum of the old membership view. It may also get JoinRequest messages and other membership-level messages but I observed one case where it also queued cache-level messages when the property disable-tcp was set to true (funnelling all comms through jgroups). I also added some null checks to the LatestLastAccessTimeMessage and a small test for that. --- .../cache/LatestLastAccessTimeMessage.java | 26 +++++++----- .../cache/LatestLastAccessTimeMessageTest.java | 43 +++++++++++++++++++ .../membership/gms/GMSMembershipJUnitTest.java | 30 -------------- .../internal/membership/gms/TestMessage.java | 48 ++++++++++++++++++++++ .../gms/messenger/JGroupsMessengerJUnitTest.java | 21 ++++++++++ .../membership/gms/messenger/JGroupsMessenger.java | 12 ++++-- 6 files changed, 136 insertions(+), 44 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java index cfaa24d..f012085 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java @@ -62,17 +62,23 @@ public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage @Override protected void process(ClusterDistributionManager dm) { long latestLastAccessTime = 0L; + InternalCache cache = dm.getCache(); + if (cache == null) { + return; + } InternalDistributedRegion region = - (InternalDistributedRegion) dm.getCache().getRegion(this.regionName); - if (region != null) { - RegionEntry entry = region.getRegionEntry(this.key); - if (entry != null) { - try { - latestLastAccessTime = entry.getLastAccessed(); - } catch (InternalStatisticsDisabledException ignored) { - // last access time is not available - } - } + (InternalDistributedRegion) cache.getRegion(this.regionName); + if (region == null) { + return; + } + RegionEntry entry = region.getRegionEntry(this.key); + if (entry == null) { + return; + } + try { + latestLastAccessTime = entry.getLastAccessed(); + } catch (InternalStatisticsDisabledException ignored) { + // last access time is not available } ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java new file mode 100644 index 0000000..1ddd31e --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.Set; + +import org.junit.Test; + +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.inet.LocalHostUtil; + +public class LatestLastAccessTimeMessageTest { + + @Test + public void processMessageShouldLookForNullCache() throws Exception { + final DistributionManager distributionManager = mock(DistributionManager.class); + final LatestLastAccessTimeReplyProcessor replyProcessor = + mock(LatestLastAccessTimeReplyProcessor.class); + final InternalDistributedRegion region = mock(InternalDistributedRegion.class); + Set<InternalDistributedMember> recipients = Collections.singleton(new InternalDistributedMember( + LocalHostUtil.getLocalHost(), 1234)); + final LatestLastAccessTimeMessage<String> lastAccessTimeMessage = + new LatestLastAccessTimeMessage<>(replyProcessor, recipients, region, "foo"); + lastAccessTimeMessage.process(mock(ClusterDistributionManager.class)); + } +} diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java index 0e19f70..49b8b3a 100644 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java @@ -28,9 +28,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,11 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.Services.Stopper; import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave; import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; -import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage; import org.apache.geode.internal.serialization.DSFIDSerializer; -import org.apache.geode.internal.serialization.DeserializationContext; -import org.apache.geode.internal.serialization.SerializationContext; -import org.apache.geode.internal.serialization.Version; import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl; import org.apache.geode.test.junit.categories.MembershipTest; @@ -335,27 +328,4 @@ public class GMSMembershipJUnitTest { assertThat(spy.getStartupEvents()).isEmpty(); } - public static class TestMessage extends AbstractGMSMessage { - - @Override - public int getDSFID() { - return HIGH_PRIORITY_ACKED_MESSAGE; - } - - @Override - public void toData(DataOutput out, SerializationContext context) throws IOException { - - } - - @Override - public void fromData(DataInput in, DeserializationContext context) - throws IOException, ClassNotFoundException { - - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - } } diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java new file mode 100644 index 0000000..8da2dac --- /dev/null +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal.membership.gms; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.internal.serialization.Version; + +public class TestMessage extends AbstractGMSMessage { + + @Override + public int getDSFID() { + return HIGH_PRIORITY_ACKED_MESSAGE; + } + + @Override + public void toData(DataOutput out, SerializationContext context) throws IOException { + + } + + @Override + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { + + } + + @Override + public Version[] getSerializationVersions() { + return null; + } +} diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java index 0e0748d..fe52eb8 100755 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -61,6 +62,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException; @@ -74,6 +76,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl; import org.apache.geode.distributed.internal.membership.gms.Services; import org.apache.geode.distributed.internal.membership.gms.Services.Stopper; +import org.apache.geode.distributed.internal.membership.gms.TestMessage; import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave; import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager; @@ -823,6 +826,24 @@ public class JGroupsMessengerJUnitTest { assertTrue(pinger.isPongMessage(m.getBuffer())); } + /** + * messages for the Manager that were queued by a quorum checker shouldn't be delivered to + * a Manager + */ + @Test + public void testIgnoreManagerMessagesFromQuorumChecker() throws Exception { + initMocks(false); + MemberIdentifier memberIdentifier = createAddress(8888); + JGAddress jgAddress = new JGAddress(memberIdentifier); + + ArgumentCaptor<Message> valueCapture = ArgumentCaptor.forClass(Message.class); + doNothing().when(manager).processMessage(valueCapture.capture()); + org.jgroups.Message jgroupsMessage = messenger.createJGMessage(new TestMessage(), jgAddress, + memberIdentifier, Version.CURRENT_ORDINAL); + messenger.jgroupsReceiver.receive(jgroupsMessage, true); + assertThat(valueCapture.getAllValues()).isEmpty(); + } + @Test public void testJGroupsIOExceptionHandler() throws Exception { initMocks(false); diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index ca53906..da67d36 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -82,6 +82,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; import org.apache.geode.distributed.internal.membership.gms.GMSUtil; import org.apache.geode.distributed.internal.membership.gms.Services; import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; +import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager; import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler; import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest; @@ -178,7 +179,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger< * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible * for deserializating and dispatching those messages to the appropriate handler */ - private JGroupsReceiver jgroupsReceiver; + protected JGroupsReceiver jgroupsReceiver; public static void setChannelReceiver(JChannel channel, Receiver r) { try { @@ -1266,7 +1267,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger< receive(jgmsg, false); } - private void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) { + protected void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) { long startTime = services.getStatistics().startUDPDispatchRequest(); try { if (services.getManager().shutdownInProgress()) { @@ -1320,9 +1321,12 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger< } filterIncomingMessage(msg); MessageHandler<Message<ID>> handler = getMessageHandler(msg); - if (fromQuorumChecker && handler instanceof HealthMonitor) { + if (fromQuorumChecker + && (handler instanceof HealthMonitor || handler instanceof Manager)) { // ignore suspect / heartbeat messages that happened during - // auto-reconnect because they very likely have old member IDs in them + // auto-reconnect because they very likely have old member IDs in them. + // Also ignore non-membership messages because we weren't a member when we received + // them. } else { handler.processMessage(msg); }