DonalEvans commented on a change in pull request #7408:
URL: https://github.com/apache/geode/pull/7408#discussion_r825176132



##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
##########
@@ -92,6 +97,15 @@ public void testBLPopWhenValueExists() {
     assertThat(jedis.lpop(KEY)).isEqualTo("value1");
   }
 
+  @Test
+  public void testBLPopWithDoesNotError_whenTimeoutHasExponent() {

Review comment:
       Typo in this test name, I think. Should this be 
"testBLPopDoesNotError_whenTimeoutHasExponent"?

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Interface intended to be implemented in order to receive Redis events. 
Specifically this would be
+ * keyspace events or blocking commands. EventListeners are registered with the
+ * {@link EventDistributor}.
+ */
+public interface EventListener {
+
+  /**
+   * Receive and process an event. This method should execute very quickly. 
The return value
+   * determines additional process steps for the given event.
+   *
+   * @param commandType the command triggering the event
+   * @param key the key triggering the event
+   * @return response determining subsequent processing steps
+   */
+  EventResponse process(RedisCommandType commandType, RedisKey key);
+
+  /**
+   * Return the list of keys this listener is interested in.
+   */
+  List<RedisKey> keys();
+
+  /**
+   * Method to resubmit a command if appropriate. This is only relevant for 
listeners that process
+   * events for blocking commands. Listeners that handle keyspace event 
notification will not use
+   * this.
+   */
+  void resubmitCommand();
+
+  /**
+   * Retrieve the timeout in nanoseconds for this listener
+   */
+  // long getTimeout();

Review comment:
       This should probably be removed.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListener implements EventListener {
+
+  private final ExecutionHandlerContext context;
+  private final Command command;
+  private final List<RedisKey> keys;
+  private final double timeoutSeconds;
+  private final long timeSubmitted;
+  private AtomicBoolean active = new AtomicBoolean(true);

Review comment:
       This can be `final`

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Interface intended to be implemented in order to receive Redis events. 
Specifically this would be
+ * keyspace events or blocking commands. EventListeners are registered with the
+ * {@link EventDistributor}.
+ */
+public interface EventListener {
+
+  /**
+   * Receive and process an event. This method should execute very quickly. 
The return value
+   * determines additional process steps for the given event.
+   *
+   * @param commandType the command triggering the event
+   * @param key the key triggering the event
+   * @return response determining subsequent processing steps
+   */
+  EventResponse process(RedisCommandType commandType, RedisKey key);
+
+  /**
+   * Return the list of keys this listener is interested in.
+   */
+  List<RedisKey> keys();
+
+  /**
+   * Method to resubmit a command if appropriate. This is only relevant for 
listeners that process
+   * events for blocking commands. Listeners that handle keyspace event 
notification will not use
+   * this.
+   */
+  void resubmitCommand();
+
+  /**
+   * Retrieve the timeout in nanoseconds for this listener
+   */
+  // long getTimeout();
+
+  void scheduleTimeout(ScheduledExecutorService executor, EventDistributor 
distributor);

Review comment:
       For consistency with other methods here, this should have some javadoc 
comments.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public abstract class AbstractBLPopIntegrationTest implements 
RedisIntegrationTest {
+  private static final String KEY = "key";
+
+  protected JedisCluster jedis;
+
+  public abstract void awaitEventDistributorSize(int size) throws Exception;
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void testInvalidArguments_throwErrors() {
+    assertAtLeastNArgs(jedis, Protocol.Command.BLPOP, 2);
+  }
+
+  @Test
+  public void testInvalidTimeout_throwsError() {
+    assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, 
"key1",
+        "0.A"))
+            .hasMessage(RedisConstants.ERROR_TIMEOUT_INVALID);
+  }
+
+  @Test
+  public void testKeysInDifferentSlots_throwsError() {
+    assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, 
"key1",
+        "key2", "0"))
+            .hasMessage(RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void testNegativeTimeout_throwsError() {
+    assertThatThrownBy(() -> jedis.blpop(-1, "key1"))
+        .hasMessage(RedisConstants.ERROR_NEGATIVE_TIMEOUT);
+  }
+
+  @Test
+  public void testBLPopForNonListKey() {
+    jedis.set("not-a-list", "value");
+
+    assertThatThrownBy(() -> jedis.blpop(0, "not-a-list"))
+        .hasMessage(RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void testBLPopWhenValueExists() {
+    jedis.lpush(KEY, "value1", "value2");
+
+    List<String> result = jedis.blpop(0, KEY);
+
+    assertThat(result).containsExactly(KEY, "value2");
+    assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+  }
+
+  @Test
+  public void testBLPopWithDoesNotError_whenTimeoutHasExponent() {
+    jedis.lpush(KEY, "value1", "value2");
+
+    Object result = jedis.sendCommand(KEY, Protocol.Command.BLPOP, KEY, 
"1E+3");
+
+    assertThat(result).isNotNull();
+  }
+
+  @Test
+  public void testBLPopWhenValueDoesNotExist() throws Exception {
+    Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY));
+
+    awaitEventDistributorSize(1);
+    jedis.lpush(KEY, "value1", "value2");
+
+    assertThat(future.get()).containsExactly(KEY, "value2");
+    assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+  }
+
+  @Test
+  public void testBLPopWhenTimeoutIsExceeded() {
+    int timeout = 10;
+    Future<List<String>> future = executor.submit(() -> jedis.blpop(timeout, 
KEY));
+    GeodeAwaitility.await().atMost(timeout * 2, TimeUnit.SECONDS)
+        .untilAsserted(() -> assertThat(future.get()).isNull());
+  }
+
+  @Test
+  public void testConcurrentBLPop() throws Exception {
+    int totalElements = 10_000;
+    List<Object> accumulated = Collections.synchronizedList(new 
ArrayList<>(totalElements + 2));
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Void> future1 = executor.submit(() -> doBLPop(running, 
accumulated));
+    Future<Void> future2 = executor.submit(() -> doBLPop(running, 
accumulated));
+
+    List<String> result = new ArrayList<>();
+    for (int i = 0; i < totalElements; i++) {
+      jedis.lpush(KEY, "value" + i);
+      result.add("value" + i);
+    }
+
+    GeodeAwaitility.await().atMost(Duration.ofSeconds(5))
+        .untilAsserted(() -> 
assertThat(accumulated.size()).isEqualTo(totalElements));

Review comment:
       This await should probably use the default value, since we're not trying 
to test the speed of commands here, just that they execute correctly. If the 
machine this test is running on happens to encounter some resource contention, 
then it's possible this test could fail due to this timeout being small.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.redis.internal.GeodeRedisService;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class BLPopDUnitTest {

Review comment:
       Would there be value in adding a test for what happens when we submit a 
BLPOP command, then crash the server the key would be hosted on if it existed, 
then do an LPUSH?

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public abstract class AbstractBLPopIntegrationTest implements 
RedisIntegrationTest {

Review comment:
       Could we also get a test for the behaviour when we pass multiple keys to 
BLPOP, but only one of them already exists (but it's not the first one), and a 
test for the behaviour when we pass multiple keys to BLPOP and multiple of them 
already exist (we should return the first key in the list of arguments that we 
passed that exists). The last test should show the issue described in 
`BLPopExecutor` to do with the list of keys being modified in-place by the 
locked execute call.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_NEGATIVE_TIMEOUT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.data.RedisList;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BLPopExecutor implements CommandExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> arguments = command.getCommandArguments();
+    double timeoutSeconds;
+    try {
+      timeoutSeconds = Coder.bytesToDouble(arguments.get(arguments.size() - 
1));
+    } catch (NumberFormatException e) {
+      return RedisResponse.error(RedisConstants.ERROR_TIMEOUT_INVALID);
+    }
+
+    if (timeoutSeconds < 0) {
+      return RedisResponse.error(ERROR_NEGATIVE_TIMEOUT);
+    }
+
+    int keyCount = arguments.size() - 1;
+    List<RedisKey> keys = new ArrayList<>(keyCount);
+    for (int i = 0; i < keyCount; i++) {
+      keys.add(new RedisKey(arguments.get(i)));
+    }
+
+    List<byte[]> popped = context.lockedExecute(keys.get(0), keys,
+        () -> RedisList.blpop(context, command, keys, timeoutSeconds));

Review comment:
       Passing `keys` to both `blpop()` and `lockedExecute()` could result in 
incorrect behaviour in `blpop()` since `LockingStripedCoordinator.execute()` 
(which is called by `lockedExecute()` sorts the list in-place, which will 
result in the list passed to `blpop()` also being modified. In order to prevent 
this, a copy of the list needs to be passed to one of the methods.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributorTest {
+
+  public static class TestEventListener implements EventListener {
+    private final List<RedisKey> keys;
+    private int fired = 0;
+
+    public TestEventListener(RedisKey... keys) {
+      this.keys = Arrays.stream(keys).collect(Collectors.toList());
+    }
+
+    public int getFired() {
+      return fired;
+    }
+
+    @Override
+    public EventResponse process(RedisCommandType commandType, RedisKey key) {
+      fired += 1;
+      return EventResponse.REMOVE_AND_STOP;
+    }
+
+    @Override
+    public List<RedisKey> keys() {
+      return keys;
+    }
+
+    @Override
+    public void resubmitCommand() {}
+
+    @Override
+    public void scheduleTimeout(ScheduledExecutorService executor, 
EventDistributor distributor) {}
+  }
+
+  @Test
+  public void firingEventRemovesListener() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener = new TestEventListener(keyA, keyB);
+    distributor.registerListener(listener);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener.getFired()).isEqualTo(1);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void firingEventRemovesFirstListener_whenMultipleExist() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener1 = new TestEventListener(keyA, keyB);
+    TestEventListener listener2 = new TestEventListener(keyA, keyB);
+    distributor.registerListener(listener1);
+    distributor.registerListener(listener2);
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(4);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener1.getFired()).isEqualTo(1);
+    assertThat(listener2.getFired()).isEqualTo(0);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener1.getFired()).isEqualTo(1);
+    assertThat(listener2.getFired()).isEqualTo(1);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void listenerIsRemovedAfterBucketMoves() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener1 = new TestEventListener(keyA);
+    distributor.registerListener(listener1);
+    distributor.registerListener(new TestEventListener(keyB));
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+    distributor.afterBucketRemoved(keyA.getBucketId(), null);
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(1);
+  }
+
+  @Test
+  public void concurrencyOfManyRegistrationsAndBucketMovementForTheSameKeys() {
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    RedisKey keyB = new RedisKey("keyB".getBytes());
+    RedisKey keyC = new RedisKey("keyC".getBytes());
+
+    // Should not produce any exceptions
+    new ConcurrentLoopingThreads(10_000,
+        i -> distributor.registerListener(new TestEventListener(keyA, keyB, 
keyC)),
+        i -> distributor.registerListener(new TestEventListener(keyB, keyC, 
keyA)),
+        i -> distributor.registerListener(new TestEventListener(keyC, keyA, 
keyB)),
+        i -> distributor.afterBucketRemoved(keyA.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyB.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyC.getBucketId(), null),
+        i -> distributor.fireEvent(null, keyA),
+        i -> distributor.fireEvent(null, keyB),
+        i -> distributor.fireEvent(null, keyC))
+            .runInLockstep();
+
+    distributor.fireEvent(null, keyA);
+    distributor.fireEvent(null, keyB);
+    distributor.fireEvent(null, keyC);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void concurrencyOfManyRegistrationAndRemovalOfSameListener() {
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    RedisKey keyB = new RedisKey("keyB".getBytes());
+    RedisKey keyC = new RedisKey("keyC".getBytes());
+    AtomicReference<EventListener> listenerRef1 =
+        new AtomicReference<>(new TestEventListener(keyA, keyB, keyC));
+    AtomicReference<EventListener> listenerRef2 =
+        new AtomicReference<>(new TestEventListener(keyB, keyC, keyA));
+    AtomicReference<EventListener> listenerRef3 =
+        new AtomicReference<>(new TestEventListener(keyC, keyA, keyB));
+
+    // Should not produce any exceptions
+    new ConcurrentLoopingThreads(10_000,
+        i -> distributor.registerListener(listenerRef1.get()),
+        i -> distributor.registerListener(listenerRef2.get()),
+        i -> distributor.registerListener(listenerRef3.get()),
+        i -> distributor.removeListener(listenerRef1.get()),
+        i -> distributor.removeListener(listenerRef2.get()),
+        i -> distributor.removeListener(listenerRef3.get()),
+        i -> distributor.fireEvent(null, keyA),
+        i -> distributor.fireEvent(null, keyB),
+        i -> distributor.fireEvent(null, keyC))
+            .runWithAction(() -> {
+              listenerRef1.set(new TestEventListener(keyA, keyB, keyC));
+              listenerRef2.set(new TestEventListener(keyB, keyC, keyA));
+              listenerRef3.set(new TestEventListener(keyC, keyA, keyB));
+            });
+
+    distributor.fireEvent(null, keyA);
+    distributor.fireEvent(null, keyB);
+    distributor.fireEvent(null, keyC);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void ensureNotRegisteringListenerOnQueueJustRemoved() {

Review comment:
       This test name confuses me a little. Could it be reworded to better 
explain what the test is doing or intending to show please?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to