kirklund commented on a change in pull request #7083:
URL: https://github.com/apache/geode/pull/7083#discussion_r758764625



##########
File path: geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/cli/commands/CreateTxGroupingGatewaySenderDUnitTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.wan.txgrouping.cli.commands;
+
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.assertions.CommandResultAssert;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class CreateTxGroupingGatewaySenderDUnitTest {
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule();
+
+  MemberVM locator;

Review comment:
       Please use `private`
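   For example, a one-line sketch of the suggestion (no names assumed beyond the field already shown in the hunk):
   ```
   private MemberVM locator;
   ```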

##########
File path: geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.WanTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingBaseDUnitTest implements Serializable {
+
+  protected static final String REGION_NAME = "TheRegion";
+
+  protected final String shipmentRegionName = "ShipmentsRegion";
+  protected final String customerRegionName = "CustomersRegion";
+  protected final String orderRegionName = "OrdersRegion";
+
+  protected static LocatorLauncher locatorLauncher;
+  protected static ServerLauncher serverLauncher;
+
+  protected VM londonLocatorVM;
+  protected VM newYorkLocatorVM;
+  protected VM newYorkServerVM;
+  protected VM londonServer1VM;
+  protected VM londonServer2VM;
+  protected VM londonServer3VM;
+  protected VM londonServer4VM;
+  protected VM[] londonServersVM;
+
+  protected String newYorkName;
+
+  protected int londonId;
+  protected int newYorkId;
+
+  protected int londonLocatorPort;
+  protected int newYorkLocatorPort;
+
+  protected int newYorkReceiverPort;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private static List<Integer> dispatcherThreads = new 
ArrayList<>(Arrays.asList(1, 3, 5));
+  // this will be set for each test method run with one of the values from 
above list
+  private static int numDispatcherThreadsForTheRun = 1;
+
+  @Before
+  public void setUp() {
+    londonLocatorVM = getVM(0);
+    newYorkLocatorVM = getVM(1);
+    newYorkServerVM = getVM(2);
+    londonServer1VM = getVM(3);
+    londonServer2VM = getVM(4);
+    londonServer3VM = getVM(5);
+    londonServer4VM = getVM(6);
+    londonServersVM = new VM[] {londonServer1VM, londonServer2VM, 
londonServer3VM, londonServer4VM};
+
+    newYorkName = "ny";
+
+    londonId = 1;
+    newYorkId = 2;
+
+    int[] ports = getRandomAvailableTCPPorts(3);
+    londonLocatorPort = ports[0];
+    newYorkLocatorPort = ports[1];
+    newYorkReceiverPort = ports[2];
+
+    newYorkLocatorVM.invoke("start New York locator", () -> {
+      Properties config = createLocatorConfig(newYorkId, newYorkLocatorPort, 
londonLocatorPort);
+      cacheRule.createCache(config);
+    });
+
+    londonLocatorVM.invoke("start London locator", () -> {
+      Properties config = createLocatorConfig(londonId, londonLocatorPort, 
newYorkLocatorPort);
+      cacheRule.createCache(config);
+    });
+    Collections.shuffle(dispatcherThreads);
+    int dispatcherThreadsNo = dispatcherThreads.get(0);
+    Invoke.invokeInEveryVM(() -> 
setNumDispatcherThreadsForTheRun(dispatcherThreadsNo));
+
+  }
+
+  @After
+  public void tearDown() {
+    newYorkServerVM.invoke(() -> {
+      if (serverLauncher != null) {
+        serverLauncher.stop();
+        serverLauncher = null;
+      }
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke(() -> {
+        if (serverLauncher != null) {
+          serverLauncher.stop();
+          serverLauncher = null;
+        }
+      });
+    }
+
+    newYorkLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+
+    londonLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+  }
+
+  protected Properties createLocatorConfig(int systemId, int locatorPort, int 
remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + locatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+    return config;
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize)
+      throws IOException {
+    startServerWithSender(systemId, locatorPort, remoteSystemId, remoteName, 
isParallel,
+        groupTransactionEvents, batchSize, 0);
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize,
+      int dispatcherThreads) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    String uniqueName = "server-" + systemId;
+    File[] dirs = new File[] {temporaryFolder.newFolder(uniqueName)};
+
+    GatewaySenderFactory senderFactory = createGatewaySenderFactory(dirs, 
uniqueName);
+    senderFactory.setParallel(isParallel);
+    senderFactory.setGroupTransactionEvents(groupTransactionEvents);
+    senderFactory.setBatchSize(batchSize);
+    if (dispatcherThreads > 0) {
+      senderFactory.setDispatcherThreads(dispatcherThreads);
+    }
+    GatewaySender sender = senderFactory.create(remoteName, remoteSystemId);
+    sender.start();
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort) throws IOException {
+    startServerWithReceiver(locatorPort, receiverPort, true);
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort, boolean start) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    GatewayReceiverFactory receiverFactory = 
createGatewayReceiverFactory(receiverPort);
+    GatewayReceiver receiver = receiverFactory.create();
+    if (start) {
+      receiver.start();
+    }
+  }
+
+  protected void startReceiver() throws IOException {
+    cacheRule.getCache().getGatewayReceivers().iterator().next().start();
+  }
+
+  protected GatewayReceiverFactory createGatewayReceiverFactory(int 
receiverPort) {
+    GatewayReceiverFactory receiverFactory = 
cacheRule.getCache().createGatewayReceiverFactory();
+
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+    return receiverFactory;
+  }
+
+  protected Properties createServerConfig(int locatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    return config;
+  }
+
+  protected GatewaySenderFactory createGatewaySenderFactory(File[] dirs, 
String diskStoreName) {
+    InternalGatewaySenderFactory senderFactory =
+        (InternalGatewaySenderFactory) 
cacheRule.getCache().createGatewaySenderFactory();
+
+    senderFactory.setMaximumQueueMemory(100);
+    senderFactory.setBatchSize(10);
+    senderFactory.setBatchConflationEnabled(false);
+    senderFactory.setManualStart(true);
+    senderFactory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    senderFactory.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY);
+
+    DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
+    DiskStore store = dsf.setDiskDirs(dirs).create(diskStoreName);
+    senderFactory.setDiskStoreName(store.getName());
+
+    return senderFactory;
+  }
+
+  protected boolean isRunning(GatewaySender sender) {
+    return sender != null && sender.isRunning();
+  }
+
+  protected void validateRegionSize(String regionName, final int regionSize) {
+    final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR 
+ regionName);
+    assertNotNull(r);
+    if (regionSize != r.keySet().size()) {
+      await().untilAsserted(() -> 
assertThat(r.keySet().size()).isEqualTo(regionSize));
+    }

Review comment:
       I would delete the if-statement and just use the await. The if-statement is redundant: when the sizes already match (the only case the guard skips), the await would complete immediately anyway.

##########
File path: geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/parallel/TxGroupingParallelGatewaySenderQueue.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal.txgrouping.parallel;
+
+import static 
org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import 
org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+
+public class TxGroupingParallelGatewaySenderQueue extends 
ParallelGatewaySenderQueue {
+
+  public TxGroupingParallelGatewaySenderQueue(
+      final @NotNull AbstractGatewaySender sender,
+      final @NotNull Set<Region<?, ?>> userRegions,
+      final int idx, final int nDispatcher, final boolean cleanQueues) {
+    super(sender, userRegions, idx, nDispatcher, cleanQueues);
+  }
+
+  @Override
+  protected void postProcessBatch(final @NotNull PartitionedRegion 
partitionedRegion,
+      final @NotNull List<GatewaySenderEventImpl> batch) {
+    if (batch.isEmpty()) {
+      return;
+    }
+
+    final Map<TransactionId, Integer> incompleteTransactionIdsInBatch =
+        getIncompleteTransactionsInBatch(batch);
+    if (incompleteTransactionIdsInBatch.isEmpty()) {
+      return;
+    }
+
+    int retries = 0;
+    while (true) {
+      for (Iterator<Map.Entry<TransactionId, Integer>> iter =
+          incompleteTransactionIdsInBatch.entrySet().iterator(); 
iter.hasNext();) {
+        Map.Entry<TransactionId, Integer> pendingTransaction = iter.next();
+        TransactionId transactionId = pendingTransaction.getKey();
+        int bucketId = pendingTransaction.getValue();
+        List<Object> events =
+            peekEventsWithTransactionId(partitionedRegion, bucketId, 
transactionId);
+        for (Object object : events) {

Review comment:
       Try to extract any nested loops to their own smaller methods. It helps keep things better organized and more readable.
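   For instance, the body of the outer loop could move into a helper; just a sketch of the shape, where the helper name and the pieces marked `// ...` are hypothetical and not taken from the PR:
   ```
   while (true) {
     for (Iterator<Map.Entry<TransactionId, Integer>> iter =
         incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) {
       processIncompleteTransaction(partitionedRegion, batch, iter.next());
     }
     // ... existing retry/exit logic ...
   }

   // Hypothetical helper holding what is currently the body of the outer loop
   private void processIncompleteTransaction(PartitionedRegion partitionedRegion,
       List<GatewaySenderEventImpl> batch, Map.Entry<TransactionId, Integer> pendingTransaction) {
     TransactionId transactionId = pendingTransaction.getKey();
     int bucketId = pendingTransaction.getValue();
     List<Object> events =
         peekEventsWithTransactionId(partitionedRegion, bucketId, transactionId);
     for (Object object : events) {
       // ... existing inner-loop handling of each event ...
     }
   }
   ```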

##########
File path: geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/CommonTxGroupingGatewaySenderFactory.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal.txgrouping;
+
+import static java.lang.String.format;
+
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory;
+import org.apache.geode.internal.cache.wan.GatewaySenderException;
+import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
+
+public abstract class CommonTxGroupingGatewaySenderFactory {

Review comment:
       I think you should make this an interface instead of an abstract class. The `static` validator method would then become a `default` implementation which would probably only ever be overridden by a test.

##########
File path: geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping.parallel;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Customer;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.txgrouping.TxGroupingBaseDUnitTest;
+import org.apache.geode.internal.util.ArrayUtils;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingParallelDUnitTest extends TxGroupingBaseDUnitTest {
+  @Test
+  @Parameters({"true", "false"})
+  public void testPRParallelPropagationWithVsWithoutGroupTransactionEvents(
+      boolean groupTransactionEvents) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createCustomerOrderShipmentPartitionedRegion(null);
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, true,
+            groupTransactionEvents,
+            10);
+        createCustomerOrderShipmentPartitionedRegion(newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    final Map<Object, Object> custKeyValue = new HashMap<>();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map<Object, Object> keyValues = new HashMap<>();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 3 transactions of 4 events each are sent so that the batch would
+    // initially contain the first 2 transactions complete and the first
+    // 2 events of the last transaction (10 entries).
+    // If --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the last transaction are added to the batch which makes
+    // that only one batch of 12 events is sent.
+    // If --group-transaction-events is not configured in the senders, the
+    // remaining 2 events of the last transaction are added to the second batch
+    // which makes that 2 batches will be sent, one with 10 events and
+    // one with 2.
+    int eventsPerTransaction = 4;
+    londonServer1VM.invoke(() -> 
doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 1));
+    londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, 
transactions));
+    londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    List<Integer> senderStatsLondonServers = getSenderStats(newYorkName, 0, 
londonServersVM);
+
+    int expectedBatchesSent = groupTransactionEvents ? 1 : 2;
+    // queue size:
+    assertThat(senderStatsLondonServers.get(0)).isEqualTo(0);
+    // eventsReceived:
+    assertThat(senderStatsLondonServers.get(1)).isEqualTo(entries);
+    // events queued:
+    assertThat(senderStatsLondonServers.get(2)).isEqualTo(entries);
+    // events distributed:
+    assertThat(senderStatsLondonServers.get(3)).isEqualTo(entries);
+    // batches distributed:
+    assertThat(senderStatsLondonServers.get(4)).isEqualTo(expectedBatchesSent);
+    // batches redistributed:
+    assertThat(senderStatsLondonServers.get(5)).isEqualTo(0);
+    // events not queued conflated:
+    assertThat(senderStatsLondonServers.get(7)).isEqualTo(0);
+    // batches with incomplete transactions:
+    assertThat(senderStatsLondonServers.get(13)).isEqualTo(0);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions_SeveralClients(
+      boolean isBatchesRedistributed) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, 
!isBatchesRedistributed);
+      createCustomerOrderShipmentPartitionedRegion(null);
+    });
+
+    int batchSize = 10;
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, true, true,
+            batchSize);
+        createCustomerOrderShipmentPartitionedRegion(newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    int clients = 4;
+    int transactions = 300;
+    // batchSize is 10. Each transaction will contain 1 order + 3 shipments = 
4 events.
+    // As a result, all batches will contain extra events to complete the
+    // transactions it will deliver.
+    int shipmentsPerTransaction = 3;
+
+    final List<Map<Object, Object>> customerData = new ArrayList<>(clients);
+    for (int intCustId = 0; intCustId < clients; intCustId++) {
+      final Map<Object, Object> custKeyValue = new HashMap<>();
+      CustId custId = new CustId(intCustId);
+      custKeyValue.put(custId, new Customer());
+      customerData.add(new HashMap<>());
+      londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, 
custKeyValue));
+
+      for (int i = 0; i < transactions; i++) {
+        OrderId orderId = new OrderId(i, custId);
+        customerData.get(intCustId).put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          customerData.get(intCustId).put(new ShipmentId(i + j, orderId), new 
Shipment());
+        }
+      }
+    }
+
+    List<AsyncInvocation<?>> asyncInvocations = new ArrayList<>(clients);
+
+    int eventsPerTransaction = shipmentsPerTransaction + 1;
+    for (int i = 0; i < clients; i++) {
+      final int intCustId = i;
+      AsyncInvocation<?> asyncInvocation =
+          londonServer1VM.invokeAsync(() -> 
doOrderAndShipmentPutsInsideTransactions(
+              customerData.get(intCustId),
+              eventsPerTransaction));
+      asyncInvocations.add(asyncInvocation);
+    }
+
+    try {
+      for (AsyncInvocation<?> asyncInvocation : asyncInvocations) {
+        asyncInvocation.await();
+      }
+    } catch (InterruptedException e) {
+      fail("Interrupted");
+    }
+
+    londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 
clients));
+    londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, 
transactions * clients));
+    londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName,
+        transactions * shipmentsPerTransaction * clients));
+
+    if (isBatchesRedistributed) {
+      // wait for batches to be redistributed and then start the receiver
+      londonServer1VM.invoke(() -> await()
+          .until(() -> getSenderStats(newYorkName, -1).get(5) > 0));
+      newYorkServerVM.invoke("start New York receiver", this::startReceiver);
+    }
+
+    // Check that all entries have been written in the receiver
+    newYorkServerVM.invoke(
+        () -> validateRegionSize(customerRegionName, clients));
+    newYorkServerVM.invoke(
+        () -> validateRegionSize(orderRegionName, transactions * clients));
+    newYorkServerVM.invoke(
+        () -> validateRegionSize(shipmentRegionName,
+            shipmentsPerTransaction * transactions * clients));
+
+    checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(newYorkName,
+        isBatchesRedistributed);
+  }
+
+  @Test
+  public void 
testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTransactionsWhenTransactionEntriesOnNotColocatedBuckets()
 {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createPartitionedRegion(REGION_NAME, null);
+    });
+
+    int dispatcherThreads = 2;
+    londonServer1VM.invoke("create London server " + londonServer1VM.getId(), 
() -> {
+      startServerWithSender(londonServer1VM.getId(), londonLocatorPort, 
newYorkId, newYorkName,
+          true, true, 10, dispatcherThreads);
+      createPartitionedRegion(REGION_NAME, newYorkName);
+      GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+      await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+    });
+
+    // Adding events in transactions
+    // Transactions will contain objects assigned to different buckets but 
given that there is only
+    // one server, there will be no TransactionDataNotCollocatedException.
+    // With this and by using more than one dispatcher thread, we will provoke 
that
+    // it will be impossible for the batches to have complete transactions as 
some
+    // events for a transaction will be handled by one dispatcher thread and 
some other events by
+    // another thread.
+    final Map<Object, Object> keyValue = new HashMap<>();
+    int entries = 30;
+    for (int i = 0; i < entries; i++) {
+      keyValue.put(i, i);
+    }
+
+    int entriesPerTransaction = 3;
+    londonServer1VM
+        .invoke(() -> doPutsInsideTransactions(REGION_NAME, keyValue, 
entriesPerTransaction));
+
+    londonServer1VM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+
+    ArrayList<Integer> senderStatsLondonServer1 =
+        (ArrayList<Integer>) londonServer1VM.invoke(() -> 
getSenderStats(newYorkName, 0));
+
+    // The number of batches will be 4 because each
+    // dispatcher thread (there are 2) will send half the number of entries,
+    // each on 2 batches.
+    int batches = 4;
+    // queue size:
+    assertThat(senderStatsLondonServer1.get(0)).isEqualTo(0);
+    // eventsReceived:
+    assertThat(senderStatsLondonServer1.get(1)).isEqualTo(entries);
+    // events queued:
+    assertThat(senderStatsLondonServer1.get(2)).isEqualTo(entries);
+    // events distributed:
+    assertThat(senderStatsLondonServer1.get(3)).isEqualTo(entries);
+    // batches distributed:
+    assertThat(senderStatsLondonServer1.get(4)).isEqualTo(batches);
+    // batches redistributed:
+    assertThat(senderStatsLondonServer1.get(5)).isEqualTo(0);
+    // events not queued conflated:
+    assertThat(senderStatsLondonServer1.get(7)).isEqualTo(0);
+    // batches with incomplete transactions
+    assertThat(senderStatsLondonServer1.get(13)).isEqualTo(batches);
+
+    newYorkServerVM.invoke(() -> checkGatewayReceiverStats(batches, entries, 
entries));
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testPRParallelPropagationWithVsWithoutGroupTransactionEventsWithBatchRedistribution(
+      boolean groupTransactionEvents) {
+    londonServer1VM.invoke("create London server " + londonServer1VM.getId(), 
() -> {
+      startServerWithSender(londonServer1VM.getId(), londonLocatorPort, 
newYorkId, newYorkName,
+          true, groupTransactionEvents, 10);
+      createCustomerOrderShipmentPartitionedRegion(newYorkName);
+      GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+      await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+    });
+
+    newYorkServerVM.invoke("create New York server with receiver stopped", () 
-> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, false);
+      createCustomerOrderShipmentPartitionedRegion(null);
+    });
+
+    final Map<Object, Object> custKeyValue = new HashMap<>();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, 
custKeyValue));
+
+    int transactions = 6;
+    final Map<Object, Object> keyValues = new HashMap<>();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 6 transactions of 4 events each are sent with batch size = 10
+    // - With group transaction events:
+    // The first batch would initially contain the first 2 transactions 
complete and the first
+    // 2 events of the next transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the second transaction are added to the batch which makes
+    // the first batch to be sent with 12 events. The same happens with the
+    // second batch which will contain 12 events too.
+    // - Without group-transaction-events 3 batches will be sent. 2
+    // with 10 events and one with 4.
+    int expectedBatchesSent;
+    if (groupTransactionEvents) {
+      expectedBatchesSent = 2;
+    } else {
+      expectedBatchesSent = 3;
+    }
+    int eventsPerTransaction = 4;
+    londonServer1VM.invoke(() -> 
doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 1));
+    londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, 
transactions));
+    londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    // wait for batches to be redistributed and then start the receiver
+    londonServer1VM.invoke(() -> await()
+        .until(() -> getSenderStats(newYorkName, -1).get(5) > 0));
+
+    newYorkServerVM.invoke("Start New York receiver", this::startReceiver);
+
+    ArrayList<Integer> senderStatsLondonServer1 =
+        (ArrayList<Integer>) londonServer1VM.invoke(() -> 
getSenderStats(newYorkName, 0));
+
+    // queue size:
+    assertThat(senderStatsLondonServer1.get(0)).isEqualTo(0);
+    // events received:
+    assertThat(senderStatsLondonServer1.get(1)).isEqualTo(entries);
+    // events queued:
+    assertThat(senderStatsLondonServer1.get(2)).isEqualTo(entries);
+    // events distributed:
+    assertThat(senderStatsLondonServer1.get(3)).isEqualTo(entries);
+    // batches distributed:
+    assertThat(senderStatsLondonServer1.get(4)).isEqualTo(expectedBatchesSent);
+    // batches redistributed:
+    assertThat(senderStatsLondonServer1.get(5)).isGreaterThan(0);
+    // events not queued conflated:
+    assertThat(senderStatsLondonServer1.get(7)).isEqualTo(0);
+  }
+
+  @Test
+  public void testParallelPropagationHAWithGroupTransactionEvents() throws 
Exception {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createPartitionedRegion(REGION_NAME, null);
+    });
+
+    int batchSize = 9;
+    int redundantCopies = 3;
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, true, true,
+            batchSize, redundantCopies);
+        createPartitionedRegion(REGION_NAME, newYorkName, redundantCopies);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    int putsPerTransaction = 2;
+    int transactions = 1000;
+    AsyncInvocation<Void> asyncPutInvocation =
+        londonServer2VM.invokeAsync(
+            () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, 
transactions, 0));
+
+    newYorkServerVM.invoke(() -> await()
+        .untilAsserted(() -> 
assertThat(getRegionSize(REGION_NAME)).isGreaterThan(40)));
+    AsyncInvocation<Void> killServerInvocation =
+        londonServer1VM.invokeAsync(() -> cacheRule.getCache().close());
+    asyncPutInvocation.await();
+    killServerInvocation.await();
+
+    int entries = transactions * putsPerTransaction;
+    newYorkServerVM
+        .invoke(() -> validateRegionSize(REGION_NAME, transactions * 
putsPerTransaction));
+
+    List<Integer> londonServerStats =
+        getSenderStats(newYorkName, 0, (VM[]) 
ArrayUtils.remove(londonServersVM, 0));
+
+    // queue size
+    assertThat(londonServerStats.get(0)).isEqualTo(0);
+
+    // eventsReceived
+    // We may see two retried events (as transactions are made of 2 events) on 
all members due to
+    // the kill
+    assertThat(londonServerStats.get(1)).isLessThanOrEqualTo((entries + 2) * 
redundantCopies);
+    assertThat(londonServerStats.get(1)).isGreaterThanOrEqualTo(entries * 
redundantCopies);
+
+    // queuedEvents
+    assertThat(londonServerStats.get(2)).isLessThanOrEqualTo((entries + 2) * 
redundantCopies);
+    assertThat(londonServerStats.get(2)).isGreaterThanOrEqualTo(entries * 
redundantCopies);
+
+    // batches redistributed
+    assertThat(londonServerStats.get(5)).isEqualTo(0);
+
+    // batchesReceived is equal to numberOfEntries/(batchSize+1)
+    // As transactions are 2 events long, for each batch it will always be 
necessary to
+    // add one more entry to the 9 events batch in order to have complete 
transactions in the batch.
+    int batchesReceived = (entries) / (batchSize + 1);
+    newYorkServerVM.invoke(() -> checkGatewayReceiverStatsHA(batchesReceived, 
entries, entries));
+  }
+
+  private void 
checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(String senderId,
+      boolean isBatchesRedistributed) {
+    List<Integer> senderStatsLondonServers = getSenderStats(senderId, 0, 
londonServersVM);
+
+    // queue size:
+    assertThat(senderStatsLondonServers.get(0)).isEqualTo(0);
+    // batches redistributed:
+    int batchesRedistributed = senderStatsLondonServers.get(5);
+    if (isBatchesRedistributed) {
+      assertThat(batchesRedistributed).isGreaterThan(0);
+    } else {
+      assertThat(batchesRedistributed).isEqualTo(0);
+    }
+    // batches with incomplete transactions
+    assertThat(senderStatsLondonServers.get(13)).isEqualTo(0);
+
+    for (VM londonServer : londonServersVM) {
+      londonServer.invoke(() -> 
validateGatewaySenderQueueAllBucketsDrained(senderId));
+    }
+  }
+
+  protected void validateGatewaySenderQueueAllBucketsDrained(final String 
senderId) {
+    IgnoredException exp =
+        
IgnoredException.addIgnoredException(RegionDestroyedException.class.getName());
+    IgnoredException exp1 =
+        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+    try {
+      GatewaySender sender = getGatewaySender(senderId);
+      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) 
sender;
+      await().untilAsserted(() -> 
assertThat(abstractSender.getEventQueueSize()).isEqualTo(0));
+      await().untilAsserted(
+          () -> 
assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0));
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }

Review comment:
       `IgnoredException` implements `AutoCloseable` so you can use it in try-with-resources syntax. Using import-static and `addIgnoredException(Class)` tidies it up nicely:
   ```
   import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;

   try (IgnoredException ie1 = addIgnoredException(RegionDestroyedException.class);
        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class)) {
     // ...
   }
   ```

##########
File path: geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/serial/TxGroupingSerialDUnitTest.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping.serial;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.cache.wan.txgrouping.TxGroupingBaseDUnitTest;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingSerialDUnitTest extends TxGroupingBaseDUnitTest {
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testReplicatedSerialPropagationWithVsWithoutGroupTransactionEvents(
+      boolean groupTransactionEvents) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createReplicatedRegion(REGION_NAME, null);
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, false,
+            groupTransactionEvents, 10, 1);
+        createReplicatedRegion(REGION_NAME, newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    final Map<Object, Object> keyValues = new HashMap<>();
+    int entries = 12;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    // 4 transactions of 3 events each are sent so that the first batch
+    // would initially contain the first 3 transactions complete and the first
+    // event of the next transaction (10 entries).
+    // If --group-transaction-events is configured in the senders, the 
remaining
+    // events of the third transaction are added to the batch which makes
+    // that the batch is sent with 12 events.
+    // If --group-transaction-events is not configured in the senders, the 
remaining
+    // events of the third transaction are added to the next batch which makes
+    // that the 2 batches are sent. One with 10 events and another one
+    // with 2 events.
+    int expectedBatchesSent = groupTransactionEvents ? 1 : 2;
+    int eventsPerTransaction = 3;
+    londonServer2VM.invoke(() -> doPutsInsideTransactions(REGION_NAME, 
keyValues,
+        eventsPerTransaction));
+
+    newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+
+    newYorkServerVM
+        .invoke(() -> checkGatewayReceiverStats(expectedBatchesSent, entries, 
entries, true));
+
+    londonServer1VM.invoke(() -> checkQueueStats(newYorkName, 0, entries, 
entries, entries));
+    londonServer1VM.invoke(() -> checkBatchStats(newYorkName, 
expectedBatchesSent, false));
+    londonServer1VM.invoke(() -> checkConflatedStats(newYorkName));
+
+    // wait until queue is empty
+    londonServer2VM.invoke(() -> await()
+        .until(() -> getSenderStats(newYorkName, -1).get(0) == 0));
+
+    londonServer2VM.invoke(() -> checkQueueStats(newYorkName, 0, entries, 0, 
0));
+    londonServer2VM.invoke(() -> checkBatchStats(newYorkName, 0, false));
+    londonServer2VM.invoke(() -> checkConflatedStats(newYorkName));
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testReplicatedSerialPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions_SeveralClients(
+      boolean isBatchRedistributed) throws InterruptedException {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, 
!isBatchRedistributed);
+      createReplicatedRegion(REGION_NAME, null);
+    });
+
+    int batchSize = 10;
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, false,
+            true,
+            batchSize, 1);
+        createReplicatedRegion(REGION_NAME, newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    int clients = 2;
+    int eventsPerTransaction = batchSize + 1;
+    int entriesPerInvocation = eventsPerTransaction * 200;
+
+    final List<Map<Object, Object>> data = new ArrayList<>(clients);
+    for (int clientId = 0; clientId < clients; clientId++) {
+      final Map<Object, Object> keyValues = new HashMap<>();
+      for (int i = entriesPerInvocation * clientId; i < entriesPerInvocation
+          * (clientId + 1); i++) {
+        keyValues.put(i, i + "_Value");
+      }
+      data.add(keyValues);
+    }
+
+    int entries = entriesPerInvocation * clients;
+
+    List<AsyncInvocation<Void>> putAsyncInvocations = new ArrayList<>(clients);
+    for (int i = 0; i < clients; i++) {
+      final int index = i;
+      AsyncInvocation<Void> asyncInvocation =
+          londonServer1VM.invokeAsync(() -> 
doPutsInsideTransactions(REGION_NAME, data.get(index),
+              eventsPerTransaction));
+      putAsyncInvocations.add(asyncInvocation);
+    }
+
+    for (AsyncInvocation<Void> invocation : putAsyncInvocations) {
+      invocation.await();
+    }
+
+    if (isBatchRedistributed) {
+      // wait for batches to be redistributed and then start the receiver
+      londonServer1VM.invoke(() -> await()
+          .until(() -> getSenderStats(newYorkName, -1).get(5) > 0));
+      newYorkServerVM.invoke(this::startReceiver);
+    }
+
+    newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+
+    
checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(isBatchRedistributed);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testReplicatedSerialPropagationWithVsWithoutGroupTransactionEventsWithBatchRedistribution(
+      boolean groupTransactionEvents) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, false);
+      createReplicatedRegion(REGION_NAME, null);
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, false,
+            groupTransactionEvents, 10, 1);
+        createReplicatedRegion(REGION_NAME, newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    final Map<Object, Object> keyValues = new HashMap<>();
+    int entries = 24;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+    int eventsPerTransaction = 3;
+    londonServer2VM.invoke(() -> doPutsInsideTransactions(REGION_NAME, 
keyValues,
+        eventsPerTransaction));
+
+    // wait for batches to be redistributed and then start the receiver
+    londonServer1VM.invoke(() -> await()
+        .untilAsserted(() -> assertThat(getSenderStats(newYorkName, 
-1).get(5)).isGreaterThan(0)));
+
+    newYorkServerVM.invoke(this::startReceiver);
+
+    newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+
+    // 8 transactions of 3 events each are sent.
+    // - With group-transaction-events
+    // The first batch would initially contain the first 3 transactions 
complete
+    // and the first event of the next transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // events of the third transaction are added to the batch which makes
+    // that the first batch is sent with 12 events. The same happens with the
+    // second batch which will contain 12 events too.
+    // - Without group-transaction-events 3 batches are sent, 2 with 10 events
+    // and one with 4.
+    int expectedBatchesSent = groupTransactionEvents ? 2 : 3;
+
+    newYorkServerVM
+        .invoke(() -> checkGatewayReceiverStats(expectedBatchesSent, entries, 
entries, true));
+
+    londonServer1VM.invoke(() -> checkQueueStats(newYorkName, 0, entries, 
entries, entries));
+    londonServer1VM.invoke(() -> checkBatchStats(newYorkName, 
expectedBatchesSent, true));
+
+    // wait until queue is empty
+    londonServer2VM.invoke(() -> getSenderStats(newYorkName, 0));
+
+    londonServer2VM.invoke(() -> checkQueueStats(newYorkName, 0, entries, 0, 
0));
+    londonServer2VM.invoke(() -> checkBatchStats(newYorkName, 0, false));
+  }
+
+  @Test
+  public void testReplicatedSerialPropagationHAWithGroupTransactionEvents() 
throws Exception {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createReplicatedRegion(REGION_NAME, null);
+    });
+
+    int batchSize = 9;
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, false,
+            true, batchSize, 1);
+        createReplicatedRegion(REGION_NAME, newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    int putsPerTransaction = 2;
+    int transactions = 5000;
+    AsyncInvocation<Void> putsInvocation1 =
+        londonServer3VM.invokeAsync(
+            () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, 
transactions, 0));
+    AsyncInvocation<Void> putsInvocation2 =
+        londonServer4VM.invokeAsync(
+            () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, 
transactions, 1));
+
+    newYorkServerVM.invoke(() -> await()
+        .untilAsserted(() -> 
assertThat(getRegionSize(REGION_NAME)).isGreaterThan(40)));
+
+    AsyncInvocation<Boolean> killServerAsyncInvocation =
+        londonServer1VM.invokeAsync(() -> killPrimarySender(newYorkName));
+    Boolean isKilled = killServerAsyncInvocation.get();
+    if (!isKilled) {
+      AsyncInvocation<Boolean> killServerAsyncInvocation2 =
+          londonServer2VM.invokeAsync(() -> killPrimarySender(newYorkName));
+      killServerAsyncInvocation2.await();
+    }
+    putsInvocation1.await();
+    putsInvocation2.await();
+    killServerAsyncInvocation.await();
+
+    int entries = 2 * putsPerTransaction * transactions;
+    londonServer2VM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+    newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries));
+
+    // batchesReceived is equal to numberOfEntries/(batchSize+1)
+    // As transactions are 2 events long, for each batch it will always be 
necessary to
+    // add one more entry to the 9 events batch in order to have complete 
transactions in the batch.
+    int batchesReceived = entries / (batchSize + 1);
+    newYorkServerVM.invoke(() -> checkGatewayReceiverStatsHA(batchesReceived, 
entries, entries));
+
+    londonServer2VM.invoke(() -> checkStats_Failover(newYorkName, entries));
+  }
+
+  private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(
+      boolean isBatchesRedistributed) {
+    // Wait for sender queues to be empty
+    List<List<Integer>> londonServersStats = new 
ArrayList(londonServersVM.length);
+    int i = 0;
+    for (VM londonServer : londonServersVM) {
+      londonServersStats.add(londonServer.invoke(() -> 
getSenderStats(newYorkName, 0)));
+    }
+
+    int queueSize = londonServersStats.stream().map(x -> x.get(0)).reduce(0, 
(y, z) -> y + z);
+    assertThat(queueSize).isEqualTo(0);
+
+    // batches redistributed:
+    int batchesRedistributed =
+        londonServersStats.stream().map(x -> x.get(5)).reduce(0, (y, z) -> y + 
z);
+    if (isBatchesRedistributed) {
+      assertThat(batchesRedistributed).isGreaterThan(0);
+    } else {
+      assertThat(batchesRedistributed).isEqualTo(0);
+    }
+  }
+
+  private void createReplicatedRegion(String regionName, String senderId) {
+    RegionFactory<Object, Object> fact =
+        cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
+    if (senderId != null) {
+      fact.addGatewaySenderId(senderId);
+    }
+    fact.create(regionName);
+  }
+
+  private void checkQueueStats(String senderId, final int queueSize, final int 
eventsReceived,
+      final int eventsQueued, final int eventsDistributed) {
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
+    assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
+    assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+    assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
+    
assertThat(statistics.getEventsDistributed()).isGreaterThanOrEqualTo(eventsDistributed);
+  }
+
+  private void checkBatchStats(String senderId, final int batches,
+      final boolean batchesRedistributed) {
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
+    assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
+
+    if (batchesRedistributed) {
+      assertThat(statistics.getBatchesRedistributed()).isGreaterThan(0);
+    } else {
+      assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
+    }
+  }
+
+  private void checkConflatedStats(String senderId) {
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
+    assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(0);
+  }
+
+  private void checkStats_Failover(String senderId, final int eventsReceived) {
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
+    assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+    assertThat((statistics.getEventsQueued() + 
statistics.getUnprocessedTokensAddedByPrimary()
+        + 
statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
+  }
+
+  private Boolean killPrimarySender(String senderId) {

Review comment:
       The dunit testing framework originally required primitive wrappers, so I 
think you may have copied some of this from elsewhere. It's better to just use 
simple primitives for return types and parameters now, such as `boolean` 
instead of `Boolean`.
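
For example, just as a sketch (I haven't checked the real body, so the `isPrimary` 
check below is only a stand-in for whatever the kill logic actually does):
   ```
   private boolean killPrimarySender(String senderId) {
     AbstractGatewaySender sender =
         (AbstractGatewaySender) cacheRule.getCache().getGatewaySender(senderId);
     if (sender != null && sender.isPrimary()) {
       // stand-in for the real kill logic
       cacheRule.getCache().close();
       return true;
     }
     return false;
   }
   ```
The call sites keep working unchanged: autoboxing still gives you an 
`AsyncInvocation<Boolean>` from `invokeAsync`, and `boolean isKilled = 
killServerAsyncInvocation.get();` unboxes it.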

##########
File path: 
geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.WanTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingBaseDUnitTest implements Serializable {
+
+  protected static final String REGION_NAME = "TheRegion";
+
+  protected final String shipmentRegionName = "ShipmentsRegion";
+  protected final String customerRegionName = "CustomersRegion";
+  protected final String orderRegionName = "OrdersRegion";
+
+  protected static LocatorLauncher locatorLauncher;
+  protected static ServerLauncher serverLauncher;
+
+  protected VM londonLocatorVM;
+  protected VM newYorkLocatorVM;
+  protected VM newYorkServerVM;
+  protected VM londonServer1VM;
+  protected VM londonServer2VM;
+  protected VM londonServer3VM;
+  protected VM londonServer4VM;
+  protected VM[] londonServersVM;
+
+  protected String newYorkName;
+
+  protected int londonId;
+  protected int newYorkId;
+
+  protected int londonLocatorPort;
+  protected int newYorkLocatorPort;
+
+  protected int newYorkReceiverPort;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private static List<Integer> dispatcherThreads = new 
ArrayList<>(Arrays.asList(1, 3, 5));
+  // this will be set for each test method run with one of the values from 
above list
+  private static int numDispatcherThreadsForTheRun = 1;
+
+  @Before
+  public void setUp() {
+    londonLocatorVM = getVM(0);
+    newYorkLocatorVM = getVM(1);
+    newYorkServerVM = getVM(2);
+    londonServer1VM = getVM(3);
+    londonServer2VM = getVM(4);
+    londonServer3VM = getVM(5);
+    londonServer4VM = getVM(6);
+    londonServersVM = new VM[] {londonServer1VM, londonServer2VM, 
londonServer3VM, londonServer4VM};
+
+    newYorkName = "ny";
+
+    londonId = 1;
+    newYorkId = 2;
+
+    int[] ports = getRandomAvailableTCPPorts(3);
+    londonLocatorPort = ports[0];
+    newYorkLocatorPort = ports[1];
+    newYorkReceiverPort = ports[2];
+
+    newYorkLocatorVM.invoke("start New York locator", () -> {
+      Properties config = createLocatorConfig(newYorkId, newYorkLocatorPort, 
londonLocatorPort);
+      cacheRule.createCache(config);
+    });
+
+    londonLocatorVM.invoke("start London locator", () -> {
+      Properties config = createLocatorConfig(londonId, londonLocatorPort, 
newYorkLocatorPort);
+      cacheRule.createCache(config);
+    });
+    Collections.shuffle(dispatcherThreads);
+    int dispatcherThreadsNo = dispatcherThreads.get(0);
+    Invoke.invokeInEveryVM(() -> 
setNumDispatcherThreadsForTheRun(dispatcherThreadsNo));
+
+  }
+
+  @After
+  public void tearDown() {
+    newYorkServerVM.invoke(() -> {
+      if (serverLauncher != null) {
+        serverLauncher.stop();
+        serverLauncher = null;
+      }
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke(() -> {
+        if (serverLauncher != null) {
+          serverLauncher.stop();
+          serverLauncher = null;
+        }
+      });
+    }
+
+    newYorkLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+
+    londonLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+  }
+
+  protected Properties createLocatorConfig(int systemId, int locatorPort, int 
remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");

Review comment:
       This is the default, so you can delete the line that sets `MCAST_PORT`.
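
That is, the locator config would just lose that one line (everything else as in 
the diff):
   ```
   protected Properties createLocatorConfig(int systemId, int locatorPort,
       int remoteLocatorPort) {
     Properties config = new Properties();
     config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
     config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
     config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']');
     config.setProperty(START_LOCATOR,
         "localhost[" + locatorPort + "],server=true,peer=true,hostname-for-clients=localhost");
     return config;
   }
   ```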

##########
File path: 
geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.WanTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingBaseDUnitTest implements Serializable {
+
+  protected static final String REGION_NAME = "TheRegion";
+
+  protected final String shipmentRegionName = "ShipmentsRegion";
+  protected final String customerRegionName = "CustomersRegion";
+  protected final String orderRegionName = "OrdersRegion";
+
+  protected static LocatorLauncher locatorLauncher;
+  protected static ServerLauncher serverLauncher;
+
+  protected VM londonLocatorVM;
+  protected VM newYorkLocatorVM;
+  protected VM newYorkServerVM;
+  protected VM londonServer1VM;
+  protected VM londonServer2VM;
+  protected VM londonServer3VM;
+  protected VM londonServer4VM;
+  protected VM[] londonServersVM;
+
+  protected String newYorkName;
+
+  protected int londonId;
+  protected int newYorkId;
+
+  protected int londonLocatorPort;
+  protected int newYorkLocatorPort;
+
+  protected int newYorkReceiverPort;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private static List<Integer> dispatcherThreads = new 
ArrayList<>(Arrays.asList(1, 3, 5));
+  // this will be set for each test method run with one of the values from 
above list
+  private static int numDispatcherThreadsForTheRun = 1;
+
+  @Before
+  public void setUp() {
+    londonLocatorVM = getVM(0);
+    newYorkLocatorVM = getVM(1);
+    newYorkServerVM = getVM(2);
+    londonServer1VM = getVM(3);
+    londonServer2VM = getVM(4);
+    londonServer3VM = getVM(5);
+    londonServer4VM = getVM(6);
+    londonServersVM = new VM[] {londonServer1VM, londonServer2VM, 
londonServer3VM, londonServer4VM};
+
+    newYorkName = "ny";
+
+    londonId = 1;
+    newYorkId = 2;
+
+    int[] ports = getRandomAvailableTCPPorts(3);
+    londonLocatorPort = ports[0];
+    newYorkLocatorPort = ports[1];
+    newYorkReceiverPort = ports[2];
+
+    newYorkLocatorVM.invoke("start New York locator", () -> {
+      Properties config = createLocatorConfig(newYorkId, newYorkLocatorPort, 
londonLocatorPort);
+      cacheRule.createCache(config);
+    });
+
+    londonLocatorVM.invoke("start London locator", () -> {
+      Properties config = createLocatorConfig(londonId, londonLocatorPort, 
newYorkLocatorPort);
+      cacheRule.createCache(config);
+    });
+    Collections.shuffle(dispatcherThreads);
+    int dispatcherThreadsNo = dispatcherThreads.get(0);
+    Invoke.invokeInEveryVM(() -> 
setNumDispatcherThreadsForTheRun(dispatcherThreadsNo));
+
+  }
+
+  @After
+  public void tearDown() {
+    newYorkServerVM.invoke(() -> {
+      if (serverLauncher != null) {
+        serverLauncher.stop();
+        serverLauncher = null;
+      }
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke(() -> {
+        if (serverLauncher != null) {
+          serverLauncher.stop();
+          serverLauncher = null;
+        }
+      });
+    }
+
+    newYorkLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+
+    londonLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+  }
+
+  protected Properties createLocatorConfig(int systemId, int locatorPort, int 
remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + locatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+    return config;
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize)
+      throws IOException {
+    startServerWithSender(systemId, locatorPort, remoteSystemId, remoteName, 
isParallel,
+        groupTransactionEvents, batchSize, 0);
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize,
+      int dispatcherThreads) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    String uniqueName = "server-" + systemId;
+    File[] dirs = new File[] {temporaryFolder.newFolder(uniqueName)};
+
+    GatewaySenderFactory senderFactory = createGatewaySenderFactory(dirs, 
uniqueName);
+    senderFactory.setParallel(isParallel);
+    senderFactory.setGroupTransactionEvents(groupTransactionEvents);
+    senderFactory.setBatchSize(batchSize);
+    if (dispatcherThreads > 0) {
+      senderFactory.setDispatcherThreads(dispatcherThreads);
+    }
+    GatewaySender sender = senderFactory.create(remoteName, remoteSystemId);
+    sender.start();
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort) throws IOException {
+    startServerWithReceiver(locatorPort, receiverPort, true);
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort, boolean start) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    GatewayReceiverFactory receiverFactory = 
createGatewayReceiverFactory(receiverPort);
+    GatewayReceiver receiver = receiverFactory.create();
+    if (start) {
+      receiver.start();
+    }
+  }
+
+  protected void startReceiver() throws IOException {
+    cacheRule.getCache().getGatewayReceivers().iterator().next().start();
+  }
+
+  protected GatewayReceiverFactory createGatewayReceiverFactory(int 
receiverPort) {
+    GatewayReceiverFactory receiverFactory = 
cacheRule.getCache().createGatewayReceiverFactory();
+
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+    return receiverFactory;
+  }
+
+  protected Properties createServerConfig(int locatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    return config;
+  }
+
+  protected GatewaySenderFactory createGatewaySenderFactory(File[] dirs, 
String diskStoreName) {
+    InternalGatewaySenderFactory senderFactory =
+        (InternalGatewaySenderFactory) 
cacheRule.getCache().createGatewaySenderFactory();
+
+    senderFactory.setMaximumQueueMemory(100);
+    senderFactory.setBatchSize(10);
+    senderFactory.setBatchConflationEnabled(false);
+    senderFactory.setManualStart(true);
+    senderFactory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    senderFactory.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY);
+
+    DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
+    DiskStore store = dsf.setDiskDirs(dirs).create(diskStoreName);
+    senderFactory.setDiskStoreName(store.getName());
+
+    return senderFactory;
+  }
+
+  protected boolean isRunning(GatewaySender sender) {
+    return sender != null && sender.isRunning();
+  }
+
+  protected void validateRegionSize(String regionName, final int regionSize) {
+    final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR 
+ regionName);
+    assertNotNull(r);
+    if (regionSize != r.keySet().size()) {
+      await().untilAsserted(() -> 
assertThat(r.keySet().size()).isEqualTo(regionSize));
+    }
+  }
+
+  protected List<Integer> getSenderStats(String senderId, final int 
expectedQueueSize) {
+    AbstractGatewaySender sender =
+        (AbstractGatewaySender) 
cacheRule.getCache().getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    if (expectedQueueSize != -1) {
+      final RegionQueue regionQueue;
+      regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+      if (sender.isParallel()) {
+        ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+            (ConcurrentParallelGatewaySenderQueue) regionQueue;
+        PartitionedRegion pr =
+            parallelGatewaySenderQueue.getRegions().toArray(new 
PartitionedRegion[1])[0];
+      }
+      await()
+          .untilAsserted(() -> 
assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
+    }
+    ArrayList<Integer> stats = new ArrayList<>();

Review comment:
       Remember to declare variables as the weakest interface type you need:
   ```
   List<Integer> stats = new ArrayList<>();
   ```

##########
File path: 
geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;

Review comment:
       Please use AssertJ for all assertions. `assertEquals` becomes 
`assertThat(a).isEqualTo(b)`, and `assertNotNull` becomes 
`assertThat(a).isNotNull()`.
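
For example, in `validateRegionSize` only the null check needs to change:
   ```
   protected void validateRegionSize(String regionName, final int regionSize) {
     final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR + regionName);
     assertThat(r).isNotNull();
     if (regionSize != r.keySet().size()) {
       await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize));
     }
   }
   ```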

##########
File path: 
geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping.parallel;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Customer;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.txgrouping.TxGroupingBaseDUnitTest;
+import org.apache.geode.internal.util.ArrayUtils;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingParallelDUnitTest extends TxGroupingBaseDUnitTest {
+  @Test
+  @Parameters({"true", "false"})
+  public void testPRParallelPropagationWithVsWithoutGroupTransactionEvents(
+      boolean groupTransactionEvents) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort);
+      createCustomerOrderShipmentPartitionedRegion(null);
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, true,
+            groupTransactionEvents,
+            10);
+        createCustomerOrderShipmentPartitionedRegion(newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    final Map<Object, Object> custKeyValue = new HashMap<>();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map<Object, Object> keyValues = new HashMap<>();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 3 transactions of 4 events each are sent so that the batch would
+    // initially contain the first 2 transactions complete and the first
+    // 2 events of the last transaction (10 entries).
+    // If --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the last transaction are added to the batch which makes
+    // that only one batch of 12 events is sent.
+    // If --group-transaction-events is not configured in the senders, the
+    // remaining 2 events of the last transaction are added to the second batch
+    // which makes that 2 batches will be sent, one with 10 events and
+    // one with 2.
+    int eventsPerTransaction = 4;
+    londonServer1VM.invoke(() -> 
doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 1));
+    londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, 
transactions));
+    londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    List<Integer> senderStatsLondonServers = getSenderStats(newYorkName, 0, 
londonServersVM);
+
+    int expectedBatchesSent = groupTransactionEvents ? 1 : 2;
+    // queue size:
+    assertThat(senderStatsLondonServers.get(0)).isEqualTo(0);
+    // eventsReceived:
+    assertThat(senderStatsLondonServers.get(1)).isEqualTo(entries);
+    // events queued:
+    assertThat(senderStatsLondonServers.get(2)).isEqualTo(entries);
+    // events distributed:
+    assertThat(senderStatsLondonServers.get(3)).isEqualTo(entries);
+    // batches distributed:
+    assertThat(senderStatsLondonServers.get(4)).isEqualTo(expectedBatchesSent);
+    // batches redistributed:
+    assertThat(senderStatsLondonServers.get(5)).isEqualTo(0);
+    // events not queued conflated:
+    assertThat(senderStatsLondonServers.get(7)).isEqualTo(0);
+    // batches with incomplete transactions:
+    assertThat(senderStatsLondonServers.get(13)).isEqualTo(0);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions_SeveralClients(
+      boolean isBatchesRedistributed) {
+    newYorkServerVM.invoke("create New York server", () -> {
+      startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, 
!isBatchesRedistributed);
+      createCustomerOrderShipmentPartitionedRegion(null);
+    });
+
+    int batchSize = 10;
+    for (VM server : londonServersVM) {
+      server.invoke("create London server " + server.getId(), () -> {
+        startServerWithSender(server.getId(), londonLocatorPort, newYorkId, 
newYorkName, true, true,
+            batchSize);
+        createCustomerOrderShipmentPartitionedRegion(newYorkName);
+        GatewaySender sender = 
cacheRule.getCache().getGatewaySender(newYorkName);
+        await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue());
+      });
+    }
+
+    int clients = 4;
+    int transactions = 300;
+    // batchSize is 10. Each transaction will contain 1 order + 3 shipments = 
4 events.
+    // As a result, all batches will contain extra events to complete the
+    // transactions it will deliver.
+    int shipmentsPerTransaction = 3;
+
+    final List<Map<Object, Object>> customerData = new ArrayList<>(clients);
+    for (int intCustId = 0; intCustId < clients; intCustId++) {
+      final Map<Object, Object> custKeyValue = new HashMap<>();
+      CustId custId = new CustId(intCustId);
+      custKeyValue.put(custId, new Customer());
+      customerData.add(new HashMap<>());
+      londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, 
custKeyValue));
+
+      for (int i = 0; i < transactions; i++) {
+        OrderId orderId = new OrderId(i, custId);
+        customerData.get(intCustId).put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          customerData.get(intCustId).put(new ShipmentId(i + j, orderId), new 
Shipment());
+        }
+      }
+    }
+
+    List<AsyncInvocation<?>> asyncInvocations = new ArrayList<>(clients);
+
+    int eventsPerTransaction = shipmentsPerTransaction + 1;
+    for (int i = 0; i < clients; i++) {
+      final int intCustId = i;
+      AsyncInvocation<?> asyncInvocation =
+          londonServer1VM.invokeAsync(() -> 
doOrderAndShipmentPutsInsideTransactions(
+              customerData.get(intCustId),
+              eventsPerTransaction));
+      asyncInvocations.add(asyncInvocation);
+    }
+
+    try {
+      for (AsyncInvocation<?> asyncInvocation : asyncInvocations) {
+        asyncInvocation.await();
+      }
+    } catch (InterruptedException e) {
+      fail("Interrupted");
+    }

Review comment:
       Don't catch an exception and use `fail`. Instead just add `throws 
InterruptedException` to the test and delete the try-catch lines.
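
Roughly (unchanged parts of the test elided):
   ```
   @Test
   @Parameters({"true", "false"})
   public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions_SeveralClients(
       boolean isBatchesRedistributed) throws InterruptedException {
     // ... setup, puts and async invocations exactly as before ...
     for (AsyncInvocation<?> asyncInvocation : asyncInvocations) {
       asyncInvocation.await();
     }
     // ... rest of the assertions unchanged ...
   }
   ```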

##########
File path: 
geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.txgrouping;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.WanTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@Category({WanTest.class})
+@RunWith(GeodeParamsRunner.class)
+public class TxGroupingBaseDUnitTest implements Serializable {
+
+  protected static final String REGION_NAME = "TheRegion";
+
+  protected final String shipmentRegionName = "ShipmentsRegion";
+  protected final String customerRegionName = "CustomersRegion";
+  protected final String orderRegionName = "OrdersRegion";
+
+  protected static LocatorLauncher locatorLauncher;
+  protected static ServerLauncher serverLauncher;
+
+  protected VM londonLocatorVM;
+  protected VM newYorkLocatorVM;
+  protected VM newYorkServerVM;
+  protected VM londonServer1VM;
+  protected VM londonServer2VM;
+  protected VM londonServer3VM;
+  protected VM londonServer4VM;
+  protected VM[] londonServersVM;
+
+  protected String newYorkName;
+
+  protected int londonId;
+  protected int newYorkId;
+
+  protected int londonLocatorPort;
+  protected int newYorkLocatorPort;
+
+  protected int newYorkReceiverPort;
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private static List<Integer> dispatcherThreads = new 
ArrayList<>(Arrays.asList(1, 3, 5));
+  // this will be set for each test method run with one of the values from 
above list
+  private static int numDispatcherThreadsForTheRun = 1;
+
+  @Before
+  public void setUp() {
+    londonLocatorVM = getVM(0);
+    newYorkLocatorVM = getVM(1);
+    newYorkServerVM = getVM(2);
+    londonServer1VM = getVM(3);
+    londonServer2VM = getVM(4);
+    londonServer3VM = getVM(5);
+    londonServer4VM = getVM(6);
+    londonServersVM = new VM[] {londonServer1VM, londonServer2VM, 
londonServer3VM, londonServer4VM};
+
+    newYorkName = "ny";
+
+    londonId = 1;
+    newYorkId = 2;
+
+    int[] ports = getRandomAvailableTCPPorts(3);
+    londonLocatorPort = ports[0];
+    newYorkLocatorPort = ports[1];
+    newYorkReceiverPort = ports[2];
+
+    newYorkLocatorVM.invoke("start New York locator", () -> {
+      Properties config = createLocatorConfig(newYorkId, newYorkLocatorPort, 
londonLocatorPort);
+      cacheRule.createCache(config);
+    });
+
+    londonLocatorVM.invoke("start London locator", () -> {
+      Properties config = createLocatorConfig(londonId, londonLocatorPort, 
newYorkLocatorPort);
+      cacheRule.createCache(config);
+    });
+    Collections.shuffle(dispatcherThreads);
+    int dispatcherThreadsNo = dispatcherThreads.get(0);
+    Invoke.invokeInEveryVM(() -> 
setNumDispatcherThreadsForTheRun(dispatcherThreadsNo));
+
+  }
+
+  @After
+  public void tearDown() {
+    newYorkServerVM.invoke(() -> {
+      if (serverLauncher != null) {
+        serverLauncher.stop();
+        serverLauncher = null;
+      }
+    });
+
+    for (VM server : londonServersVM) {
+      server.invoke(() -> {
+        if (serverLauncher != null) {
+          serverLauncher.stop();
+          serverLauncher = null;
+        }
+      });
+    }
+
+    newYorkLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+
+    londonLocatorVM.invoke(() -> {
+      if (locatorLauncher != null) {
+        locatorLauncher.stop();
+        locatorLauncher = null;
+      }
+    });
+  }
+
+  protected Properties createLocatorConfig(int systemId, int locatorPort, int 
remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + locatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+    return config;
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize)
+      throws IOException {
+    startServerWithSender(systemId, locatorPort, remoteSystemId, remoteName, 
isParallel,
+        groupTransactionEvents, batchSize, 0);
+  }
+
+  protected void startServerWithSender(int systemId, int locatorPort, int 
remoteSystemId,
+      String remoteName, boolean isParallel, boolean groupTransactionEvents, 
int batchSize,
+      int dispatcherThreads) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    String uniqueName = "server-" + systemId;
+    File[] dirs = new File[] {temporaryFolder.newFolder(uniqueName)};
+
+    GatewaySenderFactory senderFactory = createGatewaySenderFactory(dirs, 
uniqueName);
+    senderFactory.setParallel(isParallel);
+    senderFactory.setGroupTransactionEvents(groupTransactionEvents);
+    senderFactory.setBatchSize(batchSize);
+    if (dispatcherThreads > 0) {
+      senderFactory.setDispatcherThreads(dispatcherThreads);
+    }
+    GatewaySender sender = senderFactory.create(remoteName, remoteSystemId);
+    sender.start();
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort) throws IOException {
+    startServerWithReceiver(locatorPort, receiverPort, true);
+  }
+
+  protected void startServerWithReceiver(int locatorPort,
+      int receiverPort, boolean start) throws IOException {
+    cacheRule.createCache(createServerConfig(locatorPort));
+
+    GatewayReceiverFactory receiverFactory = 
createGatewayReceiverFactory(receiverPort);
+    GatewayReceiver receiver = receiverFactory.create();
+    if (start) {
+      receiver.start();
+    }
+  }
+
+  protected void startReceiver() throws IOException {
+    cacheRule.getCache().getGatewayReceivers().iterator().next().start();
+  }
+
+  protected GatewayReceiverFactory createGatewayReceiverFactory(int 
receiverPort) {
+    GatewayReceiverFactory receiverFactory = 
cacheRule.getCache().createGatewayReceiverFactory();
+
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+    return receiverFactory;
+  }
+
+  protected Properties createServerConfig(int locatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
+    return config;
+  }
+
+  protected GatewaySenderFactory createGatewaySenderFactory(File[] dirs, 
String diskStoreName) {
+    InternalGatewaySenderFactory senderFactory =
+        (InternalGatewaySenderFactory) 
cacheRule.getCache().createGatewaySenderFactory();
+
+    senderFactory.setMaximumQueueMemory(100);
+    senderFactory.setBatchSize(10);
+    senderFactory.setBatchConflationEnabled(false);
+    senderFactory.setManualStart(true);
+    senderFactory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    senderFactory.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY);
+
+    DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
+    DiskStore store = dsf.setDiskDirs(dirs).create(diskStoreName);
+    senderFactory.setDiskStoreName(store.getName());
+
+    return senderFactory;
+  }
+
+  protected boolean isRunning(GatewaySender sender) {
+    return sender != null && sender.isRunning();
+  }
+
+  protected void validateRegionSize(String regionName, final int regionSize) {
+    final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR 
+ regionName);
+    assertNotNull(r);
+    if (regionSize != r.keySet().size()) {
+      await().untilAsserted(() -> 
assertThat(r.keySet().size()).isEqualTo(regionSize));
+    }
+  }
+
+  protected List<Integer> getSenderStats(String senderId, final int 
expectedQueueSize) {
+    AbstractGatewaySender sender =
+        (AbstractGatewaySender) 
cacheRule.getCache().getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    if (expectedQueueSize != -1) {
+      final RegionQueue regionQueue;
+      regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+      if (sender.isParallel()) {
+        ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+            (ConcurrentParallelGatewaySenderQueue) regionQueue;
+        PartitionedRegion pr =
+            parallelGatewaySenderQueue.getRegions().toArray(new 
PartitionedRegion[1])[0];
+      }
+      await()
+          .untilAsserted(() -> 
assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
+    }
+    ArrayList<Integer> stats = new ArrayList<>();
+    stats.add(statistics.getEventQueueSize());
+    stats.add(statistics.getEventsReceived());
+    stats.add(statistics.getEventsQueued());
+    stats.add(statistics.getEventsDistributed());
+    stats.add(statistics.getBatchesDistributed());
+    stats.add(statistics.getBatchesRedistributed());
+    stats.add(statistics.getEventsFiltered());
+    stats.add(statistics.getEventsNotQueuedConflated());
+    stats.add(statistics.getEventsConflatedFromBatches());
+    stats.add(statistics.getConflationIndexesMapSize());
+    stats.add(statistics.getSecondaryEventQueueSize());
+    stats.add(statistics.getEventsProcessedByPQRM());
+    stats.add(statistics.getEventsExceedingAlertThreshold());
+    stats.add((int) statistics.getBatchesWithIncompleteTransactions());
+    return stats;
+  }
+
+  protected GatewaySender getGatewaySender(String senderId) {
+    Set<GatewaySender> senders = cacheRule.getCache().getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    return sender;
+  }
+
+  protected void doPutsInsideTransactions(String regionName, Map<Object, 
Object> keyValues,
+      int eventsPerTransaction) {
+    Region<Object, Object> r = cacheRule.getCache().getRegion(Region.SEPARATOR 
+ regionName);
+    assertNotNull(r);
+    int eventInTransaction = 0;
+    CacheTransactionManager cacheTransactionManager =
+        cacheRule.getCache().getCacheTransactionManager();
+    for (Object key : keyValues.keySet()) {
+      if (eventInTransaction == 0) {
+        cacheTransactionManager.begin();
+      }
+      r.put(key, keyValues.get(key));
+      if (++eventInTransaction == eventsPerTransaction) {
+        cacheTransactionManager.commit();
+        eventInTransaction = 0;
+      }
+    }
+    if (eventInTransaction != 0) {
+      cacheTransactionManager.commit();
+    }
+  }
+
+  protected void checkGatewayReceiverStats(int processBatches, int 
eventsReceived,
+      int creates) {
+    checkGatewayReceiverStats(processBatches, eventsReceived, creates, false);
+  }
+
+  protected void checkGatewayReceiverStats(int processBatches, int 
eventsReceived,
+      int creates, boolean isExact) {
+    Set<GatewayReceiver> gatewayReceivers = 
cacheRule.getCache().getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) 
receiver.getServer()).getAcceptor().getStats();
+
+    assertThat(stats).isInstanceOf(GatewayReceiverStats.class);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
+    if (isExact) {
+      
assertThat(gatewayReceiverStats.getProcessBatchRequests()).isEqualTo(processBatches);
+    } else {
+      assertThat(gatewayReceiverStats.getProcessBatchRequests())
+          .isGreaterThanOrEqualTo(processBatches);
+    }
+    
assertThat(eventsReceived).isEqualTo(gatewayReceiverStats.getEventsReceived());
+    assertThat(creates).isEqualTo(gatewayReceiverStats.getCreateRequest());
+  }
+
+  protected void doTxPutsWithRetryIfError(String regionName, final long 
putsPerTransaction,

Review comment:
       I think you should try to simplify any new method that has this many 
calculations, for-loops, do-whiles, and try-blocks. If you really do need all of 
those blocks, then try extracting the nested ones into their own short private 
methods.
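
For example, something along these lines (a very rough sketch going only from the 
signature; the helper name, the key scheme and the retry condition are guesses, 
but the idea is that the public method reads as one small loop and the 
retry/rollback details live in a short private helper):
   ```
   protected void doTxPutsWithRetryIfError(String regionName, long putsPerTransaction,
       long transactions, long keyOffset) {
     Region<Object, Object> region = cacheRule.getCache().getRegion(SEPARATOR + regionName);
     for (long txId = 0; txId < transactions; txId++) {
       putOneTransactionWithRetry(region, txId, putsPerTransaction, keyOffset);
     }
   }

   private void putOneTransactionWithRetry(Region<Object, Object> region, long txId,
       long putsPerTransaction, long keyOffset) {
     CacheTransactionManager txManager = cacheRule.getCache().getCacheTransactionManager();
     boolean committed = false;
     while (!committed) {
       try {
         txManager.begin();
         for (long i = 0; i < putsPerTransaction; i++) {
           long key = txId * putsPerTransaction + i + keyOffset;
           region.put(key, key);
         }
         txManager.commit();
         committed = true;
       } catch (TransactionException e) {
         // transient failure (for example while the primary sender is being killed):
         // roll back if the transaction is still open and retry it from the start
         if (txManager.exists()) {
           txManager.rollback();
         }
       }
     }
   }
   ```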




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

