DonalEvans commented on code in PR #7513:
URL: https://github.com/apache/geode/pull/7513#discussion_r841954532


##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java:
##########
@@ -142,51 +142,89 @@ public void 
givenBucketsMovedDuringRPopLPush_thenOperationsAreNotLostOrDuplicate
     future3.get();
   }
 
-  @Ignore("GEODE-10121")
   @Test
-  public void rpoplpush_isTransactional() {
-    String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
-
-    // Create two real RedisList entries
-    String sourceKey = hashTag + KEY_1;
-    String[] sourceElements = {"sourceElement1", "sourceElement2"};
-    jedis.lpush(sourceKey, sourceElements);
-    String destinationKey = hashTag + KEY_2;
-    String destinationElement = "destinationElement";
-    jedis.lpush(destinationKey, destinationElement);
-
-    String throwingRedisListKey = hashTag + "ThrowingRedisList";
-    String throwingListElement = "shouldNotMove";
-
-    // Put a test version of RedisList directly into the region that throws if 
rpop() or lpush() are
-    // called on it
+  public void rpop_isTransactional() {

Review Comment:
   This should be "rpoplpush_isTransactional"



##########
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java:
##########
@@ -99,6 +99,14 @@ public void rPopLPush_withNonexistentSourceKey_returnsNull() 
{
     assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
   }
 
+  @Test
+  public void rPopLPush_withNonexistentDestinationKey_returnsPoppedElement() {

Review Comment:
   This test is duplicating 
"rPopLPush_createsListAtDestination_whenDestinationKeyIsEmpty" so should 
probably be removed.



##########
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java:
##########
@@ -86,12 +93,19 @@ public static int smove(RedisKey sourceKey, RedisKey 
destKey, byte[] member,
       RegionProvider regionProvider) {
     RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey, 
false);
     RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET, 
destKey, false);
-    List<byte[]> memberList = new ArrayList<>();
-    memberList.add(member);
-    if (source.srem(memberList, regionProvider.getDataRegion(), sourceKey) == 
0) {
+    if (!source.sismember(member)) {
       return 0;
     }
-    destination.sadd(memberList, regionProvider.getDataRegion(), destKey);
+    if (sourceKey.equals(destKey)) {
+      return 1;
+    }
+
+    List<byte[]> memberList = new ArrayList<>();
+    memberList.add(member);
+    RedisSet newSource = new RedisSet(source);
+    newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey);
+    RedisSet newDestination = new RedisSet(destination);
+    newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);

Review Comment:
   This method can be improved to be more efficient and fix some incorrect 
behaviour similar to that in RPOPLPUSH:
   ```
       RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey, 
false);
   
       if (!source.sismember(member)) {
         return 0;
       }
   
       if (sourceKey.equals(destKey)) {
         return 1;
       }
   
       List<byte[]> memberList = new ArrayList<>();
       memberList.add(member);
       RedisSet newSource = new RedisSet(source);
       newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey
   
       RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET, 
destKey, false);
       RedisSet newDestination = new RedisSet(destination);
       newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);
       return 1;
   ```
   
   Without these changes, the below integration test fails with geode-for-redis 
but passes for open source Redis:
   ```
     @Test
     public void 
smove_withNonExistentSourceAndWrongTypeDestination_returnsZero() {
       jedis.set(DESTINATION_KEY, "not a RedisSet");
   
       assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY, 
MOVED_MEMBER))
           .isEqualTo(0);
     }
   ```



##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java:
##########
@@ -142,51 +142,89 @@ public void 
givenBucketsMovedDuringRPopLPush_thenOperationsAreNotLostOrDuplicate
     future3.get();
   }
 
-  @Ignore("GEODE-10121")
   @Test
-  public void rpoplpush_isTransactional() {
-    String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
-
-    // Create two real RedisList entries
-    String sourceKey = hashTag + KEY_1;
-    String[] sourceElements = {"sourceElement1", "sourceElement2"};
-    jedis.lpush(sourceKey, sourceElements);
-    String destinationKey = hashTag + KEY_2;
-    String destinationElement = "destinationElement";
-    jedis.lpush(destinationKey, destinationElement);
-
-    String throwingRedisListKey = hashTag + "ThrowingRedisList";
-    String throwingListElement = "shouldNotMove";
-
-    // Put a test version of RedisList directly into the region that throws if 
rpop() or lpush() are
-    // called on it
+  public void rpop_isTransactional() {
+    IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", 
primaryVMIndex) + "}";
+
+    final String sourceKey = tag + KEY_1;
+    int sourceSize = 2;
+    List<String> initialElements = makeInitialElementsList(sourceSize);
+    jedis.lpush(sourceKey, initialElements.toArray(new String[0]));
+
+    String throwingKey = tag + "ThrowingRedisString";
+    int destinationSize = 2;
+    List<String> elementsForThrowingKey = makeInitialElementsList(sourceSize);
+    jedis.lpush(throwingKey, elementsForThrowingKey.toArray(new String[0]));
+
+    // Install a cache writer that will throw an exception if a key with a 
name equal to throwingKey
+    // is updated or created
     clusterStartUp.getMember(1).invoke(() -> {
-      RedisKey throwingKey = new 
RedisKey(throwingRedisListKey.getBytes(StandardCharsets.UTF_8));
-      ThrowingRedisList throwingRedisList = new ThrowingRedisList();
-      
throwingRedisList.elementInsert(throwingListElement.getBytes(StandardCharsets.UTF_8),
 0);
-      
ClusterStartupRule.getCache().getRegion(DEFAULT_REDIS_REGION_NAME).put(throwingKey,
-          throwingRedisList);
+      RedisClusterStartupRule.getCache()
+          .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+          .getAttributesMutator()
+          .setCacheWriter(new 
RPopLPushDUnitTest.ThrowingCacheWriter(throwingKey));
     });
 
-    IgnoredException.addIgnoredException(THROWING_REDIS_LIST_EXCEPTION);
+    assertThatThrownBy(
+        () -> jedis.rpoplpush(sourceKey, throwingKey))
+            .hasMessage(SERVER_ERROR_MESSAGE);
 
-    // Test with an exception being thrown from the source RedisList
-    assertThatThrownBy(() -> jedis.rpoplpush(throwingRedisListKey, 
destinationKey))
-        .hasMessage(SERVER_ERROR_MESSAGE);
-
-    assertThat(jedis.lrange(throwingRedisListKey, 0, 
-1)).containsExactly(throwingListElement);
-    assertThat(jedis.lrange(destinationKey, 0, 
-1)).containsExactly(destinationElement);
+    List<String> reversedInitialElements = new ArrayList<>(initialElements);
+    Collections.reverse(reversedInitialElements);
 
-    // Test with an exception being thrown from the destination RedisList
-    assertThatThrownBy(() -> jedis.rpoplpush(sourceKey, throwingRedisListKey))
-        .hasMessage(SERVER_ERROR_MESSAGE);
+    List<String> reversedElementsForThrowingKey = new 
ArrayList<>(elementsForThrowingKey);
+    Collections.reverse(reversedElementsForThrowingKey);
 
-    assertThat(jedis.lrange(sourceKey, 0, 
-1)).containsExactlyInAnyOrder(sourceElements);
-    assertThat(jedis.lrange(throwingRedisListKey, 0, 
-1)).containsExactly(throwingRedisListKey);
+    // Assert rpoplpush has not happened
+    assertThat(jedis.llen(sourceKey)).isEqualTo(sourceSize);
+    assertThat(jedis.lrange(sourceKey, 0, 
-1)).containsExactlyElementsOf(reversedInitialElements);
+    assertThat(jedis.llen(throwingKey)).isEqualTo(destinationSize);

Review Comment:
   It's not necessary to check the length of the lists here, as 
`containsExactlyElementsOf()` implicitly confirms that the length is correct.



##########
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.set;
+
+import static 
org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
+import static 
org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SMoveDunitTest {
+
+  public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule();
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  public static final String KEY_1 = "key1";
+  public static final String KEY_2 = "key2";
+  private static JedisCluster jedis;
+
+  @Before
+  public void testSetup() {
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 
20_000);
+    clusterStartUp.flushAll();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis.close();
+  }
+
+
+  @Test
+  public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() 
{
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", 
primaryVMIndex) + "}";
+    final String sourceKey = tag + KEY_1;
+    final String destinationKey = tag + KEY_2;
+
+
+    final int elementsToMove = 5;
+    final int initialElementCount = elementsToMove * 2;
+
+    List<String> members = makeMemberList(initialElementCount, "member1-");
+
+    jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+    // Move half the elements from the source set to the destination
+    for (int i = 0; i < elementsToMove; ++i) {
+      assertThat(jedis.smove(sourceKey, destinationKey, 
members.get(i))).isEqualTo(1);
+    }
+
+    clusterStartUp.crashVM(primaryVMIndex); // kill primary server
+
+    assertThat(jedis.smembers(sourceKey))
+        .containsExactlyInAnyOrderElementsOf(members.subList(elementsToMove, 
initialElementCount));
+    assertThat(jedis.smembers(destinationKey))
+        .containsExactlyInAnyOrderElementsOf(members.subList(0, 
elementsToMove));
+  }
+
+  @Test
+  public void 
givenBucketsMovedDuringSMove_thenOperationsAreNotLostOrDuplicated()
+      throws InterruptedException, ExecutionException {
+
+    final AtomicBoolean continueRunning = new AtomicBoolean(true);
+    final List<String> hashTags = getHashTagsForEachServer();
+
+    List<String> members1 = makeMemberList(10, "member1-");
+    List<String> members2 = makeMemberList(10, "member2-");
+    List<String> members3 = makeMemberList(10, "member3-");
+
+    jedis.sadd(hashTags.get(0) + KEY_1, members1.toArray(new String[] {}));
+    jedis.sadd(hashTags.get(1) + KEY_1, members2.toArray(new String[] {}));
+    jedis.sadd(hashTags.get(2) + KEY_1, members3.toArray(new String[] {}));
+
+    Future<Void> future1 = executor.runAsync(() -> repeatSMove(hashTags.get(0),
+        members1, continueRunning));
+    Future<Void> future2 = executor.runAsync(() -> repeatSMove(hashTags.get(1),
+        members2, continueRunning));
+    Future<Void> future3 =
+        executor.runAsync(() -> 
repeatSMoveWithSameSourceAndDest(hashTags.get(2),
+            members3, continueRunning));
+
+    for (int i = 0; i < 25 && continueRunning.get(); i++) {
+      clusterStartUp.moveBucketForKey(hashTags.get(i % hashTags.size()));
+      Thread.sleep(200);
+    }
+
+    continueRunning.set(false);
+
+    future1.get();
+    future2.get();
+    future3.get();
+  }
+
+  @Test
+  public void smove_isTransactional() {
+    IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", 
primaryVMIndex) + "}";
+
+    final String sourceKey = tag + KEY_1;
+    int sourceSize = 2;
+    List<String> members = makeMemberList(sourceSize, "member1-");
+    jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+    String throwingKey = tag + "ThrowingRedisString";
+    int destinationSize = 2;
+    List<String> membersForThrowingKey = makeMemberList(2, 
"ThrowingRedisStringValue");
+    jedis.sadd(throwingKey, membersForThrowingKey.toArray(new String[] {}));
+
+    // Install a cache writer that will throw an exception if a key with a 
name equal to throwingKey
+    // is updated or created
+    clusterStartUp.getMember(1).invoke(() -> {
+      RedisClusterStartupRule.getCache()
+          .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+          .getAttributesMutator()
+          .setCacheWriter(new SMoveDunitTest.ThrowingCacheWriter(throwingKey));
+    });
+
+    String memberToRemove = "member1-0";
+
+    assertThatThrownBy(
+        () -> jedis.smove(sourceKey, throwingKey, memberToRemove))
+            .hasMessage(SERVER_ERROR_MESSAGE);
+
+    // Assert smove has not happened
+    assertThat(jedis.scard(sourceKey)).isEqualTo(sourceSize);
+    assertThat(jedis.smembers(sourceKey)).contains(memberToRemove);
+    assertThat(jedis.scard(throwingKey)).isEqualTo(destinationSize);
+    assertThat(jedis.smembers(throwingKey)).doesNotContain(memberToRemove);

Review Comment:
   It would be better to assert on the contents of the sets here rather than 
the sizes and whether or not they contain a specific element:
   ```
       
assertThat(jedis.smembers(sourceKey)).containsExactlyInAnyOrderElementsOf(members);
       
assertThat(jedis.smembers(throwingKey)).containsExactlyInAnyOrderElementsOf(membersForThrowingKey);
   ```



##########
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java:
##########
@@ -316,10 +323,25 @@ public long rpush(ExecutionHandlerContext context, 
List<byte[]> elementsToAdd, R
     RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST, 
source, false);
     RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST, 
destination, false);
     Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
-    byte[] moved = sourceList.rpop(region, source);
+
+    if (!sourceList.exists()) {
+      return null;
+    }
+
+    if (source.equals(destination)) {
+      RedisList newSourceList = new RedisList(sourceList);
+      byte[] moved = newSourceList.rpop(region, source);
+      newSourceList.lpush(context, Collections.singletonList(moved), 
destination, false);
+      return moved;
+    }
+
+    RedisList newSourceList = new RedisList(sourceList);
+    byte[] moved = newSourceList.rpop(region, source);
     if (moved != null) {
-      destinationList.lpush(context, Collections.singletonList(moved), 
destination, false);
+      RedisList newDestinationList = new RedisList(destinationList);
+      newDestinationList.lpush(context, Collections.singletonList(moved), 
destination, false);
     }
+
     return moved;

Review Comment:
   This block of code can be simplified and made a little more efficient by 
changing the order in which we're doing things a little:
   ```
       RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST, 
source, false);
   
       if (sourceList.isNull()) {
         return null;
       }
       
       RedisList newSourceList = new RedisList(sourceList);
       Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
       byte[] moved = newSourceList.rpop(region, source);
   
       if (source.equals(destination)) {
         newSourceList.lpush(context, Collections.singletonList(moved), 
destination, false);
       } else {
         RedisList destinationList = 
regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
         new RedisList(destinationList).lpush(context, 
Collections.singletonList(moved), destination, false);
       }
       return moved;
   ```
   
   By moving the check for whether the source list is null higher up, we can 
return early without having to retrieve the destination list, which also fixes 
some incorrect behaviour where a nonexistent source key with a non-list 
destination key would return an error instead of returning null. An integration 
test to show this behaviour is:
   ```
     @Test
     public void 
rPopLPush_withNonexistentSourceKeyAndNonListDestinationKey_returnsNull() {
       jedis.set(DESTINATION_KEY, "not_a_list");
   
       assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
     }
   ```
   
   We can also avoid having to retrieve the destination list from the region if 
the source and destination are the same by moving the `getTypedRedisData()` 
call inside the if statement, which will improve performance in that case. We 
also don't need to check both that the source is not null and that the member 
popped is not null, as the only time the member could be null would be if the 
source was null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to