This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9166cb44ec2854c8c5d501334476f0d6dfe9d258 Author: Donal Evans <doev...@vmware.com> AuthorDate: Fri Jan 21 15:28:10 2022 -0800 GEODE-9885: Handle duplicated appends in Redis StringsDUnitTest (#7290) * GEODE-9885: Handle duplicated appends in Redis StringsDUnitTest - The Jedis client automatically retries when a bucket is moved during an APPEND operation, which can lead to the append being duplicated. Add error handling to the test to check if unexpected append values are duplicates of the previous append and if so, ignore the AssertionError - Add missing synchronization around in-place array modification in RedisString.applyReplaceByteArrayAtOffsetDelta() method - Remove unnecessarily overridden method from NullRedisSet Authored-by: Donal Evans <doev...@vmware.com> (cherry picked from commit b8dd86b846083a59ffe1aa56b489df60f4d75d39) --- .../commands/executor/string/StringsDUnitTest.java | 71 +++++++++++++--------- .../geode/redis/internal/data/NullRedisString.java | 5 -- .../geode/redis/internal/data/RedisString.java | 6 +- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/StringsDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/StringsDUnitTest.java index 2a4479a..a8f8ff4 100644 --- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/StringsDUnitTest.java +++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/StringsDUnitTest.java @@ -30,7 +30,6 @@ import java.util.function.Consumer; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -57,14 +56,13 @@ public class StringsDUnitTest { private static final int NUM_ITERATIONS = 1000; private static JedisCluster jedisCluster; - private static MemberVM locator; - @BeforeClass public static void classSetup() { - locator = clusterStartUp.startLocatorVM(0); - clusterStartUp.startRedisVM(1, locator.getPort()); - clusterStartUp.startRedisVM(2, locator.getPort()); - clusterStartUp.startRedisVM(3, locator.getPort()); + MemberVM locator = clusterStartUp.startLocatorVM(0); + int locatorPort = locator.getPort(); + clusterStartUp.startRedisVM(1, locatorPort); + clusterStartUp.startRedisVM(2, locatorPort); + clusterStartUp.startRedisVM(3, locatorPort); int redisServerPort1 = clusterStartUp.getRedisPort(1); jedisCluster = @@ -263,9 +261,9 @@ public class StringsDUnitTest { hashtags.add(clusterStartUp.getKeyOnServer("append", 2)); hashtags.add(clusterStartUp.getKeyOnServer("append", 3)); - Runnable task1 = () -> appendPerformAndVerify(1, 10000, hashtags.get(0), running); - Runnable task2 = () -> appendPerformAndVerify(2, 10000, hashtags.get(1), running); - Runnable task3 = () -> appendPerformAndVerify(3, 10000, hashtags.get(2), running); + Runnable task1 = () -> appendPerformAndVerify(1, hashtags.get(0), running); + Runnable task2 = () -> appendPerformAndVerify(2, hashtags.get(1), running); + Runnable task3 = () -> appendPerformAndVerify(3, hashtags.get(2), running); Future<Void> future1 = executor.runAsync(task1); Future<Void> future2 = executor.runAsync(task2); @@ -276,11 +274,6 @@ public class StringsDUnitTest { GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true); } - for (int i = 0; i < 100 && running.get(); i++) { - clusterStartUp.moveBucketForKey(hashtags.get(i % hashtags.size())); - GeodeAwaitility.await().during(Duration.ofMillis(200)).until(() -> true); - } - running.set(false); future1.get(); @@ -288,12 +281,11 @@ public class StringsDUnitTest { future3.get(); } - private void appendPerformAndVerify(int index, int minimumIterations, String hashtag, - AtomicBoolean isRunning) { + private void appendPerformAndVerify(int index, String hashtag, AtomicBoolean isRunning) { String key = "{" + hashtag + "}-key-" + index; int iterationCount = 0; - while (iterationCount < minimumIterations || isRunning.get()) { + while (isRunning.get()) { String appendString = "-" + key + "-" + iterationCount + "-"; try { jedisCluster.append(key, appendString); @@ -301,23 +293,46 @@ public class StringsDUnitTest { isRunning.set(false); throw new RuntimeException("Exception performing APPEND " + appendString, ex); } - iterationCount += 1; + iterationCount++; } String storedString = jedisCluster.get(key); - int idx = 0; + int startIndex = 0; int i = 0; - while (i < iterationCount) { + boolean lastWasDuplicate = false; + boolean reachedEnd = false; + while (!reachedEnd) { String expectedValue = "-" + key + "-" + i + "-"; - String foundValue = storedString.substring(idx, idx + expectedValue.length()); - if (!foundValue.equals(expectedValue)) { - Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount=" - + iterationCount + " in string " - + storedString); - break; + + int endIndex = startIndex + expectedValue.length(); + reachedEnd = endIndex == storedString.length(); + String foundValue = storedString.substring(startIndex, endIndex); + + int subsectionStart = Math.max(0, startIndex - (2 * expectedValue.length())); + int subsectionEnd = Math.min(storedString.length(), endIndex + (2 * expectedValue.length())); + + try { + assertThat(foundValue).as("unexpected " + foundValue + + " at index " + i + + " iterationCount=" + iterationCount + + " in String subsection: " + storedString.substring(subsectionStart, subsectionEnd)) + .isEqualTo(expectedValue); + lastWasDuplicate = false; + } catch (AssertionError error) { + // Jedis client retries can lead to duplicated appends, so check if the append is a + // duplicate of the previous one, but only allow one duplicated append in a row + String previousAppend = "-" + key + "-" + (i - 1) + "-"; + if (lastWasDuplicate || !foundValue.equals(previousAppend)) { + throw error; + } + + // Decrement the counter to account for the duplicated append and reset the flag to detect + // multiple duplicated appends in a row + lastWasDuplicate = true; + i--; } - idx += expectedValue.length(); + startIndex = endIndex; i++; } } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java index 5a644fc..fdd9326 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java @@ -43,11 +43,6 @@ public class NullRedisString extends RedisString { } @Override - protected void valueAppend(byte[] bytes) { - throw new UnsupportedOperationException(); - } - - @Override public byte[] get() { return null; } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java index db9baf2..cfde92b 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java @@ -190,7 +190,9 @@ public class RedisString extends AbstractRedisData { public void applyReplaceByteArrayAtOffsetDelta(int offset, byte[] valueToAdd) { int totalLength = offset + valueToAdd.length; if (totalLength < value.length) { - System.arraycopy(valueToAdd, 0, value, offset, valueToAdd.length); + synchronized (this) { + System.arraycopy(valueToAdd, 0, value, offset, valueToAdd.length); + } } else { byte[] newBytes = Arrays.copyOf(value, totalLength); System.arraycopy(valueToAdd, 0, newBytes, offset, valueToAdd.length); @@ -362,7 +364,7 @@ public class RedisString extends AbstractRedisData { /** * Since GII (getInitialImage) can come in and call toData while other threads * are modifying this object, the striped executor will not protect toData. - * So any methods that modify "value", "appendSequence" need to be thread safe with toData. + * So any methods that modify "value" need to be thread safe with toData. */ @Override