[ 
https://issues.apache.org/jira/browse/GEODE-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17297420#comment-17297420
 ] 

ASF GitHub Bot commented on GEODE-8971:
---------------------------------------

albertogpz commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r589476132



##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);

Review comment:
       Indeed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batches with incomplete transactions when stopping the gateway sender
> ---------------------------------------------------------------------
>
>                 Key: GEODE-8971
>                 URL: https://issues.apache.org/jira/browse/GEODE-8971
>             Project: Geode
>          Issue Type: Improvement
>          Components: wan
>    Affects Versions: 1.14.0
>            Reporter: Alberto Gomez
>            Assignee: Alberto Gomez
>            Priority: Major
>              Labels: pull-request-available
>
> When the gateway sender is stopped, there is a high probability that batches 
> with incomplete transactions are sent, even if group-transaction-events is 
> enabled.
> The reason is that once the stop command reaches the gateway sender, the 
> sender immediately stops queueing events, and this can happen while events 
> for a single transaction are still arriving. In that case, some events of the 
> transaction may have reached the queue just before the stop command was 
> received, while the remaining events of that transaction never make it to the 
> queue (they are dropped) because they arrived just after the stop command was 
> received at the gateway sender.
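
The race described above can be illustrated with a small, self-contained sketch. It is not Geode code: ToySender, TxEvent and incompleteTransactions are hypothetical names invented for this illustration, and the sender is modelled as a plain list plus a running flag. The only behaviour taken from the description is that events arriving after the stop command are dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IncompleteTransactionSketch {

  /** Hypothetical event: one entry operation that belongs to a transaction. */
  static final class TxEvent {
    final int txId;
    final boolean lastInTransaction;

    TxEvent(int txId, boolean lastInTransaction) {
      this.txId = txId;
      this.lastInTransaction = lastInTransaction;
    }
  }

  /** Toy sender (not the Geode class): drops every event that arrives after stop(). */
  static final class ToySender {
    private final List<TxEvent> queue = new ArrayList<>();
    private boolean running = true;

    void enqueue(TxEvent event) {
      if (running) {
        queue.add(event);
      }
      // When not running, the event is silently dropped.
    }

    void stop() {
      running = false;
    }

    List<TxEvent> queued() {
      return queue;
    }
  }

  /** Returns the ids of transactions whose last event never reached the queue. */
  static List<Integer> incompleteTransactions(List<TxEvent> queue) {
    Set<Integer> seen = new LinkedHashSet<>();
    Set<Integer> completed = new LinkedHashSet<>();
    for (TxEvent event : queue) {
      seen.add(event.txId);
      if (event.lastInTransaction) {
        completed.add(event.txId);
      }
    }
    seen.removeAll(completed);
    return new ArrayList<>(seen);
  }

  /** Transaction 1 is queued in full; the stop arrives in the middle of transaction 2. */
  static List<Integer> demo() {
    ToySender sender = new ToySender();
    sender.enqueue(new TxEvent(1, false));
    sender.enqueue(new TxEvent(1, true));
    sender.enqueue(new TxEvent(2, false)); // reaches the queue just before the stop
    sender.stop();
    sender.enqueue(new TxEvent(2, false)); // dropped
    sender.enqueue(new TxEvent(2, true)); // dropped
    return incompleteTransactions(sender.queued());
  }

  public static void main(String[] args) {
    System.out.println("Incomplete transactions left in the queue: " + demo());
  }
}
```

In this toy model, transaction 2 is left in the queue without its final event, which is the kind of batch content the test above asserts must not be sent.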



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
