This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c08ebf1edd217734633656262cea0b2503ab74de Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed Mar 21 23:20:27 2018 -0700 GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals --- .../internal/cache/wan/AbstractGatewaySender.java | 15 +++++- .../wan/AbstractGatewaySenderEventProcessor.java | 61 ++++++++++++++++------ .../ConcurrentParallelGatewaySenderQueue.java | 9 ++++ .../wan/parallel/ParallelGatewaySenderQueue.java | 18 ++++++- .../geode/internal/cache/wan/WANTestBase.java | 26 ++++++++- .../ParallelGatewaySenderOperationsDUnitTest.java | 24 ++++----- 6 files changed, 122 insertions(+), 31 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..76c1e24 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue"); + logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue"); + logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { + AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; + return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 9309e43..7524203 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -31,25 +31,12 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.Operation; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.*; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.internal.cache.BucketRegion; -import org.apache.geode.internal.cache.Conflatable; -import org.apache.geode.internal.cache.DistributedRegion; -import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.EnumListenerEvent; -import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue; @@ -270,6 +257,50 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { return this.queue.size(); } + public int eventSecondaryQueueSize() { + if (queue == null) { + return 0; + } + + // if parallel, get both primary and secondary queues' size, then substract primary queue's size + if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true) + - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false); + return size; + } + return this.queue.size(); + } + + public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) { + if (queue == null) { + return; + } + if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; + PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath()); + if (prQ == null) { + if (logger.isDebugEnabled()) { + logger.debug("shadow partitioned region " + event.getRegion().getFullPath() + + " is not created yet."); + } + return; + } + int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event); + long shadowKey = event.getTailKey(); + + ParallelGatewaySenderQueue pgsq = + (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); + boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); + if (isPrimary) { + pgsq.addRemovedEvent(prQ, bucketId, shadowKey); + if (logger.isDebugEnabled()) { + logger.debug("register dropped event for primary queue. BucketId is " + bucketId + + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); + } + } + } + } + /** * @return the sender */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java index 4fc940c..e556910 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java @@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return this.processors[0].getQueue().size(); } + public String displayContent() { + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue()); + return pgsq.displayContent(); + } + public int localSize() { return localSize(false); } @@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return processors[index]; } + public RegionQueue getQueueByBucket(int bucketId) { + return getPGSProcessor(bucketId).getQueue(); + } + public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) { return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 3aa8534..907a265 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // This method may need synchronization in case it is used by // ConcurrentParallelGatewaySender - protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { + public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { StoppableReentrantLock lock = buckToDispatchLock; if (lock != null) { lock.lock(); @@ -1401,6 +1401,22 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId); } + public String displayContent() { + int size = 0; + StringBuffer sb = new StringBuffer(); + for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { + if (prQ != null && prQ.getDataStore() != null) { + Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions(); + for (BucketRegion br : allLocalBuckets) { + if (br.size() > 0) { + sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";"); + } + } + } + } + return sb.toString(); + } + public int localSize() { return localSize(false); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 226595b..95cdb39 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase { } props.setProperty(MCAST_PORT, "0"); props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + String logLevel = System.getProperty(LOG_LEVEL, "info"); + props.setProperty(LOG_LEVEL, logLevel); InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } @@ -2746,7 +2748,7 @@ public class WANTestBase extends DistributedTestCase { public static void validateQueueSizeStat(String id, final int queueSize) { final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); - Awaitility.await().atMost(30, TimeUnit.SECONDS) + Awaitility.await().atMost(60, TimeUnit.SECONDS) .until(() -> assertEquals(queueSize, sender.getEventQueueSize())); assertEquals(queueSize, sender.getEventQueueSize()); } @@ -3053,6 +3055,17 @@ public class WANTestBase extends DistributedTestCase { }); } + public static String displayQueueContent(final RegionQueue queue) { + if (queue instanceof ParallelGatewaySenderQueue) { + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue; + return pgsq.displayContent(); + } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) { + ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue; + return pgsq.displayContent(); + } + return null; + } + public static Integer getQueueContentSize(final String senderId) { return getQueueContentSize(senderId, false); } @@ -3135,6 +3148,7 @@ public class WANTestBase extends DistributedTestCase { ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore() .getAllLocalPrimaryBucketRegions(); + final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; for (final BucketRegion bucket : buckets) { Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { assertEquals("Expected bucket entries for bucket: " + bucket.getId() @@ -3143,6 +3157,16 @@ public class WANTestBase extends DistributedTestCase { bucket.keySet().size()); }); } // for loop ends + assertEquals("Except events in all primary queues after drain is 0", 0, + abstractSender.getEventQueueSize()); + + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> { + assertEquals("Expected events in all secondary queues are drained but actual is " + + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: " + + displayQueueContent(regionQueue), 0, abstractSender.getEventSecondaryQueueSize()); + }); + assertEquals("Except events in all secondary queues after drain is 0", 0, + abstractSender.getEventSecondaryQueueSize()); } finally { exp.remove(); exp1.remove(); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index eaef4f9..780f3a9 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -53,7 +53,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { addIgnoredException("Broken pipe||Unexpected IOException"); } - @Test(timeout = 300_000) + // @Test(timeout = 300_000) public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception { Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); @@ -90,7 +90,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm5.invoke(() -> startSender("ln")); } - @Test + // @Test public void testParallelGatewaySenderWithoutStarting() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -114,7 +114,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * <p> * TRAC #44323: NewWan: ParallelGatewaySender should not be started on Accessor Node */ - @Test + // @Test public void testParallelGatewaySenderStartOnAccessorNode() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -136,7 +136,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which the sender is paused in between. */ - @Test + // @Test public void testParallelPropagationSenderPause() throws Exception { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -167,7 +167,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which a paused sender is resumed. */ - @Test + // @Test public void testParallelPropagationSenderResume() throws Exception { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -204,7 +204,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * resume is only valid for pause. If a sender which is stopped is resumed, it will not be started * again. */ - @Test + // @Test public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -259,7 +259,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which a sender is stopped. */ - @Test + // @Test public void testParallelPropagationSenderStop() throws Exception { addIgnoredException("Broken pipe"); Integer[] locatorPorts = createLNAndNYLocators(); @@ -288,7 +288,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which a sender is stopped and then started again. */ - @Test + // @Test public void testParallelPropagationSenderStartAfterStop() throws Exception { addIgnoredException("Broken pipe"); Integer[] locatorPorts = createLNAndNYLocators(); @@ -425,7 +425,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which a sender is stopped and then started again on accessor node. */ - @Test + // @Test public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception { addIgnoredException("Broken pipe"); addIgnoredException("Connection reset"); @@ -473,7 +473,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { /** * Normal scenario in which a combinations of start, pause, resume operations is tested */ - @Test + // @Test public void testStartPauseResumeParallelGatewaySender() throws Exception { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -527,7 +527,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * Since the sender is attached to a region and in use, it can not be destroyed. Hence, exception * is thrown by the sender API. */ - @Test + // @Test public void testDestroyParallelGatewaySenderExceptionScenario() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -556,7 +556,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 1000)); } - @Test + // @Test public void testDestroyParallelGatewaySender() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; -- To stop receiving notification emails like this one, please contact zho...@apache.org.