This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4c00984 GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829) 4c00984 is described below commit 4c00984eb3c32972b4b03fdbe2b6397decdc177f Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Mon Dec 14 10:56:31 2020 +0100 GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829) * GEODE-8765: Fix NullPointerException when group-transaction-events and events in and not in transactions are sent. --- .../wan/parallel/ParallelGatewaySenderQueue.java | 60 +++++--- .../cache/wan/serial/SerialGatewaySenderQueue.java | 58 ++++---- .../internal/cache/BucketRegionQueueJUnitTest.java | 41 +++--- .../parallel/ParallelWANPropagationDUnitTest.java | 137 ++++++++++++++++-- ...lWANPropagation_PartitionedRegionDUnitTest.java | 160 ++++++++++++++++++--- 5 files changed, 351 insertions(+), 105 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 18c1624..2100d54 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1260,7 +1260,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { addPeekedEvents(batch, batchSize == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : batchSize); int bId; - Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>(); while (batchSize == BATCH_BASED_ON_TIME_ONLY || batch.size() < batchSize) { if (areLocalBucketQueueRegionsPresent() && ((bId = getRandomPrimaryBucket(prQ)) != -1)) { GatewaySenderEventImpl object = (GatewaySenderEventImpl) peekAhead(prQ, bId); @@ -1280,13 +1279,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { logger.debug("The gatewayEventImpl in peek is {}", object); } batch.add(object); - if (object.getTransactionId() != null) { - if (object.isLastEventInTransaction()) { - incompleteTransactionsInBatch.remove(object.getTransactionId()); - } else { - incompleteTransactionsInBatch.put(object.getTransactionId(), bId); - } - } peekedEvents.add(object); } else { @@ -1316,7 +1308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } if (batch.size() > 0) { - peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, prQ); + peekEventsFromIncompleteTransactions(batch, prQ); } if (isDebugEnabled) { @@ -1351,12 +1343,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch, - Map<TransactionId, Integer> incompleteTransactionIdsInBatch, PartitionedRegion prQ) { + PartitionedRegion prQ) { if (!mustGroupTransactionEvents()) { return; } - if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) { + Map<TransactionId, Integer> incompleteTransactionIdsInBatch = + getIncompleteTransactionsInBatch(batch); + if (incompleteTransactionIdsInBatch.size() == 0) { return; } @@ -1389,8 +1383,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - private boolean areAllTransactionsCompleteInBatch(Map incompleteTransactions) { - return (incompleteTransactions.size() == 0); + private Map<TransactionId, Integer> getIncompleteTransactionsInBatch( + List<GatewaySenderEventImpl> batch) { + Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>(); + for (GatewaySenderEventImpl event : batch) { + if (event.getTransactionId() != null) { + if (event.isLastEventInTransaction()) { + incompleteTransactionsInBatch.remove(event.getTransactionId()); + } else { + incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId()); + } + } + } + return incompleteTransactionsInBatch; } @VisibleForTesting @@ -1472,19 +1477,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue { for (int i = 0; i < batchSize || incompleteTransactionsInBatch.size() != 0; i++) { GatewaySenderEventImpl event = this.peekedEventsProcessing.remove(); batch.add(event); - if (event.getTransactionId() != null) { - if (event.isLastEventInTransaction()) { - incompleteTransactionsInBatch.remove(event.getTransactionId()); - } else { - incompleteTransactionsInBatch.add(event.getTransactionId()); + if (mustGroupTransactionEvents()) { + if (event.getTransactionId() != null) { + if (event.isLastEventInTransaction()) { + incompleteTransactionsInBatch.remove(event.getTransactionId()); + } else { + incompleteTransactionsInBatch.add(event.getTransactionId()); + } } } if (this.peekedEventsProcessing.isEmpty()) { this.resetLastPeeked = false; this.peekedEventsProcessingInProgress = false; - if (incompleteTransactionsInBatch.size() != 0) { - logger.error("A batch with incomplete transactions has been sent."); - } break; } } @@ -1547,9 +1551,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { try { Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = - x -> x.getTransactionId().equals(transactionId); + getHasTransactionIdPredicate(transactionId); Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = - x -> x.isLastEventInTransaction(); + getIsLastEventInTransactionPredicate(); objects = brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); } catch (BucketRegionQueueUnavailableException e) { @@ -1561,6 +1565,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // finished with peeked objects. } + @VisibleForTesting + public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() { + return x -> x.isLastEventInTransaction(); + } + + @VisibleForTesting + public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate( + TransactionId transactionId) { + return x -> transactionId.equals(x.getTransactionId()); + } protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ, final int bucketId) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 6b57fa0..192af19 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -413,12 +413,12 @@ public class SerialGatewaySenderQueue implements RegionQueue { } @Override - public List<AsyncEvent> peek(int size) throws CacheException { + public List<AsyncEvent<?, ?>> peek(int size) throws CacheException { return peek(size, -1); } @Override - public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException { + public List<AsyncEvent<?, ?>> peek(int size, int timeToWait) throws CacheException { final boolean isTraceEnabled = logger.isTraceEnabled(); long start = System.currentTimeMillis(); @@ -428,27 +428,16 @@ public class SerialGatewaySenderQueue implements RegionQueue { timeToWait); } - List<AsyncEvent> batch = - new ArrayList<AsyncEvent>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size); - Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>(); + List<AsyncEvent<?, ?>> batch = + new ArrayList<>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size); long lastKey = -1; while (size == BATCH_BASED_ON_TIME_ONLY || batch.size() < size) { KeyAndEventPair pair = peekAhead(); // Conflate here if (pair != null) { - AsyncEvent object = pair.event; + AsyncEvent<?, ?> object = pair.event; lastKey = pair.key; batch.add(object); - if (object instanceof GatewaySenderEventImpl) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; - if (event.getTransactionId() != null) { - if (event.isLastEventInTransaction()) { - incompleteTransactionsInBatch.remove(event.getTransactionId()); - } else { - incompleteTransactionsInBatch.add(event.getTransactionId()); - } - } - } } else { // If time to wait is -1 (don't wait) or time interval has elapsed long currentTime = System.currentTimeMillis(); @@ -476,7 +465,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { } } if (batch.size() > 0) { - peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, lastKey); + peekEventsFromIncompleteTransactions(batch, lastKey); } if (isTraceEnabled) { @@ -487,13 +476,13 @@ public class SerialGatewaySenderQueue implements RegionQueue { // so no need to worry about off-heap refCount. } - private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch, - Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) { + private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) { if (!mustGroupTransactionEvents()) { return; } - if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) { + Set<TransactionId> incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch); + if (incompleteTransactionIdsInBatch.size() == 0) { return; } @@ -530,8 +519,21 @@ public class SerialGatewaySenderQueue implements RegionQueue { return sender.mustGroupTransactionEvents(); } - private boolean areAllTransactionsCompleteInBatch(Set incompleteTransactions) { - return (incompleteTransactions.size() == 0); + private Set<TransactionId> getIncompleteTransactionsInBatch(List<AsyncEvent<?, ?>> batch) { + Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>(); + for (Object object : batch) { + if (object instanceof GatewaySenderEventImpl) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; + if (event.getTransactionId() != null) { + if (event.isLastEventInTransaction()) { + incompleteTransactionsInBatch.remove(event.getTransactionId()); + } else { + incompleteTransactionsInBatch.add(event.getTransactionId()); + } + } + } + } + return incompleteTransactionsInBatch; } @Override @@ -557,9 +559,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { public void removeCacheListener() { AttributesMutator mutator = this.region.getAttributesMutator(); CacheListener[] listeners = this.region.getAttributes().getCacheListeners(); - for (int i = 0; i < listeners.length; i++) { - if (listeners[i] instanceof SerialSecondaryGatewayListener) { - mutator.removeCacheListener(listeners[i]); + for (CacheListener listener : listeners) { + if (listener instanceof SerialSecondaryGatewayListener) { + mutator.removeCacheListener(listener); break; } } @@ -591,7 +593,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { try { Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName); if (latestIndexesForRegion == null) { - latestIndexesForRegion = new HashMap<Object, Long>(); + latestIndexesForRegion = new HashMap<>(); this.indexes.put(rName, latestIndexesForRegion); } @@ -664,7 +666,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { Object o = null; try { o = lr.getValueInVMOrDiskWithoutFaultIn(k); - if (o != null && o instanceof CachedDeserializable) { + if (o instanceof CachedDeserializable) { o = ((CachedDeserializable) o).getDeserializedValue(lr, lr.getRegionEntry(k)); } } catch (EntryNotFoundException ok) { @@ -842,7 +844,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId, long lastKey) { Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = - x -> x.getTransactionId().equals(transactionId); + x -> transactionId.equals(x.getTransactionId()); Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = x -> x.isLastEventInTransaction(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java index 15906b1..4279832 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java @@ -37,8 +37,8 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper; +import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.geode.test.fake.Fakes; @@ -138,38 +138,39 @@ public class BucketRegionQueueJUnitTest { } @Test - public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate() + public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions() throws ForceReattemptException { - ParallelGatewaySenderEventProcessor processor = - ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); TransactionId tx1 = new TXId(null, 1); TransactionId tx2 = new TXId(null, 2); TransactionId tx3 = new TXId(null, 3); GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false); - GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(2, tx2, false); - GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(3, tx1, true); - GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(4, tx2, true); - GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(5, tx3, false); - GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(6, tx3, false); - GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(7, tx1, true); + GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false); + GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false); + GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(4, tx1, true); + GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true); + GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false); + GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false); + GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true); this.bucketRegionQueue .cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII); - this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1); - this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2); - this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3); - this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4); - this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5); - this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6); - this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7); + this.bucketRegionQueue.addToQueue(1L, event1); + this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1); + this.bucketRegionQueue.addToQueue(3L, event2); + this.bucketRegionQueue.addToQueue(4L, event3); + this.bucketRegionQueue.addToQueue(5L, event4); + this.bucketRegionQueue.addToQueue(6L, event5); + this.bucketRegionQueue.addToQueue(7L, event6); + this.bucketRegionQueue.addToQueue(8L, event7); Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = - x -> x.getTransactionId().equals(tx1); + ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1); Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = - x -> x.isLastEventInTransaction(); + ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate(); List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); @@ -182,7 +183,7 @@ public class BucketRegionQueueJUnitTest { assertEquals(objects, Arrays.asList(new Object[] {event7})); hasTransactionIdPredicate = - x -> x.getTransactionId().equals(tx2); + ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2); objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); assertEquals(2, objects.size()); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java index 7423dbc..67ef206 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java @@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import junitparams.JUnitParamsRunner; @@ -33,6 +35,11 @@ import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.execute.data.CustId; +import org.apache.geode.internal.cache.execute.data.Order; +import org.apache.geode.internal.cache.execute.data.OrderId; +import org.apache.geode.internal.cache.execute.data.Shipment; +import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.BatchException70; import org.apache.geode.internal.cache.wan.WANTestBase; @@ -55,7 +62,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { @Test public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCache(lnPort); createSender("ln", 2, true, 100, 300, false, false, null, true); @@ -77,10 +84,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { } GemFireCacheImpl gemCache = (GemFireCacheImpl) cache; - Set regionSet = gemCache.rootRegions(); + Set<?> regionSet = gemCache.rootRegions(); for (Object r : regionSet) { - if (((Region) r).getName() + if (((Region<?, ?>) r).getName() .equals(((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0] .getRegion().getName())) { fail("The shadowPR is exposed to the user"); @@ -683,18 +690,18 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm2.invoke(createReceiverPartitionedRegionRedundancy1()); vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - AsyncInvocation inv1 = + AsyncInvocation<Void> inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 5000)); Wait.pause(500); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - AsyncInvocation inv3 = + AsyncInvocation<Void> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + AsyncInvocation<Void> inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000)); Wait.pause(1500); - AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); + AsyncInvocation<Void> inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); + inv1.await(); + inv2.await(); + inv3.await(); + inv4.await(); vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)); vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)); @@ -1047,7 +1054,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap(), shortcut)); String parentRegionFullPath = - (String) vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR")); + vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR")); // create colocated (child) PR on site1 vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(getTestMethodName() + "CHILD_PR", @@ -1235,6 +1242,112 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln")); } + @Test + public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2)); + vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2)); + vm6.invoke(() -> setNumDispatcherThreadsForTheRun(2)); + vm7.invoke(() -> setNumDispatcherThreadsForTheRun(2)); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true)); + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map<Object, Object> keyValuesInTransactions = new HashMap<>(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValuesInTransactions.put(orderId, new Order()); + keyValuesInTransactions.put(shipmentId1, new Shipment()); + keyValuesInTransactions.put(shipmentId2, new Shipment()); + keyValuesInTransactions.put(shipmentId3, new Shipment()); + } + } + + int ordersPerCustomerNotInTransactions = 1000; + + final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); + keyValuesNotInTransactions.put(orderId, new Order()); + } + } + + // eventsPerTransaction is 1 (orders) + 3 (shipments) + int eventsPerTransaction = 4; + AsyncInvocation<Void> inv1 = + vm7.invokeAsync( + () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, + eventsPerTransaction)); + + AsyncInvocation<Void> inv2 = + vm6.invokeAsync( + () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions)); + + inv1.await(); + inv2.await(); + + int entries = + ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers; + + vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + + vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } private static RegionShortcut[] getRegionShortcuts() { return new RegionShortcut[] {RegionShortcut.PARTITION, RegionShortcut.PARTITION_PERSISTENT}; diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java index 59b5b7a..5604a96 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java @@ -14,12 +14,20 @@ */ package org.apache.geode.internal.cache.wan.serial; +import java.util.HashMap; +import java.util.Map; + import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.CancelException; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.execute.data.CustId; +import org.apache.geode.internal.cache.execute.data.Order; +import org.apache.geode.internal.cache.execute.data.OrderId; +import org.apache.geode.internal.cache.execute.data.Shipment; +import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.internal.cache.wan.WANTestBase; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; @@ -35,10 +43,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testPartitionedSerialPropagation() throws Exception { + public void testPartitionedSerialPropagation() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -71,10 +79,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testBothReplicatedAndPartitionedSerialPropagation() throws Exception { + public void testBothReplicatedAndPartitionedSerialPropagation() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -122,10 +130,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testSerialReplicatedAndPartitionedPropagation() throws Exception { + public void testSerialReplicatedAndPartitionedPropagation() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -176,10 +184,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testSerialReplicatedAndSerialPartitionedPropagation() throws Exception { + public void testSerialReplicatedAndSerialPartitionedPropagation() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -236,11 +244,11 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testPartitionedSerialPropagationToTwoWanSites() throws Exception { + public void testPartitionedSerialPropagationToTwoWanSites() { Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); + Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); createCacheInVMs(nyPort, vm2); vm2.invoke(() -> WANTestBase.createReceiver()); @@ -288,8 +296,8 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase IgnoredException.addIgnoredException("Connection reset"); IgnoredException.addIgnoredException("Unexpected IOException"); - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -322,12 +330,12 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase IgnoredException.addIgnoredException(CacheClosedException.class.getName()); IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); // start async puts - AsyncInvocation inv = + AsyncInvocation<Void> inv = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); // close the cache on vm4 in between the puts vm4.invoke(() -> WANTestBase.killSender()); - inv.join(); + inv.await(); vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); @@ -335,10 +343,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test - public void testPartitionedSerialPropagationWithParallelThreads() throws Exception { + public void testPartitionedSerialPropagationWithParallelThreads() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); @@ -371,4 +379,112 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); } + + @Test + public void testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1)); + vm5.invoke(() -> setNumDispatcherThreadsForTheRun(1)); + vm6.invoke(() -> setNumDispatcherThreadsForTheRun(1)); + vm7.invoke(() -> setNumDispatcherThreadsForTheRun(1)); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true)); + + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map<Object, Object> keyValuesInTransactions = new HashMap<>(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValuesInTransactions.put(orderId, new Order()); + keyValuesInTransactions.put(shipmentId1, new Shipment()); + keyValuesInTransactions.put(shipmentId2, new Shipment()); + keyValuesInTransactions.put(shipmentId3, new Shipment()); + } + } + + int ordersPerCustomerNotInTransactions = 1000; + + final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); + keyValuesNotInTransactions.put(orderId, new Order()); + } + } + + // eventsPerTransaction is 1 (orders) + 3 (shipments) + int eventsPerTransaction = 4; + AsyncInvocation<Void> inv1 = + vm7.invokeAsync( + () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, + eventsPerTransaction)); + + AsyncInvocation<Void> inv2 = + vm6.invokeAsync( + () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions)); + + inv1.await(); + inv2.await(); + + int entries = + ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers; + + vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); + + vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); + + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } }