This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch revert-5752-feature/GEODE-8714 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d7298924e184b729b34759c25c8714bb3f4f69a8 Author: Udo Kohlmeyer <kohlmu-pivo...@users.noreply.github.com> AuthorDate: Wed Nov 18 10:17:38 2020 +1100 Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)" This reverts commit 1eb9f344b3f83871e32a521c57068176230fb04e. --- .../wan/parallel/ParallelGatewaySenderQueue.java | 9 +- ...ANPersistenceEnabledGatewaySenderDUnitTest.java | 100 --------------------- 2 files changed, 6 insertions(+), 103 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 2788bb2..9366d5f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1731,9 +1731,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { for (int i = helpArray.length - 1; i >= 0; i--) { GatewaySenderEventImpl event = (GatewaySenderEventImpl) helpArray[i]; final int bucketId = event.getBucketId(); - BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId); - if (brq != null) { - brq.pushKeyIntoQueue(event.getShadowKey()); + final PartitionedRegion region = (PartitionedRegion) event.getRegion(); + if (region.getRegionAdvisor().isPrimaryForBucket(bucketId)) { + BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId); + if (brq != null) { + brq.pushKeyIntoQueue(event.getShadowKey()); + } } } } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java index 9364187..7070ae0 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java @@ -1897,106 +1897,6 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000)); } - /** - * Enable persistence for PR and GatewaySender. Do some puts in local region. Restart 1 server, - * then stop gateway sender, and stop server. After that create receiver on remote site. - * Check if the remote site receives all the events. - */ - @Test - public void testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopServer() { - // create locator on local site - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - // create locator on remote site - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - // create receiver on remote site - createCacheInVMs(nyPort, vm2, vm3); - // createReceiverInVMs(vm2, vm3); - - // create cache in local site - createCacheInVMs(lnPort, vm4, vm5); - vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2)); - vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2)); - - // create senders with disk store - String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, - true, 100, 10, false, true, null, null, true)); - String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, - true, 100, 10, false, true, null, null, true)); - - LogWriterUtils.getLogWriter() - .info("The DS are: " + diskStore1 + "," + diskStore2); - - // create PR on remote site - vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, - 13, isOffHeap())); - vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1, - 13, isOffHeap())); - - // create PR on local site - vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, - 13, isOffHeap())); - vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, - 13, isOffHeap())); - - // start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - // wait for senders to become running - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - - // start puts in region on local site - vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 10)); - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - // --------------------close and rebuild local site - // ------------------------------------------------- - // kill the sender in vm5 - vm5.invoke(killSenderRunnable()); - - LogWriterUtils.getLogWriter().info("Killed vm5 sender."); - - // restart the vm - createCacheInVMs(lnPort, vm5); - vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2)); - - LogWriterUtils.getLogWriter().info("Created back the cache"); - - // create senders with disk store - vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, - null, diskStore2, false)); - - LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); - // create PR on local site - vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, - 13, isOffHeap())); - - - LogWriterUtils.getLogWriter().info("Created back the partitioned regions"); - - LogWriterUtils.getLogWriter().info("Waiting for senders running."); - // wait for senders running - vm5.invoke(waitForSenderRunnable()); - - LogWriterUtils.getLogWriter().info("All the senders are now running..."); - - // ---------------------------------------------------------------------------------------------------- - - vm4.invoke(() -> WANTestBase.stopSender("ln")); - vm5.invoke(() -> WANTestBase.stopSender("ln")); - - vm5.invoke(killSenderRunnable()); - - vm4.invoke(() -> WANTestBase.startSender("ln")); - - - createReceiverInVMs(vm2, vm3); - - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10)); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10)); - } - /** * setIgnoreQueue has lots of callers by reflection