GEODE-3072: Events do not get removed from the client queue for 1.0 clients
EventID and ThreadIdentifier hold the serialized form of portions of an InternalDistributedMember identifier. This serialized form changed in v1.0.0 and was causing .equals and .hashCode for these objects to return false when they should have returned true owing to additional data being in the serialized form of the identifier. This change set modifies the equals and hashCode methods of the classes to ensure that they return the correct results in the presence of this additional ID data. I created a dunit test to reproduce the problem but I think the behavior of HARegionQueues isn't reliable enough to check in that test. I'm afraid that it would end up being a "flaky" test that fails periodically. Instead, I'm relying on a new junit test and the test that Barry already checked in. Note: I've also included two other things in this change set. First, LocalRegion.dumpBackingMap() is a test-hook that should log its results at "info" level. I used it in debugging this problem. Second, I've added a new backward-compatibility version so now we have 100 and 110. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ff6cbf31 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ff6cbf31 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ff6cbf31 Branch: refs/heads/feature/GEODE-3109 Commit: ff6cbf317ab24bb0e582a3eca01c23e7b435ef35 Parents: 718583b Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Tue Jun 27 14:17:10 2017 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Tue Jun 27 14:20:27 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/AbstractRegionMap.java | 4 +- .../apache/geode/internal/cache/EventID.java | 60 ++++++++++++++++++-- .../internal/cache/ha/ThreadIdentifier.java | 18 +++--- .../cache/ha/ThreadIdentifierJUnitTest.java | 28 +++++++++ .../sockets/ClientServerMiscBCDUnitTest.java | 9 ++- geode-old-versions/build.gradle | 1 + 6 files changed, 102 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index a1b4a9d..ece3de1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -3486,10 +3486,10 @@ public abstract class AbstractRegionMap implements RegionMap { } public void dumpMap() { - logger.debug("dump of concurrent map of size {} for region {}", this._getMap().size(), + logger.info("dump of concurrent map of size {} for region {}", this._getMap().size(), this._getOwner()); for (Iterator it = this._getMap().values().iterator(); it.hasNext();) { - logger.trace("dumpMap:" + it.next().toString()); + logger.info("dumpMap:" + it.next().toString()); } } http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java index 4d2ddc1..55c89f1 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java @@ -388,18 +388,70 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali return false; if (threadID != other.threadID) return false; - if (!Arrays.equals(membershipID, other.membershipID)) + return equalMembershipIds(membershipID, other.membershipID); + } + + /** GEODE_3072 - 1.0.0 client IDs contain a UUID and member-weight byte that are all zero */ + static final int NULL_90_MEMBER_DATA_LENGTH = 17; + + /** minimum length of an ID array */ + static final int MINIMIM_ID_LENGTH = 19; + + /** + * check to see if membership ID byte arrays are equal + */ + static public boolean equalMembershipIds(byte[] m1, byte[] m2) { + int sizeDifference = Math.abs(m1.length - m2.length); + if (sizeDifference != 0 && sizeDifference != NULL_90_MEMBER_DATA_LENGTH) { return false; + } + for (int i = 0; i < m1.length; i++) { + if (i >= m2.length) { + return nullUUIDCheck(m1, i); + } + if (m1[i] != m2[i]) { + return false; + } + } + if (m1.length != m2.length) { + return nullUUIDCheck(m2, m1.length); + } return true; } + /** + * GEODE-3072 - v1.0.0 memberIDs in EventIDs may have trailing bytes that should be ignored + */ + static private boolean nullUUIDCheck(byte[] memberID, int position) { + if (memberID.length - position != NULL_90_MEMBER_DATA_LENGTH) { + return false; + } + for (int i = position; i < memberID.length; i++) { + if (memberID[i] != 0) { + return false; + } + } + return true; + } + + /** + * form the hashcode for the memberID byte array + */ + static public int hashCodeMemberId(byte[] memberID) { + if (memberID.length < (NULL_90_MEMBER_DATA_LENGTH + MINIMIM_ID_LENGTH) + || !nullUUIDCheck(memberID, memberID.length - NULL_90_MEMBER_DATA_LENGTH)) { + return Arrays.hashCode(memberID); + } + byte[] newID = new byte[memberID.length - NULL_90_MEMBER_DATA_LENGTH]; + System.arraycopy(memberID, 0, newID, 0, newID.length); + return Arrays.hashCode(newID); + } - @Override public int hashCode() { if (hashCode == 0) { final int prime = 31; int result = 1; - result = prime * result + Arrays.hashCode(membershipID); + result = prime * result + hashCodeMemberId(membershipID); result = prime * result + (int) (sequenceID ^ (sequenceID >>> 32)); result = prime * result + (int) (threadID ^ (threadID >>> 32)); hashCode = result; @@ -448,7 +500,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali } buf.append(";"); } else { - buf.append("["); + buf.append("id=").append(membershipID.length).append("bytes;"); } // buf.append(this.membershipID.toString()); buf.append("threadID="); http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java index ec165a5..93a25d7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java @@ -21,10 +21,14 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.logging.LogService; /** * Class identifying a Thread uniquely across the distributed system. It is composed of two fields @@ -151,21 +155,16 @@ public class ThreadIdentifier implements DataSerializable { if ((obj == null) || !(obj instanceof ThreadIdentifier)) { return false; } - return (this.threadID == ((ThreadIdentifier) obj).threadID - && Arrays.equals(this.membershipID, ((ThreadIdentifier) obj).membershipID)); + ThreadIdentifier other = (ThreadIdentifier) obj; + return (this.threadID == other.threadID + && EventID.equalMembershipIds(this.membershipID, other.membershipID)); } - // TODO: Asif : Check this implementation @Override public int hashCode() { - int result = 17; final int mult = 37; - if (this.membershipID != null && this.membershipID.length > 0) { - for (int i = 0; i < this.membershipID.length; i++) { - result = mult * result + this.membershipID[i]; - } - } + int result = EventID.hashCodeMemberId(membershipID); result = mult * result + (int) this.threadID; result = mult * result + (int) (this.threadID >>> 32); @@ -198,6 +197,7 @@ public class ThreadIdentifier implements DataSerializable { StringBuilder sb = new StringBuilder(); sb.append("ThreadId["); + sb.append("id=").append(membershipID.length).append("bytes; "); sb.append(toDisplayString(threadID)); sb.append("]"); http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ThreadIdentifierJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ThreadIdentifierJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ThreadIdentifierJUnitTest.java index 29b22be..a5de844 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ThreadIdentifierJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ThreadIdentifierJUnitTest.java @@ -16,6 +16,12 @@ package org.apache.geode.internal.cache.ha; import static org.junit.Assert.*; +import java.net.InetAddress; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.test.junit.categories.ClientServerTest; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -27,6 +33,28 @@ import org.apache.geode.test.junit.categories.UnitTest; public class ThreadIdentifierJUnitTest { @Test + public void testEqualsIgnoresUUIDBytes() throws Exception { + InternalDistributedMember id = new InternalDistributedMember(InetAddress.getLocalHost(), 1234); + id.setVersionObjectForTest(Version.GFE_90); + byte[] memberIdBytes = EventID.getMembershipId(new ClientProxyMembershipID(id)); + byte[] memberIdBytesWithoutUUID = new byte[memberIdBytes.length - (2 * 8 + 1)];// UUID bytes + + // weight byte + System.arraycopy(memberIdBytes, 0, memberIdBytesWithoutUUID, 0, + memberIdBytesWithoutUUID.length); + ThreadIdentifier threadIdWithUUID = new ThreadIdentifier(memberIdBytes, 1); + ThreadIdentifier threadIdWithoutUUID = new ThreadIdentifier(memberIdBytesWithoutUUID, 1); + assertEquals(threadIdWithoutUUID, threadIdWithUUID); + assertEquals(threadIdWithUUID, threadIdWithoutUUID); + assertEquals(threadIdWithoutUUID.hashCode(), threadIdWithUUID.hashCode()); + + EventID eventIDWithUUID = new EventID(memberIdBytes, 1, 1); + EventID eventIDWithoutUUID = new EventID(memberIdBytesWithoutUUID, 1, 1); + assertEquals(eventIDWithUUID, eventIDWithoutUUID); + assertEquals(eventIDWithoutUUID, eventIDWithUUID); + assertEquals(eventIDWithoutUUID.hashCode(), eventIDWithUUID.hashCode()); + } + + @Test public void testPutAllId() { int id = 42; int bucketNumber = 113; http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java index 5fb8fa2..bc48d97 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.Pool; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.test.dunit.Host; @@ -96,8 +97,7 @@ public class ClientServerMiscBCDUnitTest extends ClientServerMiscDUnitTest { }); } - // @Test - @Ignore + @Test public void testDistributedMemberBytesWithCurrentServerAndOldClient() throws Exception { // Start current version server int serverPort = initServerCache(true); @@ -119,7 +119,10 @@ public class ClientServerMiscBCDUnitTest extends ClientServerMiscDUnitTest { server1.invoke(() -> getClientMembershipIdBytesOnServer()); // Verify member id bytes on client and server are equal - assertTrue(Arrays.equals(clientMembershipIdBytesOnClient, clientMembershipIdBytesOnServer)); + String complaint = "size on client=" + clientMembershipIdBytesOnClient.length + + "; size on server=" + clientMembershipIdBytesOnServer.length; + assertTrue(complaint, + Arrays.equals(clientMembershipIdBytesOnClient, clientMembershipIdBytesOnServer)); } private byte[] getClientMembershipIdBytesOnClient() { http://git-wip-us.apache.org/repos/asf/geode/blob/ff6cbf31/geode-old-versions/build.gradle ---------------------------------------------------------------------- diff --git a/geode-old-versions/build.gradle b/geode-old-versions/build.gradle index d85eb0b..c4ffa54 100644 --- a/geode-old-versions/build.gradle +++ b/geode-old-versions/build.gradle @@ -38,6 +38,7 @@ def addTestSource(def source, def geodeVersion) { // Add sourceSets for backwards compatibility, rolling upgrade, and // pdx testing. addTestSource('test100', '1.0.0-incubating') +addTestSource('test110', '1.1.0') addTestSource('test111', '1.1.1') def generatedResources = "$buildDir/generated-resources/main"