Repository: geode Updated Branches: refs/heads/develop 0b4a1a239 -> 5ec406984
GEODE-3030: Set possibleDuplicate=true for all bucket events after failover Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5ec40698 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5ec40698 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5ec40698 Branch: refs/heads/develop Commit: 5ec4069848a0ec48466240c17003b8f25fe2fcdd Parents: 0b4a1a2 Author: Barry Oglesby <bogle...@pivotal.io> Authored: Thu Jul 27 17:02:45 2017 -0700 Committer: Barry Oglesby <bogle...@pivotal.io> Committed: Fri Jul 28 12:19:43 2017 -0700 ---------------------------------------------------------------------- .../cache/AbstractBucketRegionQueue.java | 10 +- .../geode/internal/cache/BucketRegionQueue.java | 3 +- .../PossibleDuplicateAsyncEventListener.java | 78 ++++++++++++++++ .../asyncqueue/AsyncEventListenerDUnitTest.java | 96 ++++++++++++++++++++ 4 files changed, 180 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index eacb8fd..3674474 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -298,13 +298,13 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { } /** - * Marks batchSize number of events in the iterator as duplicate + * Marks all events in the iterator as duplicate */ - protected void markEventsAsDuplicate(int batchSize, Iterator itr) { + protected void markEventsAsDuplicate(Iterator itr) { int i = 0; - // mark number of event equal to the batchSize for setPossibleDuplicate to - // true before this bucket becomes primary on the node - while (i < batchSize && itr.hasNext()) { + // mark setPossibleDuplicate to true for all events in this bucket before it becomes primary on + // the node + while (itr.hasNext()) { Object key = itr.next(); Object senderEvent = getNoLRU(key, true, false, false); http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 5ce5963..567f46f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -203,9 +203,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { @Override public void beforeAcquiringPrimaryState() { - int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize(); Iterator<Object> itr = eventSeqNumDeque.iterator(); - markEventsAsDuplicate(batchSize, itr); + markEventsAsDuplicate(itr); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java new file mode 100644 index 0000000..45f3dc5 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class PossibleDuplicateAsyncEventListener implements AsyncEventListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final AtomicInteger numberOfEvents = new AtomicInteger(); + + private final AtomicInteger numberOfPossibleDuplicateEvents = new AtomicInteger(); + + public boolean processEvents(List<AsyncEvent> events) { + try { + waitToStartProcessingEvents(); + } catch (InterruptedException e) { + throw new RuntimeException( + "PossibleDuplicateAsyncEventListener processEvents was interrupted"); + } + for (AsyncEvent event : events) { + process(event); + } + return true; + } + + private void process(AsyncEvent event) { + if (event.getPossibleDuplicate()) { + incrementTotalPossibleDuplicateEvents(); + } + incrementTotalEvents(); + } + + private void waitToStartProcessingEvents() throws InterruptedException { + this.latch.await(60, TimeUnit.SECONDS); + } + + public void startProcessingEvents() { + this.latch.countDown(); + } + + private int incrementTotalEvents() { + return this.numberOfEvents.incrementAndGet(); + } + + public int getTotalEvents() { + return this.numberOfEvents.get(); + } + + private void incrementTotalPossibleDuplicateEvents() { + this.numberOfPossibleDuplicateEvents.incrementAndGet(); + } + + public int getTotalPossibleDuplicateEvents() { + return this.numberOfPossibleDuplicateEvents.get(); + } + + public void close() {} +} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index 1a57bde..6964edf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -27,6 +27,8 @@ import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; @@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase; import org.apache.geode.internal.cache.wan.MyAsyncEventListener; +import org.apache.geode.internal.cache.wan.PossibleDuplicateAsyncEventListener; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; @@ -1728,6 +1731,99 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); } + @Test + public void testParallelAsyncEventQueueWithPossibleDuplicateEvents() { + // Set disable move primaries on start up + vm1.invoke(() -> setDisableMovePrimary()); + vm2.invoke(() -> setDisableMovePrimary()); + + try { + // Create locator + Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); + + // Create cache and async event queue in member 1 + String aeqId = "ln"; + vm1.invoke(() -> createCache(lnPort)); + vm1.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null, true, + "PossibleDuplicateAsyncEventListener")); + + // Create region with async event queue in member 1 + String regionName = getTestMethodName() + "_PR"; + vm1.invoke( + () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap())); + + // Do puts so that all primaries are in member 1 + int numPuts = 30; + vm1.invoke(() -> doPuts(regionName, numPuts)); + + // Create cache and async event queue in member 2 + vm2.invoke(() -> createCache(lnPort)); + vm2.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null, true, + "PossibleDuplicateAsyncEventListener")); + + // Create region with paused async event queue in member 2 + vm2.invoke( + () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap())); + vm2.invoke(() -> pauseAsyncEventQueue(aeqId)); + + // Close cache in member 1 (all AEQ buckets will fail over to member 2) + vm1.invoke(() -> closeCache()); + + // Start processing async event queue in member 2 + vm2.invoke(() -> resumeAsyncEventQueue(aeqId)); + vm2.invoke(() -> startProcessingAsyncEvents(aeqId)); + + // Wait for queue to be empty + vm2.invoke(() -> waitForAsyncQueueToGetEmpty(aeqId)); + + // Verify all events were processed in member 2 + vm2.invoke(() -> verifyAsyncEventProcessing(aeqId, numPuts)); + } finally { + // Clear disable move primaries on start up + vm1.invoke(() -> clearDisableMovePrimary()); + vm2.invoke(() -> clearDisableMovePrimary()); + } + } + + public static void setDisableMovePrimary() { + System.setProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP", "true"); + } + + public static void clearDisableMovePrimary() { + System.clearProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP"); + } + + public static void startProcessingAsyncEvents(String aeqId) { + // Get the async event listener + PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId); + + // Start processing waiting events + listener.startProcessingEvents(); + } + + public static void verifyAsyncEventProcessing(String aeqId, int numEvents) { + // Get the async event listener + PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId); + + // Verify all events were processed + assertEquals(numEvents, listener.getTotalEvents()); + + // Verify all events are possibleDuplicate + assertEquals(numEvents, listener.getTotalPossibleDuplicateEvents()); + } + + private static PossibleDuplicateAsyncEventListener getPossibleDuplicateAsyncEventListener( + String aeqId) { + // Get the async event queue + AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId); + assertNotNull(aeq); + + // Get and return the async event listener + AsyncEventListener aeqListener = aeq.getAsyncEventListener(); + assertTrue(aeqListener instanceof PossibleDuplicateAsyncEventListener); + return (PossibleDuplicateAsyncEventListener) aeqListener; + } + private void createPersistentPartitionRegion() { AttributesFactory fact = new AttributesFactory();