This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4647 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 44821a0c2ac277ce936c5e401d4e1db726d66233 Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed Feb 21 21:31:07 2018 -0800 GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue --- .../asyncqueue/internal/AsyncEventQueueStats.java | 3 + .../internal/cache/AbstractBucketRegionQueue.java | 8 + .../geode/internal/cache/AbstractRegionMap.java | 4 + .../apache/geode/internal/cache/BucketAdvisor.java | 3 + .../geode/internal/cache/BucketRegionQueue.java | 2 + .../internal/cache/PartitionedRegionDataStore.java | 6 +- .../internal/cache/wan/AbstractGatewaySender.java | 6 + .../wan/AbstractGatewaySenderEventProcessor.java | 17 +++ .../internal/cache/wan/GatewaySenderStats.java | 61 ++++++++ .../wan/parallel/ParallelGatewaySenderQueue.java | 2 +- .../wan/parallel/ParallelQueueRemovalMessage.java | 3 + .../SerialAsyncEventQueueImplJUnitTest.java | 3 + .../cache/wan/AsyncEventQueueTestBase.java | 22 ++- .../asyncqueue/AsyncEventQueueStatsDUnitTest.java | 45 ++++-- .../ParallelQueueRemovalMessageJUnitTest.java | 12 ++ .../bean/stats/AsyncEventQueueStatsJUnitTest.java | 2 - .../geode/internal/cache/wan/WANTestBase.java | 105 +++++++++++-- .../parallel/ParallelWANConflationDUnitTest.java | 56 +++++-- .../wan/parallel/ParallelWANStatsDUnitTest.java | 167 +++++++++++++++++++++ .../serial/SerialGatewaySenderQueueDUnitTest.java | 13 +- .../wan/serial/SerialWANConflationDUnitTest.java | 73 ++++++++- .../wan/serial/SerialWANPropagationDUnitTest.java | 1 + 22 files changed, 574 insertions(+), 40 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index dee2c92..77f6eff 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ 
-42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.", "nanoseconds"), f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), + f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.", + "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -106,6 +108,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats { eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED); eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); + eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index d6822da..267ec36 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -230,6 +230,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.decQueueSize(size); } + public void decSecondaryQueueSize(int size) { + this.gatewaySenderStats.decSecondaryQueueSize(size); + } + public void decQueueSize() { this.gatewaySenderStats.decQueueSize(); } @@ -238,6 +242,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.incQueueSize(size); } + public void incSecondaryQueueSize(int size) { + 
this.gatewaySenderStats.incSecondaryQueueSize(size); + } + public void incQueueSize() { this.gatewaySenderStats.incQueueSize(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 68d7134..6c47b71 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -1042,6 +1042,10 @@ public abstract class AbstractRegionMap } finally { if (done && result) { initialImagePutEntry(newRe); + if (owner instanceof BucketRegionQueue) { + BucketRegionQueue brq = (BucketRegionQueue) owner; + brq.addToEventQueue(key, done, event); + } } if (!done) { removeEntry(key, newRe, false); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index fc14773..0012a08 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -282,6 +282,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor { if (this.getBucket() instanceof BucketRegionQueue) { BucketRegionQueue brq = (BucketRegionQueue) this.getBucket(); brq.decQueueSize(brq.size()); + // TODO should check if secondary stats exists + brq.incSecondaryQueueSize(brq.size()); } sendProfileUpdate(); } @@ -1192,6 +1194,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue BucketRegionQueue brq = (BucketRegionQueue) br; brq.incQueueSize(brq.size()); + brq.decSecondaryQueueSize(brq.size()); } if (br != null && br instanceof BucketRegion) { ((BucketRegion) br).afterAcquiringPrimaryState(); diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 4e5451e..0baa204 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -449,6 +449,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } if (this.getBucketAdvisor().isPrimary()) { incQueueSize(1); + } else { + incSecondaryQueueSize(1); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index ef8eb99..d468ef4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { return sizeOfLocalPrimaries; } - public int getSizeOfLocalBuckets(boolean includeSecondary) { + public int getSizeOfLocalBuckets() { int sizeOfLocal = 0; - Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions(); - for (BucketRegion br : primaryBuckets) { + Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions(); + for (BucketRegion br : allLocalBuckets) { sizeOfLocal += br.size(); } return sizeOfLocal; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..268bbb2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1095,6 +1095,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } 
statistics.setQueueSize(0); + statistics.setSecondaryQueueSize(0); statistics.setTempQueueSize(0); } @@ -1251,6 +1252,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { + AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; + return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 9309e43..1a12abf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -270,6 +270,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { return this.queue.size(); } + public int eventSecondaryQueueSize() { + if (queue == null) { + return 0; + } + + // if parallel, get both primary and secondary queues' size, then subtract primary queue's size + if (this.queue instanceof ParallelGatewaySenderQueue) { + return ((ParallelGatewaySenderQueue) queue).localSize(true) + - ((ParallelGatewaySenderQueue) queue).localSize(false); + } + if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true) + - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false); + } + return this.queue.size(); + } + /** * @return the sender */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index c7fd370..c2866d1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -46,6 +46,8 @@ public class GatewaySenderStats { protected static final String EVENT_QUEUE_TIME = "eventQueueTime"; /** Name of the event queue size statistic */ protected static final String EVENT_QUEUE_SIZE = "eventQueueSize"; + /** Name of the event secondary queue size statistic */ + protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize"; /** Name of the event temporary queue size statistic */ protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize"; /** Name of the events distributed statistic */ @@ -102,6 +104,8 @@ public class GatewaySenderStats { protected static int eventQueueTimeId; /** Id of the event queue size statistic */ protected static int eventQueueSizeId; + /** Id of the event in secondary queue size statistic */ + protected static int eventSecondaryQueueSizeId; /** Id of the temp event queue size statistic */ protected static int eventTmpQueueSizeId; /** Id of the events distributed statistic */ @@ -164,6 +168,8 @@ public class GatewaySenderStats { f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.", "nanoseconds"), f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), + f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.", + "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -232,6 +238,7 @@ public class GatewaySenderStats { eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED); eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); + 
eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); @@ -350,6 +357,15 @@ public class GatewaySenderStats { } /** + * Returns the current value of the "eventSecondaryQueueSize" stat. + * + * @return the current value of the "eventSecondaryQueueSize" stat + */ + public int getEventSecondaryQueueSize() { + return this.stats.getInt(eventSecondaryQueueSizeId); + } + + /** * Returns the current value of the "tempQueueSize" stat. * * @return the current value of the "tempQueueSize" stat. @@ -454,6 +470,15 @@ public class GatewaySenderStats { } /** + * Sets the "eventSecondaryQueueSize" stat. + * + * @param size The size of the secondary queue + */ + public void setSecondaryQueueSize(int size) { + this.stats.setInt(eventSecondaryQueueSizeId, size); + } + + /** * Sets the "tempQueueSize" stat. * * @param size The size of the temp queue @@ -471,6 +496,14 @@ public class GatewaySenderStats { } /** + * Increments the "eventSecondaryQueueSize" stat by 1. + */ + public void incSecondaryQueueSize() { + this.stats.incInt(eventSecondaryQueueSizeId, 1); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Increments the "tempQueueSize" stat by 1. */ public void incTempQueueSize() { @@ -487,6 +520,16 @@ public class GatewaySenderStats { } /** + * Increments the "eventSecondaryQueueSize" stat by given delta. + * + * @param delta an integer by which secondary queue size to be increased + */ + public void incSecondaryQueueSize(int delta) { + this.stats.incInt(eventSecondaryQueueSizeId, delta); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Increments the "tempQueueSize" stat by given delta. 
* * @param delta an integer by which temp queue size to be increased @@ -503,6 +546,14 @@ public class GatewaySenderStats { } /** + * Decrements the "eventSecondaryQueueSize" stat by 1. + */ + public void decSecondaryQueueSize() { + this.stats.incInt(eventSecondaryQueueSizeId, -1); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Decrements the "tempQueueSize" stat by 1. */ public void decTempQueueSize() { @@ -519,6 +570,16 @@ public class GatewaySenderStats { } /** + * Decrements the "eventSecondaryQueueSize" stat by given delta. + * + * @param delta an integer by which secondary queue size to be decreased + */ + public void decSecondaryQueueSize(int delta) { + this.stats.incInt(eventSecondaryQueueSizeId, -delta); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Decrements the "tempQueueSize" stat by given delta. * * @param delta an integer by which temp queue size to be increased diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 03f3464..06bc583 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1376,7 +1376,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { if (prQ != null && prQ.getDataStore() != null) { if (includeSecondary) { - size += prQ.getDataStore().getSizeOfLocalBuckets(true); + size += prQ.getDataStore().getSizeOfLocalBuckets(); } else { size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index 39fedbf..df89e36 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { final boolean isDebugEnabled = logger.isDebugEnabled(); try { brq.destroyKey(key); + if (!brq.getBucketAdvisor().isPrimary()) { + prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize(); + } if (isDebugEnabled) { logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(), brq.getId()); diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java index cdef8bd..aea1fb4 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java @@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest { attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id"; SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs); queue.getStatistics().incQueueSize(5); + queue.getStatistics().incSecondaryQueueSize(6); queue.getStatistics().incTempQueueSize(10); assertEquals(5, queue.getStatistics().getEventQueueSize()); + assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(10, queue.getStatistics().getTempEventQueueSize()); queue.stop(); assertEquals(0, queue.getStatistics().getEventQueueSize()); + assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(0, 
queue.getStatistics().getTempEventQueueSize()); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java index de03f2b..26ac48f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java @@ -44,8 +44,10 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; import org.junit.experimental.categories.Category; import org.apache.geode.DataSerializable; @@ -737,17 +739,35 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { } public static void checkAsyncEventQueueStats(String queueId, final int queueSize, - final int eventsReceived, final int eventsQueued, final int eventsDistributed) { + int secondaryQueueSize, final int eventsReceived, final int eventsQueued, + final int eventsDistributed) { Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues(); AsyncEventQueue queue = null; + boolean isParallel = false; for (AsyncEventQueue q : asyncQueues) { + isParallel = q.isParallel(); if (q.getId().equals(queueId)) { queue = q; break; } } final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics(); + Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS) + .pollInterval(1000, TimeUnit.MILLISECONDS) + .until(() -> assertEquals("Expected queue size: " + queueSize + " but actual queue size: " + + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize())); assertEquals(queueSize, statistics.getEventQueueSize()); + if (isParallel) { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + assertEquals( + 
"Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is " + + statistics.getEventSecondaryQueueSize(), + secondaryQueueSize, statistics.getEventSecondaryQueueSize()); + }); + } else { + // for serial queue, evenvSecondaryQueueSize is not used + assertEquals(0, statistics.getEventSecondaryQueueSize()); + } assertEquals(eventsReceived, statistics.getEventsReceived()); assertEquals(eventsQueued, statistics.getEventsQueued()); assert (statistics.getEventsDistributed() >= eventsDistributed); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java index b131b46..17288e0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java @@ -75,9 +75,10 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { // sender Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0)); } @@ -120,19 +121,43 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln1,ln2", isOffHeap())); + vm1.invoke(() -> 
AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 0, 1000, 1000, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 1000, 0, 1000, 0, 0)); + + vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 1000)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10)); - vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0)); } @@ -230,11 +255,12 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1500)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 1500, 1500)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 1500, 1500)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500)); } @@ -302,7 +328,8 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 2000, 2000, 1000)); + vm1.invoke( + () -> 
AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 2000, 2000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500)); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index 5e0f704..d1ea59f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -65,6 +65,8 @@ import org.apache.geode.internal.cache.eviction.EvictionController; import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; +import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.geode.test.fake.Fakes; import org.apache.geode.test.junit.categories.UnitTest; @@ -81,6 +83,7 @@ public class ParallelQueueRemovalMessageJUnitTest { private PartitionedRegion rootRegion; private BucketRegionQueue bucketRegionQueue; private BucketRegionQueueHelper bucketRegionQueueHelper; + private GatewaySenderStats stats; @Before public void setUpGemFire() { @@ -116,6 +119,8 @@ public class ParallelQueueRemovalMessageJUnitTest { when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender); when(this.sender.getQueues()).thenReturn(null); when(this.sender.getDispatcherThreads()).thenReturn(1); + stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln"); + when(this.sender.getStatistics()).thenReturn(stats); } private void createRootRegion() { @@ -183,6 +188,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate initial BucketRegionQueue 
state assertFalse(this.bucketRegionQueue.isInitialized()); assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + stats.setSecondaryQueueSize(1); // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to // add a key) @@ -190,6 +196,8 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + // failed BatchRemovalMessage will not modify stats + assertEquals(1, stats.getEventSecondaryQueueSize()); } @Test @@ -201,6 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Add an event to the BucketRegionQueue and verify BucketRegionQueue state this.bucketRegionQueueHelper.addEvent(KEY); assertEquals(1, this.bucketRegionQueue.size()); + assertEquals(1, stats.getEventSecondaryQueueSize()); // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to // DESTROYED) @@ -210,6 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Clean up destroyed tokens and validate BucketRegionQueue this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); assertEquals(0, this.bucketRegionQueue.size()); + assertEquals(0, stats.getEventSecondaryQueueSize()); } @Test @@ -247,6 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Add an event to the BucketRegionQueue and verify BucketRegionQueue state GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY); assertEquals(1, this.bucketRegionQueue.size()); + assertEquals(1, stats.getEventSecondaryQueueSize()); // Add a mock GatewaySenderEventImpl to the temp queue BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event); @@ -259,6 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate temp queue is empty after processing ParallelQueueRemovalMessage assertEquals(0, 
tempQueue.size()); + assertEquals(0, stats.getEventSecondaryQueueSize()); // Clean up destroyed tokens this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java index ea246bc..e7d38b4 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java @@ -52,8 +52,6 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { sample(); assertEquals(0, getEventQueueSize()); - - } private int getEventQueueSize() { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index c29d66e..3333584 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -120,9 +120,11 @@ import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.admin.remote.DistributionLocatorId; +import org.apache.geode.internal.cache.AbstractBucketRegionQueue; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.CacheConfig; import org.apache.geode.internal.cache.CacheServerImpl; @@ -139,6 +141,8 @@ import org.apache.geode.internal.cache.execute.data.Order; 
import org.apache.geode.internal.cache.execute.data.OrderId; import org.apache.geode.internal.cache.execute.data.Shipment; import org.apache.geode.internal.cache.execute.data.ShipmentId; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; import org.apache.geode.internal.cache.tier.sockets.CacheServerStats; import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; @@ -1175,12 +1179,57 @@ public class WANTestBase extends JUnit4DistributedTestCase { return connectionInfo; } + public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination, + final String regionName) { + + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + final RegionQueue regionQueue; + regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0]; + if (sender.isParallel()) { + ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = + (ConcurrentParallelGatewaySenderQueue) regionQueue; + PartitionedRegion prQ = + parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; + + Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); + for (int bid : primaryBucketIds) { + movePrimary(destination, regionName, bid); + } + + // double check after moved all primary buckets + primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); + assertTrue(primaryBucketIds.isEmpty()); + } + } + + public static void movePrimary(final DistributedMember destination, final String regionName, + final int bucketId) { + PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); + + BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage + .send((InternalDistributedMember) destination, region, bucketId, true); + assertNotNull(response); 
+ assertTrue(response.waitForResponse()); + } + + public static int getSecondaryQueueSizeInStats(String senderId) { + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + GatewaySenderStats statistics = sender.getStatistics(); + return statistics.getEventSecondaryQueueSize(); + } + public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); GatewaySenderStats statistics = sender.getStatistics(); if (expectedQueueSize != -1) { final RegionQueue regionQueue; regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0]; + if (sender.isParallel()) { + ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = + (ConcurrentParallelGatewaySenderQueue) regionQueue; + PartitionedRegion pr = + parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; + } Awaitility.await().atMost(120, TimeUnit.SECONDS) .until(() -> assertEquals("Expected queue entries: " + expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize, @@ -1197,9 +1246,28 @@ public class WANTestBase extends JUnit4DistributedTestCase { stats.add(statistics.getEventsNotQueuedConflated()); stats.add(statistics.getEventsConflatedFromBatches()); stats.add(statistics.getConflationIndexesMapSize()); + stats.add(statistics.getEventSecondaryQueueSize()); return stats; } + protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) { + int size = 0; + if (prQ != null) { + Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets(); + List<Integer> thisProcessorBuckets = new ArrayList<Integer>(); + + for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) { + BucketRegion bucket = bucketEntry.getValue(); + int bId = bucket.getId(); + if ((isPrimary && bucket.getBucketAdvisor().isPrimary()) + || (!isPrimary && 
!bucket.getBucketAdvisor().isPrimary())) { + size += bucket.size(); + } + } + } + return size; + } + public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived, final int eventsQueued, final int eventsDistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); @@ -3102,6 +3170,19 @@ public class WANTestBase extends JUnit4DistributedTestCase { return getQueueContentSize(senderId, false); } + public static Integer getSecondaryQueueContentSize(final String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; + return abstractSender.getEventSecondaryQueueSize(); + } + public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) { Set<GatewaySender> senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -3113,9 +3194,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { } if (!sender.isParallel()) { - if (includeSecondary) { - fail("Not implemented yet"); - } + // if sender is serial, the queues will be all primary or all secondary at one member final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); int size = 0; for (RegionQueue q : queues) { @@ -3130,11 +3209,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { } else if (regionQueue instanceof ParallelGatewaySenderQueue) { return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary); } else { - if (includeSecondary) { - fail("Not Implemented yet"); - } - regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; - return regionQueue.getRegion().size(); + fail("Not implemented yet"); } } fail("Not yet implemented?"); @@ -3180,14 +3255,26 @@ public class WANTestBase extends JUnit4DistributedTestCase { 
((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore() .getAllLocalPrimaryBucketRegions(); + final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; + for (final BucketRegion bucket : buckets) { - Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { assertEquals("Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0, bucket.keySet().size()); }); } // for loop ends + assertEquals("Except events in all primary queues after drain is 0", 0, + abstractSender.getEventQueueSize()); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + assertEquals( + "Expected events in all secondary queues are drained but actual is " + + abstractSender.getEventSecondaryQueueSize(), + 0, abstractSender.getEventSecondaryQueueSize()); + }); + assertEquals("Except events in all secondary queues after drain is 0", 0, + abstractSender.getEventSecondaryQueueSize()); } finally { exp.remove(); exp1.remove(); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java index 1613501..c9b968f 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -67,6 +67,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size()))); + vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues)); + + // 
Since no conflation, all updates are in queue + vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * updateKeyValues.size())); + vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0)); resumeSenders(); @@ -92,7 +97,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true)); vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true)); - createSenderPRs(); + createSenderPRs(1); startSenderInVMs("ln", vm4, vm5, vm6, vm7); @@ -109,24 +114,35 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues)); } + // sender did not turn on conflation, so queue size will be 100 (otherwise it will be 20) + vm4.invoke(() -> checkQueueSize("ln", 100)); vm4.invoke(() -> enableConflation("ln")); vm5.invoke(() -> enableConflation("ln")); vm6.invoke(() -> enableConflation("ln")); vm7.invoke(() -> enableConflation("ln")); - resumeSenders(); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + assertTrue("Event in secondary queue should be 100", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100); + + resumeSenders(); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); + assertEquals("Event in secondary queue should be 0 after dispatched", 0, + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); @@ -161,12 +177,14 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates // aren't // conflated + validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy); vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues)); - vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates - // aren't - // conflated + int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size(); + vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation)); + + validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0)); @@ -174,6 +192,24 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { keyValues.putAll(updateKeyValues); validateReceiverRegionSize(keyValues); + + // after dispatch, both primary and secondary queues are empty + vm4.invoke(() -> checkQueueSize("ln", 0)); + validateEventSecondaryQueueSize(0, redundancy); + } + + private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) { + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) 
vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + assertTrue("Event in secondary queue should be 100", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum + * redundancy); } @Test diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java index a54a67d..07c0d86 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.Region; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.internal.cache.wan.WANTestBase; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; @@ -53,6 +54,172 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { } @Test + public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSendersWithConflation(lnPort); + + createSenderPRs(1); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + putKeyValues(); + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) 
vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue + // size + assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events + // queued + assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + + // stop vm7 to trigger rebalance and move some primary buckets + System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + + ":" + v6List.get(10) + ":" + v7List.get(10)); + vm7.invoke(() -> WANTestBase.closeCache()); + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> { + int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize); // secondary + // queue + // size + }); + System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":" + + v6List.get(10)); + + vm7.invoke(() -> WANTestBase.createCache(lnPort)); + vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); + vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap())); + startSenderInVMs("ln", vm7); + vm7.invoke(() -> 
pauseSender("ln")); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":" + + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10)); + + vm4.invoke(() -> WANTestBase.resumeSender("ln")); + vm5.invoke(() -> WANTestBase.resumeSender("ln")); + vm6.invoke(() -> WANTestBase.resumeSender("ln")); + vm7.invoke(() -> WANTestBase.resumeSender("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); + + vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + } + + // TODO: add a test without redudency for primary switch + @Test + public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSendersWithConflation(lnPort); + + createSenderPRs(1); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + + putKeyValues(); + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue + // size + assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events + // queued + assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + // int vm7secondarySizeBeforeMovePrimary = v7List.get(10); + // System.out.println("Current secondary queue + // sizes:"+v4List.get(10)+":"+v5List.get(10)+":"+v6List.get(10)+":"+v7List.get(10)); + // System.out.println("Now move a primary bucket"); + // final DistributedMember vm6member = vm6.invoke(() -> WANTestBase.getMember()); + // vm7.invoke(() -> WANTestBase.moveAllPrimaryBuckets("ln", vm6member, testName)); + // + // v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + // v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + // v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS)); + // v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + // assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // + // secondary + // // queue + // // size + // assertTrue(v7List.get(10) > vm7secondarySizeBeforeMovePrimary); + + vm4.invoke(() -> WANTestBase.resumeSender("ln")); + vm5.invoke(() -> WANTestBase.resumeSender("ln")); + vm6.invoke(() -> WANTestBase.resumeSender("ln")); + vm7.invoke(() -> WANTestBase.resumeSender("ln")); + vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); + + vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + } + + @Test public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java index ab673e6..dfd6a1d 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java +++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java @@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*; import static org.junit.Assert.*; import java.io.File; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -46,6 +47,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; import org.apache.geode.internal.cache.wan.WANTestBase; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.VM; @@ -103,7 +105,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.pauseSender("ln")); vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - Wait.pause(5000); + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + // secondary queue size stats in serial queue should be 0 + assertEquals(0, v4List.get(10) + v5List.get(10)); + HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue()); HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue()); assertEquals(primarySenderUpdates, secondarySenderUpdates); @@ -138,6 +146,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { // removing all the keys. 
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue()); assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); + + vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); } protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java index 7297179..091befd 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java @@ -44,10 +44,10 @@ public class SerialWANConflationDUnitTest extends WANTestBase { createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true)); vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true)); @@ -92,6 +92,71 @@ public class SerialWANConflationDUnitTest extends WANTestBase { assertTrue("No events conflated in batch", (v4List.get(8) + 
v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); + } + + @Test + public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap())); + vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap())); + + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + + vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> pauseSender("ln")); + vm5.invoke(() -> pauseSender("ln")); + vm6.invoke(() -> pauseSender("ln")); + vm7.invoke(() -> pauseSender("ln")); + + + final Map keyValues = new HashMap(); + + for (int i = 1; i <= 10; i++) { + for (int j = 1; j <= 10; j++) { + keyValues.put(j, i); + } + vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues)); + } + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20)); + assertTrue("After conflation during enqueue, there should be only 20 events", + v4List.get(0) == 20); + + vm4.invoke(() -> 
resumeSender("ln")); + vm5.invoke(() -> resumeSender("ln")); + vm6.invoke(() -> resumeSender("ln")); + vm7.invoke(() -> resumeSender("ln")); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertTrue("No events in secondary queue stats since it's serial sender", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0); + assertTrue("Total queued events should be 100", + (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java index e84fd89..87c90e0 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java @@ -163,6 +163,7 @@ public class SerialWANPropagationDUnitTest extends WANTestBase { IgnoredException.addIgnoredException(BatchException70.class.getName()); IgnoredException.addIgnoredException(ServerOperationException.class.getName()); IgnoredException.addIgnoredException(IOException.class.getName()); + IgnoredException.addIgnoredException(java.net.SocketException.class.getName()); vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.