[ 
https://issues.apache.org/jira/browse/GEODE-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104387#comment-17104387
 ] 

ASF GitHub Bot commented on GEODE-7971:
---------------------------------------

albertogpz commented on a change in pull request #4928:
URL: https://github.com/apache/geode/pull/4928#discussion_r422994223



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -299,19 +312,316 @@ public void 
testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy(
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
 
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
-    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + 
v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
     assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
-                                                                               
            // queued
+    // queued
     assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
-                                                                               
            // distributed
+    // distributed
     assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) 
>= 10); // batches
-                                                                               
      // distributed
+    // distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
-                                                                               
     // redistributed
+    // redistributed
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, 
NUM_PUTS));
   }
 
+  @Test
+  public void 
testPartitionedRegionParallelPropagation_NoGroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, false, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
+    // redistributed
+  }
+
+  @Test
+  public void testPartitionedRegionParallelPropagation_GroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, true, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 3 transactions of 4 events each are sent so that the batch would
+    // initially contain the first 2 transactions complete and the first
+    // 2 events of the last transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the last transaction are added to the batch which makes
+    // that only one batch of 12 events is sent.
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
+    // redistributed
+  }
+
+  @Test
+  public void 
testPartitionedRegionParallelPropagationBatchRedistributed_NoGroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, false, batchTimeInterval);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 6;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    // The receiver is started later in order for the batch to be 
redistributed (sent again)
+    Thread.sleep(2000);
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(3, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertTrue("Batch was not redistributed",
+        (v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)) > 0); 
// batches
+    // redistributed
+  }
+
+  @Test
+  public void 
testPartitionedRegionParallelPropagationBatchRedistributed_GroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, true, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 6;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 6 transactions of 4 events each are sent so that the first batch
+    // would initially contain the first 2 transactions complete and the first
+    // 2 events of the next transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the second transaction are added to the batch which makes
+    // that the first batch is sent with 12 events. The same happens with the
+    // second batch which will contain 12 events too.
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    // The receiver is started later in order for the batch to be 
redistributed (sent again)
+    Thread.sleep(2000);

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Gateway sender to deliver transaction events atomically to gateway receivers
> ----------------------------------------------------------------------------
>
>                 Key: GEODE-7971
>                 URL: https://issues.apache.org/jira/browse/GEODE-7971
>             Project: Geode
>          Issue Type: Improvement
>          Components: wan
>            Reporter: Alberto Gomez
>            Assignee: Alberto Gomez
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The goal of this ticket is to implement the necessary changes in the gateway 
> sender to prevent that events belonging to the same transaction are spread 
> across different batches. In other words, to ensure that events from the same 
> transaction are sent inside the same batch.
> This will be an optional feature on gateway senders to be enabled via a new 
> parameter (--group-transaction-events) and will be restricted to serial 
> gateway senders with just one dispatcher thread or to parallel gateway 
> senders.
> Apart from the above restriction, grouping of events for a transaction inside 
> the same batch may only be attained if the regions to which the events belong 
> are replicated by the same set of gateway senders with the 
> --group-transaction-events flag enabled. If this condition is not met, the 
> events will be correctly delivered by the gateway senders but it will not be 
> guaranteed that all events will always be sent inside the same batch.
> For more details see: 
> [https://cwiki.apache.org/confluence/display/GEODE/Gw+sender+to+deliver+transaction+events+atomically+to+receivers]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to