This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7219
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f4c807a0479b96bb1dfc38dd1ceb821c20e4529a
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Thu Sep 19 13:49:53 2019 -0700

    GEODE-7219 BufferUnderflowException in PutReplyMessage deserialization
    
    VersionTag serialization could be corrupted by concurrent modification
    of the tag's memberID/previousMemberID fields. toData() first computed
    the flags, setting HAS_PREVIOUS_MEMBER_ID but _not_ DUPLICATE_MEMBER_IDS,
    and then repeated the same checks later in the method. If the fields
    had been changed in between, the second check reached a different
    decision and skipped serializing previousMemberID because it was by
    then == to memberID. The receiver, told by the flags to expect a
    previousMemberID, then ran out of bytes while deserializing.
    
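    1) decide what the flags are going to be once, at the start of toData(),
    and reuse that decision instead of performing the same checks again.
    
    2) use equals() rather than == when checking whether memberID and
    previousMemberID are equal, and only flag a previousMemberID when
    member IDs are being included in the serialized form.
    
    3) in fromData(), tolerate a missing previousMemberID in tags sent by
    versions older than 1.11.0, which may still have this problem.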
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   7 +-
 .../geode/internal/cache/versions/VersionTag.java  |  34 +++--
 .../cache/versions/AbstractVersionTagTestBase.java | 149 +++++++++++++++++++++
 ...VersionTagTest.java => DiskVersionTagTest.java} |  12 +-
 .../internal/cache/versions/VMVersionTagTest.java  |   6 +
 5 files changed, 191 insertions(+), 17 deletions(-)

diff --git 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 486e2e2..6d3264c 100644
--- 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -970,9 +970,6 @@ 
org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage,2
 fromData,177
 toData,174
 
-org/apache/geode/internal/cache/DistributedRemoveAllOperation$RemoveAllEntryData,1
-toData,162
-
 
org/apache/geode/internal/cache/DistributedRemoveAllOperation$RemoveAllMessage,2
 fromData,217
 toData,188
@@ -1985,8 +1982,8 @@ fromData,214
 toData,245
 
 org/apache/geode/internal/cache/versions/VersionTag,2
-fromData,189
-toData,259
+fromData,225
+toData,254
 
 org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4
 fromData,283
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java
index ff76383..40d3cbd 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionTag.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.versions;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.logging.log4j.Logger;
@@ -29,6 +31,7 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
 
 /**
@@ -60,11 +63,11 @@ public abstract class VersionTag<T extends VersionSource>
 
 
   // flags for serialization
-  private static final int HAS_MEMBER_ID = 0x01;
-  private static final int HAS_PREVIOUS_MEMBER_ID = 0x02;
-  private static final int VERSION_TWO_BYTES = 0x04;
-  private static final int DUPLICATE_MEMBER_IDS = 0x08;
-  private static final int HAS_RVV_HIGH_BYTE = 0x10;
+  static final int HAS_MEMBER_ID = 0x01;
+  static final int HAS_PREVIOUS_MEMBER_ID = 0x02;
+  static final int VERSION_TWO_BYTES = 0x04;
+  static final int DUPLICATE_MEMBER_IDS = 0x08;
+  static final int HAS_RVV_HIGH_BYTE = 0x10;
 
   private static final int BITS_POSDUP = 0x01;
   private static final int BITS_RECORDED = 0x02; // has the rvv recorded this?
@@ -350,10 +353,13 @@ public abstract class VersionTag<T extends VersionSource>
     if (this.memberID != null && includeMember) {
       flags |= HAS_MEMBER_ID;
     }
-    if (this.previousMemberID != null) {
+    boolean writePreviousMemberID = false;
+    if (this.previousMemberID != null && includeMember) {
       flags |= HAS_PREVIOUS_MEMBER_ID;
-      if (this.previousMemberID == this.memberID && includeMember) {
+      if (Objects.equals(this.previousMemberID, this.memberID)) {
         flags |= DUPLICATE_MEMBER_IDS;
+      } else {
+        writePreviousMemberID = true;
       }
     }
     if (logger.isTraceEnabled(LogMarker.VERSION_TAG_VERBOSE)) {
@@ -376,8 +382,7 @@ public abstract class VersionTag<T extends VersionSource>
     if (this.memberID != null && includeMember) {
       writeMember(this.memberID, out);
     }
-    if (this.previousMemberID != null
-        && (this.previousMemberID != this.memberID || !includeMember)) {
+    if (writePreviousMemberID) {
       writeMember(this.previousMemberID, out);
     }
   }
@@ -409,7 +414,16 @@ public abstract class VersionTag<T extends VersionSource>
       if ((flags & DUPLICATE_MEMBER_IDS) != 0) {
         this.previousMemberID = this.memberID;
       } else {
-        this.previousMemberID = readMember(in);
+        try {
+          this.previousMemberID = readMember(in);
+        } catch (BufferUnderflowException e) {
+          if 
(context.getSerializationVersion().compareTo(Version.GEODE_1_11_0) < 0) {
+            // GEODE-7219: older versions may report HAS_PREVIOUS_MEMBER_ID 
but not transmit it
+            logger.info("Buffer underflow encountered while reading a version 
tag - ignoring");
+          } else {
+            throw e;
+          }
+        }
       }
     }
     setBits(BITS_IS_REMOTE_TAG);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java
index 82a15fa..ad0e252 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/AbstractVersionTagTestBase.java
@@ -14,15 +14,48 @@
  */
 package org.apache.geode.internal.cache.versions;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.internal.serialization.VersionedDataInputStream;
+import org.apache.geode.internal.tcp.ByteBufferInputStream;
 
 public abstract class AbstractVersionTagTestBase {
+  Set<Integer> usedInts = new HashSet<>();
+  Random random = new Random();
+
   @SuppressWarnings("rawtypes")
   protected abstract VersionTag createVersionTag();
 
+  protected abstract VersionSource createMemberID();
+
   @SuppressWarnings("rawtypes")
   private VersionTag vt;
 
@@ -31,6 +64,122 @@ public abstract class AbstractVersionTagTestBase {
     this.vt = createVersionTag();
   }
 
+  int getRandomUnusedInt() {
+    int unusedInt;
+    do {
+      unusedInt = random.nextInt(60000);
+    } while (usedInts.contains(unusedInt));
+    return unusedInt;
+  }
+
+  @Test
+  public void testConcurrentCanonicalizationOfIDsAndSerialization() throws 
IOException {
+    VersionTag spy = spy(vt);
+    DataOutput dataOutput = mock(DataOutput.class);
+    spy.setMemberID(createMemberID());
+    spy.setPreviousMemberID(createMemberID());
+    final short[] flags = {0};
+
+    Answer myAnswer = new Answer() {
+      boolean firstInvocation = true;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (firstInvocation) {
+          // save the argument - it's the "flags" int that we'll want to verify
+          flags[0] = (short) (((Integer) invocation.getArgument(0)).intValue() 
& 0xFFFF);
+          firstInvocation = false;
+          // canonicalize the member IDs. Once flags have been written the tag 
shouldn't examine
+          // previousMemberID again to see if it's the same as the memberID.
+          spy.setPreviousMemberID(spy.getMemberID());
+        }
+        return null;
+      }
+    };
+    doAnswer(myAnswer).when(dataOutput).writeShort(any(Integer.class));
+    spy.toData(dataOutput, true);
+    // verify that both member IDs were written, as promised by the flags captured above
+    verify(spy, times(2)).writeMember(isA(VersionSource.class), 
isA(DataOutput.class));
+    assertThat(flags[0] & 
VersionTag.HAS_MEMBER_ID).isEqualTo(VersionTag.HAS_MEMBER_ID);
+    assertThat(flags[0] & VersionTag.HAS_PREVIOUS_MEMBER_ID)
+        .isEqualTo(VersionTag.HAS_PREVIOUS_MEMBER_ID);
+    assertThat(flags[0] & VersionTag.DUPLICATE_MEMBER_IDS)
+        .isNotEqualTo(VersionTag.DUPLICATE_MEMBER_IDS);
+  }
+
+  @Test
+  public void testSerializationWritesNoMemberID() throws IOException {
+    VersionTag spy = spy(vt);
+    DataOutput dataOutput = mock(DataOutput.class);
+    spy.setMemberID(createMemberID());
+    spy.setPreviousMemberID(createMemberID());
+    final short[] flags = {0};
+
+    Answer myAnswer = new Answer() {
+      boolean firstInvocation = true;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (firstInvocation) {
+          // save the argument - it's the "flags" int that we'll want to verify
+          flags[0] = (short) (((Integer) invocation.getArgument(0)).intValue() 
& 0xFFFF);
+          firstInvocation = false;
+        }
+        return null;
+      }
+    };
+    doAnswer(myAnswer).when(dataOutput).writeShort(any(Integer.class));
+    spy.toData(dataOutput, false);
+    // verify that we didn't write member IDs and the flags don't state that 
there are IDs in the
+    // tag
+    verify(spy, times(0)).writeMember(isA(VersionSource.class), 
isA(DataOutput.class));
+    assertThat(flags[0] & 
VersionTag.HAS_MEMBER_ID).isNotEqualTo(VersionTag.HAS_MEMBER_ID);
+    assertThat(flags[0] & VersionTag.HAS_PREVIOUS_MEMBER_ID)
+        .isNotEqualTo(VersionTag.HAS_PREVIOUS_MEMBER_ID);
+    assertThat(flags[0] & VersionTag.DUPLICATE_MEMBER_IDS)
+        .isNotEqualTo(VersionTag.DUPLICATE_MEMBER_IDS);
+  }
+
+  @Test
+  public void testBufferUnderflowFromOldVersionIsIgnored()
+      throws IOException, ClassNotFoundException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1000);
+    DataOutputStream out = new DataOutputStream(outputStream);
+    short flags =
+        VersionTag.HAS_MEMBER_ID | VersionTag.HAS_PREVIOUS_MEMBER_ID | 
VersionTag.VERSION_TWO_BYTES;
+    out.writeShort(flags);
+    out.writeShort(0);
+    out.write(1);
+    out.writeShort(12345);
+    out.writeInt(12345);
+    InternalDataSerializer.writeUnsignedVL(1L, out);
+    VersionSource memberID = createMemberID();
+    vt.writeMember(memberID, out);
+    out.flush();
+
+    ByteBufferInputStream inputStream =
+        new ByteBufferInputStream(ByteBuffer.wrap(outputStream.toByteArray()));
+    DataInputStream in = new DataInputStream(inputStream);
+    VersionedDataInputStream versionedDataInputStream =
+        new VersionedDataInputStream(in, Version.GEODE_1_10_0);
+    DeserializationContext context =
+        
InternalDataSerializer.createDeserializationContext(versionedDataInputStream);
+
+    // deserializing a version tag that's missing the "previous member id" 
should work for messages
+    // from older nodes but not post-1.10 because the serialization problem 
was fixed
+    vt = createVersionTag();
+    vt.fromData(versionedDataInputStream, context);
+    assertThat(vt.getMemberID()).isEqualTo(memberID);
+
+    inputStream.position(0);
+    final DataInputStream unversionedInputStream = new 
DataInputStream(inputStream);
+    final DeserializationContext unversionedContext =
+        InternalDataSerializer.createDeserializationContext(in);
+    vt = createVersionTag();
+    assertThatThrownBy(() -> vt.fromData(unversionedInputStream, 
unversionedContext))
+        .isExactlyInstanceOf(BufferUnderflowException.class);
+  }
+
   @Test
   public void testFromOtherMemberBit() {
     assertEquals(false, vt.isFromOtherMember());
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java
similarity index 73%
copy from 
geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
copy to 
geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java
index 7a4971e..0991f26 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/DiskVersionTagTest.java
@@ -15,13 +15,21 @@
 package org.apache.geode.internal.cache.versions;
 
 
+import org.apache.geode.internal.cache.persistence.DiskStoreID;
 
-public class VMVersionTagTest extends AbstractVersionTagTestBase {
+public class DiskVersionTagTest extends AbstractVersionTagTestBase {
 
   @SuppressWarnings("rawtypes")
   @Override
   protected VersionTag createVersionTag() {
-    return new VMVersionTag();
+    return new DiskVersionTag();
+  }
+
+  @Override
+  protected VersionSource createMemberID() {
+    int high = getRandomUnusedInt();
+    int low = getRandomUnusedInt();
+    return new DiskStoreID(high, low);
   }
 
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
index 7a4971e..e35a734 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/VMVersionTagTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.internal.cache.versions;
 
 
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 
 public class VMVersionTagTest extends AbstractVersionTagTestBase {
 
@@ -24,4 +25,9 @@ public class VMVersionTagTest extends 
AbstractVersionTagTestBase {
     return new VMVersionTag();
   }
 
+  @Override
+  protected VersionSource createMemberID() {
+    int port = getRandomUnusedInt();
+    return new InternalDistributedMember("localhost", port);
+  }
 }

Reply via email to