This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-7219 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f4c807a0479b96bb1dfc38dd1ceb821c20e4529a Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Thu Sep 19 13:49:53 2019 -0700 GEODE-7219 BufferUnderflowException in PutReplyMessage deserialization VersionTag serialization was being affected by concurrent modification of its memberID/previousMemberID fields: toData() set the HAS_PREVIOUS_MEMBER_ID flag bit but did _not_ set the DUPLICATE_MEMBER_IDS flag. It then repeated the same checks later in toData(), reached a different decision, and ended up not serializing the previousMemberID field because by then it was == to the memberID field. The receiver, trusting the flags, then tried to read a previous member ID that had never been written. The fix: 1) decide what the flags are going to be once in toData() and do not perform the same calculations again. 2) use equals() rather than == when checking whether memberID and previousMemberID are equal. --- .../codeAnalysis/sanctionedDataSerializables.txt | 7 +- .../geode/internal/cache/versions/VersionTag.java | 34 +++-- .../cache/versions/AbstractVersionTagTestBase.java | 149 +++++++++++++++++++++ ...VersionTagTest.java => DiskVersionTagTest.java} | 12 +- .../internal/cache/versions/VMVersionTagTest.java | 6 + 5 files changed, 191 insertions(+), 17 deletions(-) diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 486e2e2..6d3264c 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -970,9 +970,6 @@ org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage,2 fromData,177 toData,174 -org/apache/geode/internal/cache/DistributedRemoveAllOperation$RemoveAllEntryData,1 -toData,162 - org/apache/geode/internal/cache/DistributedRemoveAllOperation$RemoveAllMessage,2 fromData,217 toData,188 @@ -1985,8 +1982,8 @@ fromData,214 
toData,245 org/apache/geode/internal/cache/versions/VersionTag,2 -fromData,189 -toData,259 +fromData,225 +toData,254 org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4 fromData,283 diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java index ff76383..40d3cbd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java @@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.versions; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.logging.log4j.Logger; @@ -29,6 +31,7 @@ import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.serialization.DataSerializableFixedID; import org.apache.geode.internal.serialization.DeserializationContext; import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.internal.serialization.Version; import org.apache.geode.internal.size.ReflectionSingleObjectSizer; /** @@ -60,11 +63,11 @@ public abstract class VersionTag<T extends VersionSource> // flags for serialization - private static final int HAS_MEMBER_ID = 0x01; - private static final int HAS_PREVIOUS_MEMBER_ID = 0x02; - private static final int VERSION_TWO_BYTES = 0x04; - private static final int DUPLICATE_MEMBER_IDS = 0x08; - private static final int HAS_RVV_HIGH_BYTE = 0x10; + static final int HAS_MEMBER_ID = 0x01; + static final int HAS_PREVIOUS_MEMBER_ID = 0x02; + static final int VERSION_TWO_BYTES = 0x04; + static final int DUPLICATE_MEMBER_IDS = 0x08; + static final int HAS_RVV_HIGH_BYTE = 0x10; private static final int BITS_POSDUP = 0x01; private static 
final int BITS_RECORDED = 0x02; // has the rvv recorded this? @@ -350,10 +353,13 @@ public abstract class VersionTag<T extends VersionSource> if (this.memberID != null && includeMember) { flags |= HAS_MEMBER_ID; } - if (this.previousMemberID != null) { + boolean writePreviousMemberID = false; + if (this.previousMemberID != null && includeMember) { flags |= HAS_PREVIOUS_MEMBER_ID; - if (this.previousMemberID == this.memberID && includeMember) { + if (Objects.equals(this.previousMemberID, this.memberID)) { flags |= DUPLICATE_MEMBER_IDS; + } else { + writePreviousMemberID = true; } } if (logger.isTraceEnabled(LogMarker.VERSION_TAG_VERBOSE)) { @@ -376,8 +382,7 @@ public abstract class VersionTag<T extends VersionSource> if (this.memberID != null && includeMember) { writeMember(this.memberID, out); } - if (this.previousMemberID != null - && (this.previousMemberID != this.memberID || !includeMember)) { + if (writePreviousMemberID) { writeMember(this.previousMemberID, out); } } @@ -409,7 +414,16 @@ public abstract class VersionTag<T extends VersionSource> if ((flags & DUPLICATE_MEMBER_IDS) != 0) { this.previousMemberID = this.memberID; } else { - this.previousMemberID = readMember(in); + try { + this.previousMemberID = readMember(in); + } catch (BufferUnderflowException e) { + if (context.getSerializationVersion().compareTo(Version.GEODE_1_11_0) < 0) { + // GEODE-7219: older versions may report HAS_PREVIOUS_MEMBER_ID but not transmit it + logger.info("Buffer underflow encountered while reading a version tag - ignoring"); + } else { + throw e; + } + } } } setBits(BITS_IS_REMOTE_TAG); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java index 82a15fa..ad0e252 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java +++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java @@ -14,15 +14,48 @@ */ package org.apache.geode.internal.cache.versions; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.Version; +import org.apache.geode.internal.serialization.VersionedDataInputStream; +import org.apache.geode.internal.tcp.ByteBufferInputStream; public abstract class AbstractVersionTagTestBase { + Set<Integer> usedInts = new HashSet<>(); + Random random = new Random(); + @SuppressWarnings("rawtypes") protected abstract VersionTag createVersionTag(); + protected abstract VersionSource createMemberID(); + @SuppressWarnings("rawtypes") private VersionTag vt; @@ -31,6 +64,122 @@ public abstract class AbstractVersionTagTestBase { this.vt = createVersionTag(); } + int getRandomUnusedInt() { + int unusedInt; + do { + unusedInt = random.nextInt(60000); + } while (usedInts.contains(unusedInt)); + return unusedInt; + } 
+ + @Test + public void testConcurrentCanonicalizationOfIDsAndSerialization() throws IOException { + VersionTag spy = spy(vt); + DataOutput dataOutput = mock(DataOutput.class); + spy.setMemberID(createMemberID()); + spy.setPreviousMemberID(createMemberID()); + final short[] flags = {0}; + + Answer myAnswer = new Answer() { + boolean firstInvocation = true; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (firstInvocation) { + // save the argument - it's the "flags" int that we'll want to verify + flags[0] = (short) (((Integer) invocation.getArgument(0)).intValue() & 0xFFFF); + firstInvocation = false; + // canonicalize the member IDs. Once flags have been written the tag shouldn't examine + // previousMemberID again to see if it's the same as the memberID. + spy.setPreviousMemberID(spy.getMemberID()); + } + return null; + } + }; + doAnswer(myAnswer).when(dataOutput).writeShort(any(Integer.class)); + spy.toData(dataOutput, true); + // verify that both the member ID and the previous member ID were written + verify(spy, times(2)).writeMember(isA(VersionSource.class), isA(DataOutput.class)); + assertThat(flags[0] & VersionTag.HAS_MEMBER_ID).isEqualTo(VersionTag.HAS_MEMBER_ID); + assertThat(flags[0] & VersionTag.HAS_PREVIOUS_MEMBER_ID) + .isEqualTo(VersionTag.HAS_PREVIOUS_MEMBER_ID); + assertThat(flags[0] & VersionTag.DUPLICATE_MEMBER_IDS) + .isNotEqualTo(VersionTag.DUPLICATE_MEMBER_IDS); + } + + @Test + public void testSerializationWritesNoMemberID() throws IOException { + VersionTag spy = spy(vt); + DataOutput dataOutput = mock(DataOutput.class); + spy.setMemberID(createMemberID()); + spy.setPreviousMemberID(createMemberID()); + final short[] flags = {0}; + + Answer myAnswer = new Answer() { + boolean firstInvocation = true; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (firstInvocation) { + // save the argument - it's the "flags" int that we'll want to verify + flags[0] = (short) (((Integer) 
invocation.getArgument(0)).intValue() & 0xFFFF); + firstInvocation = false; + } + return null; + } + }; + doAnswer(myAnswer).when(dataOutput).writeShort(any(Integer.class)); + spy.toData(dataOutput, false); + // verify that we didn't write member IDs and the flags don't state that there are IDs in the + // tag + verify(spy, times(0)).writeMember(isA(VersionSource.class), isA(DataOutput.class)); + assertThat(flags[0] & VersionTag.HAS_MEMBER_ID).isNotEqualTo(VersionTag.HAS_MEMBER_ID); + assertThat(flags[0] & VersionTag.HAS_PREVIOUS_MEMBER_ID) + .isNotEqualTo(VersionTag.HAS_PREVIOUS_MEMBER_ID); + assertThat(flags[0] & VersionTag.DUPLICATE_MEMBER_IDS) + .isNotEqualTo(VersionTag.DUPLICATE_MEMBER_IDS); + } + + @Test + public void testBufferUnderflowFromOldVersionIsIgnored() + throws IOException, ClassNotFoundException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1000); + DataOutputStream out = new DataOutputStream(outputStream); + short flags = + VersionTag.HAS_MEMBER_ID | VersionTag.HAS_PREVIOUS_MEMBER_ID | VersionTag.VERSION_TWO_BYTES; + out.writeShort(flags); + out.writeShort(0); + out.write(1); + out.writeShort(12345); + out.writeInt(12345); + InternalDataSerializer.writeUnsignedVL(1L, out); + VersionSource memberID = createMemberID(); + vt.writeMember(memberID, out); + out.flush(); + + ByteBufferInputStream inputStream = + new ByteBufferInputStream(ByteBuffer.wrap(outputStream.toByteArray())); + DataInputStream in = new DataInputStream(inputStream); + VersionedDataInputStream versionedDataInputStream = + new VersionedDataInputStream(in, Version.GEODE_1_10_0); + DeserializationContext context = + InternalDataSerializer.createDeserializationContext(versionedDataInputStream); + + // deserializing a version tag that's missing the "previous member id" should work for messages + // from older nodes but not post-1.10 because the serialization problem was fixed + vt = createVersionTag(); + vt.fromData(versionedDataInputStream, context); + 
assertThat(vt.getMemberID()).isEqualTo(memberID); + + inputStream.position(0); + final DataInputStream unversionedInputStream = new DataInputStream(inputStream); + final DeserializationContext unversionedContext = + InternalDataSerializer.createDeserializationContext(in); + vt = createVersionTag(); + assertThatThrownBy(() -> vt.fromData(unversionedInputStream, unversionedContext)) + .isExactlyInstanceOf(BufferUnderflowException.class); + } + @Test public void testFromOtherMemberBit() { assertEquals(false, vt.isFromOtherMember()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java similarity index 73% copy from geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java copy to geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java index 7a4971e..0991f26 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java @@ -15,13 +15,21 @@ package org.apache.geode.internal.cache.versions; +import org.apache.geode.internal.cache.persistence.DiskStoreID; -public class VMVersionTagTest extends AbstractVersionTagTestBase { +public class DiskVersionTagTest extends AbstractVersionTagTestBase { @SuppressWarnings("rawtypes") @Override protected VersionTag createVersionTag() { - return new VMVersionTag(); + return new DiskVersionTag(); + } + + @Override + protected VersionSource createMemberID() { + int high = getRandomUnusedInt(); + int low = getRandomUnusedInt(); + return new DiskStoreID(high, low); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java index 7a4971e..e35a734 100644 --- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.cache.versions; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; public class VMVersionTagTest extends AbstractVersionTagTestBase { @@ -24,4 +25,9 @@ public class VMVersionTagTest extends AbstractVersionTagTestBase { return new VMVersionTag(); } + @Override + protected VersionSource createMemberID() { + int port = getRandomUnusedInt(); + return new InternalDistributedMember("localhost", port); + } }