This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch revert-5752-feature/GEODE-8714
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d7298924e184b729b34759c25c8714bb3f4f69a8
Author: Udo Kohlmeyer <kohlmu-pivo...@users.noreply.github.com>
AuthorDate: Wed Nov 18 10:17:38 2020 +1100

    Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)"
    
    This reverts commit 1eb9f344b3f83871e32a521c57068176230fb04e.
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   9 +-
 ...ANPersistenceEnabledGatewaySenderDUnitTest.java | 100 ---------------------
 2 files changed, 6 insertions(+), 103 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 2788bb2..9366d5f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1731,9 +1731,12 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     for (int i = helpArray.length - 1; i >= 0; i--) {
       GatewaySenderEventImpl event = (GatewaySenderEventImpl) helpArray[i];
       final int bucketId = event.getBucketId();
-      BucketRegionQueue brq = 
getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
-      if (brq != null) {
-        brq.pushKeyIntoQueue(event.getShadowKey());
+      final PartitionedRegion region = (PartitionedRegion) event.getRegion();
+      if (region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
+        BucketRegionQueue brq = 
getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+        if (brq != null) {
+          brq.pushKeyIntoQueue(event.getShadowKey());
+        }
       }
     }
   }
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index 9364187..7070ae0 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -1897,106 +1897,6 @@ public class 
ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 
3000));
   }
 
-  /**
-   * Enable persistence for PR and GatewaySender. Do some puts in local 
region. Restart 1 server,
-   * then stop gateway sender, and stop server. After that create receiver on 
remote site.
-   * Check if the remote site receives all the events.
-   */
-  @Test
-  public void 
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopServer() {
-    // create locator on local site
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    // create locator on remote site
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
-
-    // create receiver on remote site
-    createCacheInVMs(nyPort, vm2, vm3);
-    // createReceiverInVMs(vm2, vm3);
-
-    // create cache in local site
-    createCacheInVMs(lnPort, vm4, vm5);
-    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-
-    // create senders with disk store
-    String diskStore1 = (String) vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
-        true, 100, 10, false, true, null, null, true));
-    String diskStore2 = (String) vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
-        true, 100, 10, false, true, null, null, true));
-
-    LogWriterUtils.getLogWriter()
-        .info("The DS are: " + diskStore1 + "," + diskStore2);
-
-    // create PR on remote site
-    vm2.invoke(() -> 
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
-        13, isOffHeap()));
-    vm3.invoke(() -> 
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
-        13, isOffHeap()));
-
-    // create PR on local site
-    vm4.invoke(() -> 
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-    vm5.invoke(() -> 
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-
-    // start the senders on local site
-    startSenderInVMs("ln", vm4, vm5);
-
-    // wait for senders to become running
-    vm4.invoke(waitForSenderRunnable());
-    vm5.invoke(waitForSenderRunnable());
-
-    // start puts in region on local site
-    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 10));
-    LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
-    // --------------------close and rebuild local site
-    // -------------------------------------------------
-    // kill the sender in vm5
-    vm5.invoke(killSenderRunnable());
-
-    LogWriterUtils.getLogWriter().info("Killed vm5 sender.");
-
-    // restart the vm
-    createCacheInVMs(lnPort, vm5);
-    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-
-    LogWriterUtils.getLogWriter().info("Created back the cache");
-
-    // create senders with disk store
-    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 
10, false, true,
-        null, diskStore2, false));
-
-    LogWriterUtils.getLogWriter().info("Created the senders back from the disk 
store.");
-    // create PR on local site
-    vm5.invoke(() -> 
WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-
-
-    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
-
-    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
-    // wait for senders running
-    vm5.invoke(waitForSenderRunnable());
-
-    LogWriterUtils.getLogWriter().info("All the senders are now running...");
-
-    // 
----------------------------------------------------------------------------------------------------
-
-    vm4.invoke(() -> WANTestBase.stopSender("ln"));
-    vm5.invoke(() -> WANTestBase.stopSender("ln"));
-
-    vm5.invoke(killSenderRunnable());
-
-    vm4.invoke(() -> WANTestBase.startSender("ln"));
-
-
-    createReceiverInVMs(vm2, vm3);
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
-  }
-
 
   /**
    * setIgnoreQueue has lots of callers by reflection

Reply via email to