DonalEvans commented on code in PR #7513:
URL: https://github.com/apache/geode/pull/7513#discussion_r841954532
##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java:
##########
@@ -142,51 +142,89 @@ public void
givenBucketsMovedDuringRPopLPush_thenOperationsAreNotLostOrDuplicate
future3.get();
}
- @Ignore("GEODE-10121")
@Test
- public void rpoplpush_isTransactional() {
- String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
-
- // Create two real RedisList entries
- String sourceKey = hashTag + KEY_1;
- String[] sourceElements = {"sourceElement1", "sourceElement2"};
- jedis.lpush(sourceKey, sourceElements);
- String destinationKey = hashTag + KEY_2;
- String destinationElement = "destinationElement";
- jedis.lpush(destinationKey, destinationElement);
-
- String throwingRedisListKey = hashTag + "ThrowingRedisList";
- String throwingListElement = "shouldNotMove";
-
- // Put a test version of RedisList directly into the region that throws if
rpop() or lpush() are
- // called on it
+ public void rpop_isTransactional() {
Review Comment:
This should be "rpoplpush_isTransactional", since the test still exercises RPOPLPUSH rather than RPOP.
##########
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java:
##########
@@ -99,6 +99,14 @@ public void rPopLPush_withNonexistentSourceKey_returnsNull()
{
assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
}
+ @Test
+ public void rPopLPush_withNonexistentDestinationKey_returnsPoppedElement() {
Review Comment:
This test duplicates
"rPopLPush_createsListAtDestination_whenDestinationKeyIsEmpty", so it should
probably be removed.
##########
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java:
##########
@@ -86,12 +93,19 @@ public static int smove(RedisKey sourceKey, RedisKey
destKey, byte[] member,
RegionProvider regionProvider) {
RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey,
false);
RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET,
destKey, false);
- List<byte[]> memberList = new ArrayList<>();
- memberList.add(member);
- if (source.srem(memberList, regionProvider.getDataRegion(), sourceKey) ==
0) {
+ if (!source.sismember(member)) {
return 0;
}
- destination.sadd(memberList, regionProvider.getDataRegion(), destKey);
+ if (sourceKey.equals(destKey)) {
+ return 1;
+ }
+
+ List<byte[]> memberList = new ArrayList<>();
+ memberList.add(member);
+ RedisSet newSource = new RedisSet(source);
+ newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey);
+ RedisSet newDestination = new RedisSet(destination);
+ newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);
Review Comment:
This method can be made more efficient, and some incorrect behaviour similar
to that in RPOPLPUSH can be fixed at the same time:
```
RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey,
false);
if (!source.sismember(member)) {
return 0;
}
if (sourceKey.equals(destKey)) {
return 1;
}
List<byte[]> memberList = new ArrayList<>();
memberList.add(member);
RedisSet newSource = new RedisSet(source);
newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey);
RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET,
destKey, false);
RedisSet newDestination = new RedisSet(destination);
newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);
return 1;
```
Without these changes, the integration test below fails with geode-for-redis
but passes with open source Redis:
```
@Test
public void
smove_withNonExistentSourceAndWrongTypeDestination_returnsZero() {
jedis.set(DESTINATION_KEY, "not a RedisSet");
assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY,
MOVED_MEMBER))
.isEqualTo(0);
}
```
##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java:
##########
@@ -142,51 +142,89 @@ public void
givenBucketsMovedDuringRPopLPush_thenOperationsAreNotLostOrDuplicate
future3.get();
}
- @Ignore("GEODE-10121")
@Test
- public void rpoplpush_isTransactional() {
- String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
-
- // Create two real RedisList entries
- String sourceKey = hashTag + KEY_1;
- String[] sourceElements = {"sourceElement1", "sourceElement2"};
- jedis.lpush(sourceKey, sourceElements);
- String destinationKey = hashTag + KEY_2;
- String destinationElement = "destinationElement";
- jedis.lpush(destinationKey, destinationElement);
-
- String throwingRedisListKey = hashTag + "ThrowingRedisList";
- String throwingListElement = "shouldNotMove";
-
- // Put a test version of RedisList directly into the region that throws if
rpop() or lpush() are
- // called on it
+ public void rpop_isTransactional() {
+ IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag",
primaryVMIndex) + "}";
+
+ final String sourceKey = tag + KEY_1;
+ int sourceSize = 2;
+ List<String> initialElements = makeInitialElementsList(sourceSize);
+ jedis.lpush(sourceKey, initialElements.toArray(new String[0]));
+
+ String throwingKey = tag + "ThrowingRedisString";
+ int destinationSize = 2;
+ List<String> elementsForThrowingKey = makeInitialElementsList(sourceSize);
+ jedis.lpush(throwingKey, elementsForThrowingKey.toArray(new String[0]));
+
+ // Install a cache writer that will throw an exception if a key with a
name equal to throwingKey
+ // is updated or created
clusterStartUp.getMember(1).invoke(() -> {
- RedisKey throwingKey = new
RedisKey(throwingRedisListKey.getBytes(StandardCharsets.UTF_8));
- ThrowingRedisList throwingRedisList = new ThrowingRedisList();
-
throwingRedisList.elementInsert(throwingListElement.getBytes(StandardCharsets.UTF_8),
0);
-
ClusterStartupRule.getCache().getRegion(DEFAULT_REDIS_REGION_NAME).put(throwingKey,
- throwingRedisList);
+ RedisClusterStartupRule.getCache()
+ .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+ .getAttributesMutator()
+ .setCacheWriter(new
RPopLPushDUnitTest.ThrowingCacheWriter(throwingKey));
});
- IgnoredException.addIgnoredException(THROWING_REDIS_LIST_EXCEPTION);
+ assertThatThrownBy(
+ () -> jedis.rpoplpush(sourceKey, throwingKey))
+ .hasMessage(SERVER_ERROR_MESSAGE);
- // Test with an exception being thrown from the source RedisList
- assertThatThrownBy(() -> jedis.rpoplpush(throwingRedisListKey,
destinationKey))
- .hasMessage(SERVER_ERROR_MESSAGE);
-
- assertThat(jedis.lrange(throwingRedisListKey, 0,
-1)).containsExactly(throwingListElement);
- assertThat(jedis.lrange(destinationKey, 0,
-1)).containsExactly(destinationElement);
+ List<String> reversedInitialElements = new ArrayList<>(initialElements);
+ Collections.reverse(reversedInitialElements);
- // Test with an exception being thrown from the destination RedisList
- assertThatThrownBy(() -> jedis.rpoplpush(sourceKey, throwingRedisListKey))
- .hasMessage(SERVER_ERROR_MESSAGE);
+ List<String> reversedElementsForThrowingKey = new
ArrayList<>(elementsForThrowingKey);
+ Collections.reverse(reversedElementsForThrowingKey);
- assertThat(jedis.lrange(sourceKey, 0,
-1)).containsExactlyInAnyOrder(sourceElements);
- assertThat(jedis.lrange(throwingRedisListKey, 0,
-1)).containsExactly(throwingRedisListKey);
+ // Assert rpoplpush has not happened
+ assertThat(jedis.llen(sourceKey)).isEqualTo(sourceSize);
+ assertThat(jedis.lrange(sourceKey, 0,
-1)).containsExactlyElementsOf(reversedInitialElements);
+ assertThat(jedis.llen(throwingKey)).isEqualTo(destinationSize);
Review Comment:
It's not necessary to check the length of the lists here, as
`containsExactlyElementsOf()` implicitly confirms that the length is correct.
##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.set;
+
+import static
org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
+import static
org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SMoveDunitTest {
+
+ public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
+ @Rule
+ public RedisClusterStartupRule clusterStartUp = new
RedisClusterStartupRule();
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ public static final String KEY_1 = "key1";
+ public static final String KEY_2 = "key2";
+ private static JedisCluster jedis;
+
+ @Before
+ public void testSetup() {
+ MemberVM locator = clusterStartUp.startLocatorVM(0);
+ clusterStartUp.startRedisVM(1, locator.getPort());
+ clusterStartUp.startRedisVM(2, locator.getPort());
+ clusterStartUp.startRedisVM(3, locator.getPort());
+ int redisServerPort = clusterStartUp.getRedisPort(1);
+ jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort),
20_000);
+ clusterStartUp.flushAll();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ jedis.close();
+ }
+
+
+ @Test
+ public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash()
{
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag",
primaryVMIndex) + "}";
+ final String sourceKey = tag + KEY_1;
+ final String destinationKey = tag + KEY_2;
+
+
+ final int elementsToMove = 5;
+ final int initialElementCount = elementsToMove * 2;
+
+ List<String> members = makeMemberList(initialElementCount, "member1-");
+
+ jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+ // Move half the elements from the source set to the destination
+ for (int i = 0; i < elementsToMove; ++i) {
+ assertThat(jedis.smove(sourceKey, destinationKey,
members.get(i))).isEqualTo(1);
+ }
+
+ clusterStartUp.crashVM(primaryVMIndex); // kill primary server
+
+ assertThat(jedis.smembers(sourceKey))
+ .containsExactlyInAnyOrderElementsOf(members.subList(elementsToMove,
initialElementCount));
+ assertThat(jedis.smembers(destinationKey))
+ .containsExactlyInAnyOrderElementsOf(members.subList(0,
elementsToMove));
+ }
+
+ @Test
+ public void
givenBucketsMovedDuringSMove_thenOperationsAreNotLostOrDuplicated()
+ throws InterruptedException, ExecutionException {
+
+ final AtomicBoolean continueRunning = new AtomicBoolean(true);
+ final List<String> hashTags = getHashTagsForEachServer();
+
+ List<String> members1 = makeMemberList(10, "member1-");
+ List<String> members2 = makeMemberList(10, "member2-");
+ List<String> members3 = makeMemberList(10, "member3-");
+
+ jedis.sadd(hashTags.get(0) + KEY_1, members1.toArray(new String[] {}));
+ jedis.sadd(hashTags.get(1) + KEY_1, members2.toArray(new String[] {}));
+ jedis.sadd(hashTags.get(2) + KEY_1, members3.toArray(new String[] {}));
+
+ Future<Void> future1 = executor.runAsync(() -> repeatSMove(hashTags.get(0),
+ members1, continueRunning));
+ Future<Void> future2 = executor.runAsync(() -> repeatSMove(hashTags.get(1),
+ members2, continueRunning));
+ Future<Void> future3 =
+ executor.runAsync(() ->
repeatSMoveWithSameSourceAndDest(hashTags.get(2),
+ members3, continueRunning));
+
+ for (int i = 0; i < 25 && continueRunning.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashTags.get(i % hashTags.size()));
+ Thread.sleep(200);
+ }
+
+ continueRunning.set(false);
+
+ future1.get();
+ future2.get();
+ future3.get();
+ }
+
+ @Test
+ public void smove_isTransactional() {
+ IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag",
primaryVMIndex) + "}";
+
+ final String sourceKey = tag + KEY_1;
+ int sourceSize = 2;
+ List<String> members = makeMemberList(sourceSize, "member1-");
+ jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+ String throwingKey = tag + "ThrowingRedisString";
+ int destinationSize = 2;
+ List<String> membersForThrowingKey = makeMemberList(2,
"ThrowingRedisStringValue");
+ jedis.sadd(throwingKey, membersForThrowingKey.toArray(new String[] {}));
+
+ // Install a cache writer that will throw an exception if a key with a
name equal to throwingKey
+ // is updated or created
+ clusterStartUp.getMember(1).invoke(() -> {
+ RedisClusterStartupRule.getCache()
+ .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+ .getAttributesMutator()
+ .setCacheWriter(new SMoveDunitTest.ThrowingCacheWriter(throwingKey));
+ });
+
+ String memberToRemove = "member1-0";
+
+ assertThatThrownBy(
+ () -> jedis.smove(sourceKey, throwingKey, memberToRemove))
+ .hasMessage(SERVER_ERROR_MESSAGE);
+
+ // Assert smove has not happened
+ assertThat(jedis.scard(sourceKey)).isEqualTo(sourceSize);
+ assertThat(jedis.smembers(sourceKey)).contains(memberToRemove);
+ assertThat(jedis.scard(throwingKey)).isEqualTo(destinationSize);
+ assertThat(jedis.smembers(throwingKey)).doesNotContain(memberToRemove);
Review Comment:
It would be better to assert on the contents of the sets here rather than
the sizes and whether or not they contain a specific element:
```
assertThat(jedis.smembers(sourceKey)).containsExactlyInAnyOrderElementsOf(members);
assertThat(jedis.smembers(throwingKey)).containsExactlyInAnyOrderElementsOf(membersForThrowingKey);
```
##########
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java:
##########
@@ -316,10 +323,25 @@ public long rpush(ExecutionHandlerContext context,
List<byte[]> elementsToAdd, R
RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST,
source, false);
RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST,
destination, false);
Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
- byte[] moved = sourceList.rpop(region, source);
+
+ if (!sourceList.exists()) {
+ return null;
+ }
+
+ if (source.equals(destination)) {
+ RedisList newSourceList = new RedisList(sourceList);
+ byte[] moved = newSourceList.rpop(region, source);
+ newSourceList.lpush(context, Collections.singletonList(moved),
destination, false);
+ return moved;
+ }
+
+ RedisList newSourceList = new RedisList(sourceList);
+ byte[] moved = newSourceList.rpop(region, source);
if (moved != null) {
- destinationList.lpush(context, Collections.singletonList(moved),
destination, false);
+ RedisList newDestinationList = new RedisList(destinationList);
+ newDestinationList.lpush(context, Collections.singletonList(moved),
destination, false);
}
+
return moved;
Review Comment:
This block of code can be simplified and made a little more efficient by
reordering the steps:
```
RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST,
source, false);
if (sourceList.isNull()) {
return null;
}
RedisList newSourceList = new RedisList(sourceList);
Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
byte[] moved = newSourceList.rpop(region, source);
if (source.equals(destination)) {
newSourceList.lpush(context, Collections.singletonList(moved),
destination, false);
} else {
RedisList destinationList =
regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
new RedisList(destinationList).lpush(context,
Collections.singletonList(moved), destination, false);
}
return moved;
```
By moving the check for whether the source list is null higher up, we can
return early without having to retrieve the destination list, which also fixes
some incorrect behaviour where a nonexistent source key with a non-list
destination key would return an error instead of returning null. An integration
test to show this behaviour is:
```
@Test
public void
rPopLPush_withNonexistentSourceKeyAndNonListDestinationKey_returnsNull() {
jedis.set(DESTINATION_KEY, "not_a_list");
assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
}
```
We can also avoid having to retrieve the destination list from the region if
the source and destination are the same by moving the `getTypedRedisData()`
call inside the if statement, which will improve performance in that case. We
also don't need to check both that the source is not null and that the member
popped is not null, as the only time the member could be null would be if the
source was null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org