This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new fdb5e92d93 GEODE-10281: Fix WAN data inconsistency (#7665)
fdb5e92d93 is described below

commit fdb5e92d93095dc26f82e2935cce18e121f70b36
Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com>
AuthorDate: Thu Jul 7 14:15:38 2022 +0200

    GEODE-10281: Fix WAN data inconsistency (#7665)
---
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 .../cache/wan/GatewaySenderEventImplTest.java      |  87 ++++--
 ...eplicateRegionWithSerialGwsDistributedTest.java | 333 +++++++++++++++++++++
 3 files changed, 393 insertions(+), 29 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 494e499168..d18a9a5d68 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -853,7 +853,7 @@ public class GatewaySenderEventImpl
     // If the message is an update, it may be conflatable. If it is a
     // create, destroy, invalidate or destroy-region, it is not conflatable.
     // Only updates are conflated.
-    return isUpdate();
+    return isUpdate() && !isConcurrencyConflict();
   }
 
   @Override
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
index cec3e4b5a2..cf1f5d100e 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
@@ -33,13 +33,15 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.stream.Stream;
 
-import junitparams.Parameters;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.TransactionId;
@@ -61,18 +63,16 @@ import 
org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.test.fake.Fakes;
-import org.apache.geode.test.junit.runners.GeodeParamsRunner;
 
-@RunWith(GeodeParamsRunner.class)
 public class GatewaySenderEventImplTest {
 
   private GemFireCacheImpl cache;
 
-  @Rule
-  public TestName testName = new TestName();
+  private String testName;
 
-  @Before
-  public void setUpGemFire() {
+  @BeforeEach
+  public void setUpGemFire(TestInfo testInfo) {
+    testName = testInfo.getDisplayName();
     createCache();
   }
 
@@ -110,8 +110,8 @@ public class GatewaySenderEventImplTest {
     assertThat(gatewaySenderEvent.getTransactionId()).isNotNull();
   }
 
-  @Test
-  @Parameters(method = "getVersionsAndExpectedInvocations")
+  @ParameterizedTest
+  @MethodSource("getVersionsAndExpectedInvocations")
   public void 
testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations 
vaei)
       throws IOException {
     GatewaySenderEventImpl gatewaySenderEvent = 
spy(GatewaySenderEventImpl.class);
@@ -129,8 +129,8 @@ public class GatewaySenderEventImplTest {
         any());
   }
 
-  @Test
-  @Parameters(method = "getVersionsAndExpectedInvocations")
+  @ParameterizedTest
+  @MethodSource("getVersionsAndExpectedInvocations")
   public void testDeserializingDataFromOldVersionToCurrentVersion(
       VersionAndExpectedInvocations vaei)
       throws IOException, ClassNotFoundException {
@@ -151,18 +151,17 @@ public class GatewaySenderEventImplTest {
         any());
   }
 
-  private VersionAndExpectedInvocations[] getVersionsAndExpectedInvocations() {
-    return new VersionAndExpectedInvocations[] {
-        new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0),
-        new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0),
-        new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1)
-    };
+  private static Stream<Arguments> getVersionsAndExpectedInvocations() {
+    return Stream.of(
+        Arguments.of(new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0)),
+        Arguments.of(new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0)),
+        Arguments.of(new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 
1)));
   }
 
   @Test
   public void testEquality() throws Exception {
     LocalRegion region = mock(LocalRegion.class);
-    when(region.getFullPath()).thenReturn(testName.getMethodName() + 
"_region");
+    when(region.getFullPath()).thenReturn(testName + "_region");
     when(region.getCache()).thenReturn(cache);
     Object event = 
ParallelGatewaySenderHelper.createGatewaySenderEvent(region, Operation.CREATE,
         "key1", "value1", 0, 0, 0, 0);
@@ -209,7 +208,7 @@ public class GatewaySenderEventImplTest {
     assertThat(event).isNotEqualTo(eventDifferentValue);
 
     LocalRegion region2 = mock(LocalRegion.class);
-    when(region2.getFullPath()).thenReturn(testName.getMethodName() + 
"_region2");
+    when(region2.getFullPath()).thenReturn(testName + "_region2");
     when(region2.getCache()).thenReturn(cache);
     Object eventDifferentRegion =
         ParallelGatewaySenderHelper.createGatewaySenderEvent(region2, 
Operation.CREATE,
@@ -221,7 +220,7 @@ public class GatewaySenderEventImplTest {
   public void testSerialization() throws Exception {
     // Set up test
     LocalRegion region = mock(LocalRegion.class);
-    when(region.getFullPath()).thenReturn(testName.getMethodName() + 
"_region");
+    when(region.getFullPath()).thenReturn(testName + "_region");
     when(region.getCache()).thenReturn(cache);
     TXId txId = new TXId(cache.getMyId(), 0);
     when(region.getTXId()).thenReturn(txId);
@@ -348,12 +347,13 @@ public class GatewaySenderEventImplTest {
     return cacheEvent;
   }
 
-  @Parameters({"true, true", "true, false", "false, false"})
+  @ParameterizedTest
+  @CsvSource({"true,true", "true,false", "false,false"})
   public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean 
isGenerateCallbacks,
       boolean isCallbackArgumentNull)
       throws IOException {
-    InternalRegion region = mock(InternalRegion.class);
-    when(region.getFullPath()).thenReturn(testName.getMethodName() + 
"_region");
+    InternalRegion region = mock(LocalRegion.class);
+    when(region.getFullPath()).thenReturn(testName + "_region");
 
     Operation operation = mock(Operation.class);
     when(operation.isLocalLoad()).thenReturn(true);
@@ -377,6 +377,37 @@ public class GatewaySenderEventImplTest {
     assertThat(event.getAction()).isEqualTo(action);
   }
 
+  @Test
+  public void testShouldNotBeConflatedCreate() throws IOException {
+    final EntryEventImpl cacheEvent = 
mockEntryEventImpl(mock(TransactionId.class));
+
+    final GatewaySenderEventImpl gatewaySenderEvent =
+        new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, 
null, INCLUDE);
+
+    assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
+  }
+
+  @Test
+  public void testShouldBeConflatedUpdate() throws IOException {
+    final EntryEventImpl cacheEvent = 
mockEntryEventImpl(mock(TransactionId.class));
+
+    final GatewaySenderEventImpl gatewaySenderEvent =
+        new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, 
null, INCLUDE);
+
+    assertThat(gatewaySenderEvent.shouldBeConflated()).isTrue();
+  }
+
+  @Test
+  public void testShouldNotBeConflatedUpdateConcurrentConflict() throws 
IOException {
+    final EntryEventImpl cacheEvent = 
mockEntryEventImpl(mock(TransactionId.class));
+    when(cacheEvent.isConcurrencyConflict()).thenReturn(true);
+
+    final GatewaySenderEventImpl gatewaySenderEvent =
+        new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, 
null, INCLUDE);
+
+    assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
+  }
+
   public static class VersionAndExpectedInvocations {
 
     private final KnownVersion version;
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
new file mode 100644
index 0000000000..c940ade012
--- /dev/null
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewaySender;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class 
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest
+    implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  public static boolean ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER;
+
+  private MemberVM locator1Site2;
+
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private int server1Site2Port;
+  private int server2Site2Port;
+
+  private ClientVM clientConnectedToServer1Site2;
+  private ClientVM clientConnectedToServer2Site2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+
+  private static final String GATEWAY_SENDER_ID = "ln";
+
+  private final Map.Entry<Integer, Integer> ENTRY_INITIAL = new 
AbstractMap.SimpleEntry<>(1, 0);
+  private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_WINNER =
+      new AbstractMap.SimpleEntry<>(1, 1);
+  private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_LOSER =
+      new AbstractMap.SimpleEntry<>(1, 2);
+
+  @Before
+  public void setupMultiSite() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    MemberVM locator1Site1 = clusterStartupRule.startLocatorVM(0, props);
+    MemberVM locator2Site1 = clusterStartupRule.startLocatorVM(1, props, 
locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(2, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), 
locator2Site1.getPort());
+    connectGfshToSite(locator1Site1);
+
+    // create replicate region on site #1
+    CommandStringBuilder regionCmd = new 
CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE");
+
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+    String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+        .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+        .getCommandString();
+
+    gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+    server1Site1.invoke(
+        
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState);
+    server2Site1.invoke(
+        
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState);
+
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+    props.setProperty(REMOTE_LOCATORS,
+        "localhost[" + locator1Site1.getPort() + "],localhost[" + 
locator2Site1.getPort() + "]");
+    locator1Site2 = clusterStartupRule.startLocatorVM(5, props);
+
+    // start servers for site #2
+    server1Site2 = clusterStartupRule.startServerVM(6, 
locator1Site2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(7, 
locator1Site2.getPort());
+
+    server2Site2Port = server2Site2.getPort();
+    server1Site2Port = server1Site2.getPort();
+
+    // create gateway-sender on site #2
+    connectGfshToSite(locator1Site2);
+    String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+        .addOption(CliStrings.MEMBERS, server2Site2.getName())
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, 
"1")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "false")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION, 
"true")
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    verifyGatewaySenderState(server2Site2, false);
+
+    executeGatewaySenderActionCommandSite2(CliStrings.PAUSE_GATEWAYSENDER);
+
+    // create replicate region on site #2
+    regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE");
+    regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, 
GATEWAY_SENDER_ID);
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+  }
+
+  @Test
+  public void testEventIsNotConflatedWhenConcurrentModificationIsDetected() 
throws Exception {
+    startClientToServer1Site2(server1Site2Port);
+    startClientToServer2Site2(server2Site2Port);
+
+    clientConnectedToServer2Site2.invoke(() -> 
executePutOperation(ENTRY_INITIAL));
+    waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_INITIAL, 
server1Site2, server2Site2);
+
+    // Configure cache writer on server to delay writing of entry in order to provoke
+    // the internal conflict
+    server1Site2.invoke(() -> {
+      InternalRegion region =
+          ClusterStartupRule.getCache().getInternalRegionByPath("/" + 
REGION_NAME);
+      region.getAttributesMutator().setCacheWriter(new 
TestCacheWriterDelayWritingOfEntry(
+          ENTRY_CONFLICT_RESOLUTION_WINNER, ENTRY_CONFLICT_RESOLUTION_LOSER));
+    });
+
+    clientConnectedToServer2Site2.invokeAsync(() -> executePutOperation(
+        ENTRY_CONFLICT_RESOLUTION_WINNER));
+
+    server1Site2.invoke(() -> await().untilAsserted(() -> assertThat(
+        
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER)
+            .isTrue()));
+
+    clientConnectedToServer1Site2.invokeAsync(() -> executePutOperation(
+        ENTRY_CONFLICT_RESOLUTION_LOSER));
+
+    // Check that expected entry has won the internal conflict resolution
+    
waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER,
+        server1Site2,
+        server2Site2);
+
+    server2Site2.invoke(
+        
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::awaitQueueSize);
+    executeGatewaySenderActionCommandSite2(CliStrings.RESUME_GATEWAYSENDER);
+
+    // check that expected event is replicated to the remote cluster
+    
waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER,
+        server1Site1,
+        server2Site1);
+  }
+
+  private void waitUntilEventIsConsistentlyReplicatedAcrossServers(
+      final Map.Entry<Integer, Integer> entry,
+      MemberVM... servers) {
+    await().untilAsserted(() -> 
isEventIsConsistentlyReplicatedAcrossServers(entry, servers));
+  }
+
+  private static void isEventIsConsistentlyReplicatedAcrossServers(
+      final Map.Entry<Integer, Integer> entry,
+      MemberVM... servers) {
+    for (MemberVM server : servers) {
+      assertThat(server.invoke(() -> doesEventExistOnServer(entry))).isTrue();
+    }
+  }
+
+  private static boolean doesEventExistOnServer(Map.Entry<Integer, Integer> 
entry) {
+    Region<Integer, Integer> region =
+        ClusterStartupRule.getCache().getRegion("/" + REGION_NAME);
+    return Objects.equals(region.get(entry.getKey()), entry.getValue());
+  }
+
+  private void executeGatewaySenderActionCommandSite2(final String action) 
throws Exception {
+    connectGfshToSite(locator1Site2);
+    CommandStringBuilder regionCmd = new CommandStringBuilder(action);
+    regionCmd.addOption(CliStrings.MEMBERS, server2Site2.getName());
+    regionCmd.addOption(CliStrings.PAUSE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID);
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+    verifyGatewaySenderState(server2Site2, 
CliStrings.PAUSE_GATEWAYSENDER.equals(action));
+  }
+
+  private void executePutOperation(Map.Entry<Integer, Integer> entry) {
+    Region<Integer, Integer> region =
+        ClusterStartupRule.clientCacheRule.getCache().getRegion(REGION_NAME);
+    region.put(entry.getKey(), entry.getValue());
+  }
+
+  private static void awaitQueueSize() {
+    await()
+        .untilAsserted(() -> validateQueueSize(
+            
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.GATEWAY_SENDER_ID,
+            3));
+  }
+
+  private static void validateQueueSize(String senderId, int numQueueEntries) {
+    GatewaySender sender = 
ClusterStartupRule.getCache().getGatewaySender(senderId);
+    Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+    int size = 0;
+    for (RegionQueue q : queues) {
+      size += q.size();
+    }
+    assertThat(size).isEqualTo(numQueueEntries);
+  }
+
+  private static void verifyReceiverState() {
+    Set<GatewayReceiver> receivers = 
ClusterStartupRule.getCache().getGatewayReceivers();
+    for (GatewayReceiver receiver : receivers) {
+      assertThat(receiver.isRunning()).isEqualTo(true);
+    }
+  }
+
+  private void verifyGatewaySenderState(MemberVM memberVM, boolean isPaused) {
+    memberVM.invoke(() -> verifySenderState(GATEWAY_SENDER_ID, true, 
isPaused));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), 
GATEWAY_SENDER_ID, true,
+            isPaused));
+  }
+
+  private static InternalDistributedMember getMember(final VM vm) {
+    return vm.invoke(() -> ClusterStartupRule.getCache().getMyId());
+  }
+
+  private void startClientToServer1Site2(final int serverPort) throws 
Exception {
+    clientConnectedToServer1Site2 =
+        clusterStartupRule.startClientVM(8, c -> 
c.withServerConnection(serverPort));
+    clientConnectedToServer1Site2.invoke(() -> {
+      ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+    });
+  }
+
+  private void startClientToServer2Site2(final int serverPort) throws 
Exception {
+    clientConnectedToServer2Site2 =
+        clusterStartupRule.startClientVM(4, c -> 
c.withServerConnection(serverPort));
+    clientConnectedToServer2Site2.invoke(() -> {
+      ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+    });
+  }
+
+  private void connectGfshToSite(MemberVM locator) throws Exception {
+    if (gfsh.isConnected()) {
+      gfsh.disconnect();
+    }
+    gfsh.connectAndVerify(locator);
+  }
+
+  public static class TestCacheWriterDelayWritingOfEntry<K, V> implements 
CacheWriter<K, V> {
+    private final Map.Entry<Integer, Integer> entryToDelay;
+
+    private final Map.Entry<Integer, Integer> waitUntilEntry;
+
+    public TestCacheWriterDelayWritingOfEntry(Map.Entry<Integer, Integer> 
entryToDelay,
+        Map.Entry<Integer, Integer> waitUntilEntry) {
+      this.entryToDelay = entryToDelay;
+      this.waitUntilEntry = waitUntilEntry;
+    }
+
+    @Override
+    public void beforeUpdate(EntryEvent<K, V> event) throws 
CacheWriterException {
+      Region<Integer, Integer> region = 
ClusterStartupRule.getCache().getRegion("/" + REGION_NAME);
+      int value = (Integer) event.getNewValue();
+      int key = (Integer) event.getKey();
+      if (key == entryToDelay.getKey() && value == entryToDelay.getValue()) {
+        
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER
 =
+            true;
+        await().untilAsserted(() -> 
assertThat(region.get(waitUntilEntry.getKey()))
+            .isEqualTo(waitUntilEntry.getValue()));
+      }
+    }
+
+    @Override
+    public void beforeCreate(EntryEvent<K, V> event) throws 
CacheWriterException {}
+
+    @Override
+    public void beforeDestroy(EntryEvent<K, V> event) throws 
CacheWriterException {}
+
+    @Override
+    public void beforeRegionDestroy(RegionEvent<K, V> event) throws 
CacheWriterException {}
+
+    @Override
+    public void beforeRegionClear(RegionEvent<K, V> event) throws 
CacheWriterException {}
+  }
+}

Reply via email to