DonalEvans commented on a change in pull request #7422:
URL: https://github.com/apache/geode/pull/7422#discussion_r824134546
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -469,6 +469,16 @@ protected void processQueue() {
boolean interrupted = Thread.interrupted();
try {
if (resetLastPeekedEvents) {
+ if (!batchIdToEventsMap.isEmpty()) {
+ for (Map.Entry<Integer, List<GatewaySenderEventImpl>[]> entry
: batchIdToEventsMap
+ .entrySet()) {
+ for (GatewaySenderEventImpl event : entry.getValue()[0]) {
+ if (!event.getPossibleDuplicate()) {
+ event.setPossibleDuplicate(true);
+ }
+ }
+ }
+ }
resetLastPeekedEvents();
resetLastPeekedEvents = false;
}
Review comment:
This entire block is duplicated on line 1229, so it should probably be
extracted to a method. Also, it's not clear to me why we set all events as
possible duplicates here, since the PR title says we're only doing that when
stopping the gateway sender.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testPersistentPartitionedRegionWithGatewaySenderStartStopEventsDispatchedNoChangesInQueue()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+
+ vm4.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ createReceiverInVMs(vm2, vm3);
+
+ AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ AsyncInvocation inv3 = vm4.invokeAsync(stopSenderRunnable());
+ AsyncInvocation inv4 = vm5.invokeAsync(stopSenderRunnable());
+
+ try {
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+
+ assertThat(localSize1 + localSize2).isEqualTo(1000);
+
+ Integer regionSize1 = vm2.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+ Integer regionSize2 = vm3.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+
+ assertThat(regionSize1).isGreaterThan(0);
+ assertThat(regionSize2).isGreaterThan(0);
Review comment:
Given the comment at the top of this test, which says that we want to
ensure that the remote site receives all of the events, it seems strange to
only be asserting that there is at least one thing in the region, rather than
all of the entries we expect to be there. Could these assertions be made
stronger?
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
Review comment:
This comment doesn't seem to accurately reflect what this test is
intending to show, since it doesn't mention possible duplicates at all. Would
it be possible to get a better description of what this test is doing?
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testPersistentPartitionedRegionWithGatewaySenderStartStopEventsDispatchedNoChangesInQueue()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+
+ vm4.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ createReceiverInVMs(vm2, vm3);
+
+ AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ AsyncInvocation inv3 = vm4.invokeAsync(stopSenderRunnable());
+ AsyncInvocation inv4 = vm5.invokeAsync(stopSenderRunnable());
+
+ try {
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
Review comment:
This would be better as:
```
AsyncInvocation<Void> inv3 = vm4.invokeAsync(stopSenderRunnable());
AsyncInvocation<Void> inv4 = vm5.invokeAsync(stopSenderRunnable());
inv3.get();
inv4.get();
```
to avoid using the deprecated method `AsyncInvocation.join()` or using
`fail()` which does not provide any details about why the test failed in the
error message.
I'm also a little concerned that the test may become flaky if for some
reason there is a delay between the two async invocations completing, due to GC
or something, which would then make the assertion on line 2322 fail.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testPersistentPartitionedRegionWithGatewaySenderStartStopEventsDispatchedNoChangesInQueue()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+
+ vm4.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ createReceiverInVMs(vm2, vm3);
+
+ AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ AsyncInvocation inv3 = vm4.invokeAsync(stopSenderRunnable());
+ AsyncInvocation inv4 = vm5.invokeAsync(stopSenderRunnable());
+
+ try {
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+
+ assertThat(localSize1 + localSize2).isEqualTo(1000);
+
+ Integer regionSize1 = vm2.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+ Integer regionSize2 = vm3.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+
+ assertThat(regionSize1).isGreaterThan(0);
+ assertThat(regionSize2).isGreaterThan(0);
+
+ Integer vm4NumDupplicate = vm4.invoke(() ->
WANTestBase.getNumOfPosssibleDuplicateEvents("ln"));
+ Integer vm5NumDupplicate = vm5.invoke(() ->
WANTestBase.getNumOfPosssibleDuplicateEvents("ln"));
Review comment:
Typo here, should be "Duplicate"
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testPersistentPartitionedRegionWithGatewaySenderStartStopEventsDispatchedNoChangesInQueue()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+
+ vm4.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ createReceiverInVMs(vm2, vm3);
+
+ AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
Review comment:
This would be better as:
```
AsyncInvocation<Void> inv1 = vm4.invokeAsync(startSenderRunnable());
AsyncInvocation<Void> inv2 = vm5.invokeAsync(startSenderRunnable());
inv1.get();
inv2.get();
```
with `throws InterruptedException` added to the test method signature.
Also, it's not clear to me why these calls need to be asynchronous. If I
change the above to:
```
vm4.invoke(startSenderRunnable());
vm5.invoke(startSenderRunnable());
```
then the test still passes.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2231,6 +2240,102 @@ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectN
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
}
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testPersistentPartitionedRegionWithGatewaySenderStartStopEventsDispatchedNoChangesInQueue()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, false));
+
+ vm4.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() ->
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ createReceiverInVMs(vm2, vm3);
+
+ AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ AsyncInvocation inv3 = vm4.invokeAsync(stopSenderRunnable());
+ AsyncInvocation inv4 = vm5.invokeAsync(stopSenderRunnable());
+
+ try {
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+
+ assertThat(localSize1 + localSize2).isEqualTo(1000);
+
+ Integer regionSize1 = vm2.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+ Integer regionSize2 = vm3.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName()));
+
+ assertThat(regionSize1).isGreaterThan(0);
+ assertThat(regionSize2).isGreaterThan(0);
+
+ Integer vm4NumDupplicate = vm4.invoke(() ->
WANTestBase.getNumOfPosssibleDuplicateEvents("ln"));
+ Integer vm5NumDupplicate = vm5.invoke(() ->
WANTestBase.getNumOfPosssibleDuplicateEvents("ln"));
+
+ assertThat(vm4NumDupplicate + vm5NumDupplicate).isGreaterThan(100);
Review comment:
Why are we asserting that this value is greater than 100 specifically?
Could this assertion be stronger, perhaps asserting that the total number of
duplicate events is equal to the total size of the queues?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
##########
@@ -646,7 +646,7 @@ public boolean isReadyForPeek() {
}
@VisibleForTesting
- List<Object> getHelperQueueList() {
+ public List<Object> getHelperQueueList() {
Review comment:
This method should no longer be marked `@VisibleForTesting` as it's
public due to use by non-test code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]