Repository: geode
Updated Branches:
  refs/heads/develop 0b4a1a239 -> 5ec406984


GEODE-3030: Set possibleDuplicate=true for all bucket events after failover


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5ec40698
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5ec40698
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5ec40698

Branch: refs/heads/develop
Commit: 5ec4069848a0ec48466240c17003b8f25fe2fcdd
Parents: 0b4a1a2
Author: Barry Oglesby <bogle...@pivotal.io>
Authored: Thu Jul 27 17:02:45 2017 -0700
Committer: Barry Oglesby <bogle...@pivotal.io>
Committed: Fri Jul 28 12:19:43 2017 -0700

----------------------------------------------------------------------
 .../cache/AbstractBucketRegionQueue.java        | 10 +-
 .../geode/internal/cache/BucketRegionQueue.java |  3 +-
 .../PossibleDuplicateAsyncEventListener.java    | 78 ++++++++++++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 96 ++++++++++++++++++++
 4 files changed, 180 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index eacb8fd..3674474 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -298,13 +298,13 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
   }
 
   /**
-   * Marks batchSize number of events in the iterator as duplicate
+   * Marks all events in the iterator as duplicate
    */
-  protected void markEventsAsDuplicate(int batchSize, Iterator itr) {
+  protected void markEventsAsDuplicate(Iterator itr) {
     int i = 0;
-    // mark number of event equal to the batchSize for setPossibleDuplicate to
-    // true before this bucket becomes primary on the node
-    while (i < batchSize && itr.hasNext()) {
+    // mark setPossibleDuplicate to true for all events in this bucket before it becomes primary on
+    // the node
+    while (itr.hasNext()) {
       Object key = itr.next();
       Object senderEvent = getNoLRU(key, true, false, false);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 5ce5963..567f46f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -203,9 +203,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
 
   @Override
   public void beforeAcquiringPrimaryState() {
-    int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
     Iterator<Object> itr = eventSeqNumDeque.iterator();
-    markEventsAsDuplicate(batchSize, itr);
+    markEventsAsDuplicate(itr);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
new file mode 100644
index 0000000..45f3dc5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PossibleDuplicateAsyncEventListener implements AsyncEventListener {
+
+  private final CountDownLatch latch = new CountDownLatch(1);
+
+  private final AtomicInteger numberOfEvents = new AtomicInteger();
+
+  private final AtomicInteger numberOfPossibleDuplicateEvents = new AtomicInteger();
+
+  public boolean processEvents(List<AsyncEvent> events) {
+    try {
+      waitToStartProcessingEvents();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+          "PossibleDuplicateAsyncEventListener processEvents was interrupted");
+    }
+    for (AsyncEvent event : events) {
+      process(event);
+    }
+    return true;
+  }
+
+  private void process(AsyncEvent event) {
+    if (event.getPossibleDuplicate()) {
+      incrementTotalPossibleDuplicateEvents();
+    }
+    incrementTotalEvents();
+  }
+
+  private void waitToStartProcessingEvents() throws InterruptedException {
+    this.latch.await(60, TimeUnit.SECONDS);
+  }
+
+  public void startProcessingEvents() {
+    this.latch.countDown();
+  }
+
+  private int incrementTotalEvents() {
+    return this.numberOfEvents.incrementAndGet();
+  }
+
+  public int getTotalEvents() {
+    return this.numberOfEvents.get();
+  }
+
+  private void incrementTotalPossibleDuplicateEvents() {
+    this.numberOfPossibleDuplicateEvents.incrementAndGet();
+  }
+
+  public int getTotalPossibleDuplicateEvents() {
+    return this.numberOfPossibleDuplicateEvents.get();
+  }
+
+  public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 1a57bde..6964edf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -27,6 +27,8 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
@@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
 import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
 import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.internal.cache.wan.PossibleDuplicateAsyncEventListener;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
@@ -1728,6 +1731,99 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     });
   }
 
+  @Test
+  public void testParallelAsyncEventQueueWithPossibleDuplicateEvents() {
+    // Set disable move primaries on start up
+    vm1.invoke(() -> setDisableMovePrimary());
+    vm2.invoke(() -> setDisableMovePrimary());
+
+    try {
+      // Create locator
+      Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+
+      // Create cache and async event queue in member 1
+      String aeqId = "ln";
+      vm1.invoke(() -> createCache(lnPort));
+      vm1.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null, true,
+          "PossibleDuplicateAsyncEventListener"));
+
+      // Create region with async event queue in member 1
+      String regionName = getTestMethodName() + "_PR";
+      vm1.invoke(
+          () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap()));
+
+      // Do puts so that all primaries are in member 1
+      int numPuts = 30;
+      vm1.invoke(() -> doPuts(regionName, numPuts));
+
+      // Create cache and async event queue in member 2
+      vm2.invoke(() -> createCache(lnPort));
+      vm2.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null, true,
+          "PossibleDuplicateAsyncEventListener"));
+
+      // Create region with paused async event queue in member 2
+      vm2.invoke(
+          () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap()));
+      vm2.invoke(() -> pauseAsyncEventQueue(aeqId));
+
+      // Close cache in member 1 (all AEQ buckets will fail over to member 2)
+      vm1.invoke(() -> closeCache());
+
+      // Start processing async event queue in member 2
+      vm2.invoke(() -> resumeAsyncEventQueue(aeqId));
+      vm2.invoke(() -> startProcessingAsyncEvents(aeqId));
+
+      // Wait for queue to be empty
+      vm2.invoke(() -> waitForAsyncQueueToGetEmpty(aeqId));
+
+      // Verify all events were processed in member 2
+      vm2.invoke(() -> verifyAsyncEventProcessing(aeqId, numPuts));
+    } finally {
+      // Clear disable move primaries on start up
+      vm1.invoke(() -> clearDisableMovePrimary());
+      vm2.invoke(() -> clearDisableMovePrimary());
+    }
+  }
+
+  public static void setDisableMovePrimary() {
+    System.setProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP", "true");
+  }
+
+  public static void clearDisableMovePrimary() {
+    System.clearProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP");
+  }
+
+  public static void startProcessingAsyncEvents(String aeqId) {
+    // Get the async event listener
+    PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId);
+
+    // Start processing waiting events
+    listener.startProcessingEvents();
+  }
+
+  public static void verifyAsyncEventProcessing(String aeqId, int numEvents) {
+    // Get the async event listener
+    PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId);
+
+    // Verify all events were processed
+    assertEquals(numEvents, listener.getTotalEvents());
+
+    // Verify all events are possibleDuplicate
+    assertEquals(numEvents, listener.getTotalPossibleDuplicateEvents());
+  }
+
+  private static PossibleDuplicateAsyncEventListener getPossibleDuplicateAsyncEventListener(
+      String aeqId) {
+    // Get the async event queue
+    AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId);
+    assertNotNull(aeq);
+
+    // Get and return the async event listener
+    AsyncEventListener aeqListener = aeq.getAsyncEventListener();
+    assertTrue(aeqListener instanceof PossibleDuplicateAsyncEventListener);
+    return (PossibleDuplicateAsyncEventListener) aeqListener;
+  }
+
   private void createPersistentPartitionRegion() {
     AttributesFactory fact = new AttributesFactory();
 

Reply via email to