This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 26f976b Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)" 26f976b is described below commit 26f976b12637b64f34d0c58833e094fce999956e Author: zhouxh <gz...@pivotal.io> AuthorDate: Tue Apr 10 14:08:49 2018 -0700 Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)" This reverts commit e42ebec32518c014ab6bd3ffe6d1ee3cab813762. Reverted because an acceptance test failed after the merge. The merge needs to be double-checked. --- .../asyncqueue/internal/AsyncEventQueueStats.java | 5 -- .../geode/internal/cache/EntryEventImpl.java | 3 - .../internal/cache/PartitionedRegionDataStore.java | 6 +- .../internal/cache/wan/AbstractGatewaySender.java | 15 +---- .../wan/AbstractGatewaySenderEventProcessor.java | 50 +------------- .../internal/cache/wan/GatewaySenderStats.java | 16 ----- .../ConcurrentParallelGatewaySenderQueue.java | 9 --- .../wan/parallel/ParallelGatewaySenderQueue.java | 20 +----- .../geode/internal/cache/wan/WANTestBase.java | 78 +++------------------- .../ParallelGatewaySenderOperationsDUnitTest.java | 38 ++--------- .../SerialGatewaySenderOperationsDUnitTest.java | 49 +------------- 11 files changed, 26 insertions(+), 263 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index b8259a3..dee2c92 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -47,9 +47,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, "Number of events received but not added to the event 
queue because the queue already contains an event with the event's key.", "operations"), - f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), - f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, - "Number of events not added to primary queue due to sender yet runing.", "events"), f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES, "Number of events conflated from batches.", "operations"), f.createIntCounter(EVENTS_DISTRIBUTED, @@ -125,8 +122,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); - notQueuedEventsAtYetRunningPrimarySenderId = - type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 664d054..c91d236 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -2158,9 +2158,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, if (this.getInhibitDistribution()) { buf.append(";inhibitDistribution"); } - if (this.tailKey != -1) { - buf.append(";tailKey=" + tailKey); - } buf.append("]"); return buf.toString(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index d468ef4..ef8eb99 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { return sizeOfLocalPrimaries; } - public int getSizeOfLocalBuckets() { + public int getSizeOfLocalBuckets(boolean includeSecondary) { int sizeOfLocal = 0; - Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions(); - for (BucketRegion br : allLocalBuckets) { + Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions(); + for (BucketRegion br : primaryBuckets) { sizeOfLocal += br.size(); } return sizeOfLocal; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 034d810..a134e1e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,10 +849,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue:" + event); - } - if (this.eventProcessor != null) { - this.eventProcessor.registerEventDroppedInPrimaryQueue(event); + logger.debug("Returning back without putting into the gateway sender queue"); } return; } @@ -965,10 +962,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue:" + event); - } - if (this.eventProcessor != null) { - this.eventProcessor.registerEventDroppedInPrimaryQueue(event); + logger.debug("Returning back without putting into the gateway sender queue"); } return; } @@ -1257,11 +1251,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } - public int getEventSecondaryQueueSize() { - AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; - return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); - } - public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index eea7480..9309e43 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -33,7 +33,6 @@ import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegionHelper; import 
org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; @@ -263,57 +261,15 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // This should be local size instead of pr size - if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { - return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(); - } - return this.queue.size(); - } - - public int eventSecondaryQueueSize() { - if (queue == null) { - return 0; + if (this.queue instanceof ParallelGatewaySenderQueue) { + return ((ParallelGatewaySenderQueue) queue).localSize(); } - - // if parallel, get both primary and secondary queues' size, then substract primary queue's size if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { - int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true) - - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false); - return size; + return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(); } return this.queue.size(); } - public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) { - if (queue == null) { - return; - } - if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { - ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; - PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath()); - if (prQ == null) { - if (logger.isDebugEnabled()) { - logger.debug("shadow partitioned region " + event.getRegion().getFullPath() - + " is not created yet."); - } - return; - } - int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event); - long shadowKey = event.getTailKey(); - - ParallelGatewaySenderQueue pgsq = - (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); - boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); - if 
(isPrimary) { - pgsq.addRemovedEvent(prQ, bucketId, shadowKey); - this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender(); - if (logger.isDebugEnabled()) { - logger.debug("register dropped event for primary queue. BucketId is " + bucketId - + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); - } - } - } - } - /** * @return the sender */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index adaf928..c7fd370 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -84,8 +84,6 @@ public class GatewaySenderStats { protected static final String EVENTS_FILTERED = "eventsFiltered"; protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent"; - protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER = - "notQueuedEventAtYetRunningPrimarySender"; protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted"; protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress"; @@ -137,8 +135,6 @@ public class GatewaySenderStats { protected static int eventsFilteredId; /** Id of not queued events */ protected static int notQueuedEventsId; - /** Id of not queued events due to the primary sender is yet running */ - protected static int notQueuedEventsAtYetRunningPrimarySenderId; /** Id of events conflated in batch */ protected static int eventsConflatedFromBatchesId; /** Id of load balances completed */ @@ -217,8 +213,6 @@ public class GatewaySenderStats { f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE, "Current number of entries in the conflation indexes map.", "events"), f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), - 
f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, - "Number of events not added to primary queue due to sender yet runing.", "events"), f.createIntCounter(EVENTS_FILTERED, "Number of events filtered through GatewayEventFilter.", "events"), f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed", @@ -255,8 +249,6 @@ public class GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); - notQueuedEventsAtYetRunningPrimarySenderId = - type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); @@ -607,14 +599,6 @@ public class GatewaySenderStats { return this.stats.getInt(notQueuedEventsId); } - public void incEventsNotQueuedAtYetRunningPrimarySender() { - this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1); - } - - public int getEventsNotQueuedAtYetRunningPrimarySender() { - return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId); - } - public void incEventsFiltered() { this.stats.incInt(eventsFilteredId, 1); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java index e556910..4fc940c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java @@ -121,11 +121,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return this.processors[0].getQueue().size(); } 
- public String displayContent() { - ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue()); - return pgsq.displayContent(); - } - public int localSize() { return localSize(false); } @@ -195,10 +190,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return processors[index]; } - public RegionQueue getQueueByBucket(int bucketId) { - return getPGSProcessor(bucketId).getQueue(); - } - public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) { return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 89880fc..3aa8534 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // This method may need synchronization in case it is used by // ConcurrentParallelGatewaySender - public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { + protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { StoppableReentrantLock lock = buckToDispatchLock; if (lock != null) { lock.lock(); @@ -1405,28 +1405,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return localSize(false); } - public String displayContent() { - int size = 0; - StringBuffer sb = new StringBuffer(); - for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { - if (prQ != null && prQ.getDataStore() != null) { - Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions(); - for (BucketRegion br : allLocalBuckets) { - if (br.size() > 0) { - sb.append("bucketId=" 
+ br.getId() + ":" + br.keySet() + ";"); - } - } - } - } - return sb.toString(); - } - public int localSize(boolean includeSecondary) { int size = 0; for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { if (prQ != null && prQ.getDataStore() != null) { if (includeSecondary) { - size += prQ.getDataStore().getSizeOfLocalBuckets(); + size += prQ.getDataStore().getSizeOfLocalBuckets(true); } else { size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets(); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 3799083..226595b 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -933,8 +933,6 @@ public class WANTestBase extends DistributedTestCase { } props.setProperty(MCAST_PORT, "0"); props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - String logLevel = System.getProperty(LOG_LEVEL, "info"); - props.setProperty(LOG_LEVEL, logLevel); InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } @@ -1157,21 +1155,6 @@ public class WANTestBase extends DistributedTestCase { return stats; } - public static List<Integer> getSenderStatsForDroppedEvents(String senderId) { - AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); - GatewaySenderStats statistics = sender.getStatistics(); - ArrayList<Integer> stats = new ArrayList<Integer>(); - int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender(); - if (eventNotQueued > 0) { - logger.info( - "Found " + eventNotQueued + " not queued events due to primary sender is yet running"); - } - stats.add(eventNotQueued); - stats.add(statistics.getEventsNotQueued()); - stats.add(statistics.getEventsNotQueuedConflated()); - return stats; - } - public static void checkQueueStats(String 
senderId, final int queueSize, final int eventsReceived, final int eventsQueued, final int eventsDistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); @@ -2763,21 +2746,11 @@ public class WANTestBase extends DistributedTestCase { public static void validateQueueSizeStat(String id, final int queueSize) { final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); - Awaitility.await().atMost(60, TimeUnit.SECONDS) + Awaitility.await().atMost(30, TimeUnit.SECONDS) .until(() -> assertEquals(queueSize, sender.getEventQueueSize())); assertEquals(queueSize, sender.getEventQueueSize()); } - public static void validateSecondaryQueueSizeStat(String id, final int queueSize) { - final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); - Awaitility.await().atMost(120, TimeUnit.SECONDS) - .until(() -> assertEquals( - "Expected unprocessedEventMap is drained but actual is " - + sender.getStatistics().getUnprocessedEventMapSize(), - queueSize, sender.getStatistics().getUnprocessedEventMapSize())); - assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize()); - } - /** * This method is specifically written for pause and stop operations. 
This method validates that * the region size remains same for at least minimum number of verification attempts and also it @@ -3080,31 +3053,6 @@ public class WANTestBase extends DistributedTestCase { }); } - public static Integer getSecondaryQueueContentSize(final String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; - int size = abstractSender.getEventSecondaryQueueSize(); - return size; - } - - public static String displayQueueContent(final RegionQueue queue) { - if (queue instanceof ParallelGatewaySenderQueue) { - ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue; - return pgsq.displayContent(); - } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) { - ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue; - return pgsq.displayContent(); - } - return null; - } - public static Integer getQueueContentSize(final String senderId) { return getQueueContentSize(senderId, false); } @@ -3187,22 +3135,14 @@ public class WANTestBase extends DistributedTestCase { ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore() .getAllLocalPrimaryBucketRegions(); - final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; - RegionQueue queue = abstractSender.getEventProcessor().queue; - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { - assertEquals("Expected events in all primary queues are drained but actual is " - + abstractSender.getEventQueueSize() + ". 
Queue content is: " - + displayQueueContent(queue), 0, abstractSender.getEventQueueSize()); - }); - assertEquals("Expected events in all primary queues after drain is 0", 0, - abstractSender.getEventQueueSize()); - Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> { - assertEquals("Expected events in all secondary queues are drained but actual is " - + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: " - + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize()); - }); - assertEquals("Except events in all secondary queues after drain is 0", 0, - abstractSender.getEventSecondaryQueueSize()); + for (final BucketRegion bucket : buckets) { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + assertEquals("Expected bucket entries for bucket: " + bucket.getId() + + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: " + + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0, + bucket.keySet().size()); + }); + } // for loop ends } finally { exp.remove(); exp1.remove(); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index f5b98b7..eaef4f9 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -20,8 +20,6 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; - import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -409,42 +407,18 @@ public class 
ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200)); // SECOND RUN: start async puts on region - ArrayList<Integer> vm4List = null; - ArrayList<Integer> vm5List = null; - ArrayList<Integer> vm6List = null; - ArrayList<Integer> vm7List = null; - boolean foundDroppedAtYetStartedPrimarySender = false; - int count = 0; - - do { - stopSenders(); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000)); - - // when puts are happening by another thread, start the senders - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - async.join(); - vm4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) { - foundDroppedAtYetStartedPrimarySender = true; - } - count++; - } while (foundDroppedAtYetStartedPrimarySender == false && count < 5); - assertThat(foundDroppedAtYetStartedPrimarySender); + AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000)); + + // when puts are happening by another thread, start the senders + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + async.join(); // verify all the buckets on all the sender nodes are drained validateParallelSenderQueueAllBucketsDrained(); // verify that the queue size ultimately becomes zero. That means all the events propagate to // remote site. 
- vm4.invoke(() -> validateQueueContents("ln", 0)); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index 8df5650..ee43b83 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -269,53 +269,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test - public void testRestartSerialGatewaySendersWhilePutting() throws Throwable { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20)); - - startSenderInVMs("ln", vm4, vm5); - - vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20)); - - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20)); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20)); - - vm4.invoke(() -> WANTestBase.stopSender("ln")); - vm5.invoke(() -> WANTestBase.stopSender("ln")); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln")); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln")); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); - - // do a lot of puts while senders are restarting - AsyncInvocation async = 
vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000)); - - startSenderInVMsAsync("ln", vm4, vm5); - async.join(); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); - vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0)); - vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0)); - } - - @Test public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -345,7 +298,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200)); vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200)); - // Do some puts from both vm4 and vm5 while restarting a sender + // Do some puts while restarting a sender AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.