This is an automated email from the ASF dual-hosted git repository. igodwin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 3e2d501 GEODE-6819: Fix PartitionedRegionSingleHopDUnitTest BindExceptions 3e2d501 is described below commit 3e2d5015834f50bdf052fd9aafe3dc8cf7d504a4 Author: Kirk Lund <kl...@apache.org> AuthorDate: Fri Apr 3 00:48:26 2020 -0700 GEODE-6819: Fix PartitionedRegionSingleHopDUnitTest BindExceptions Intermittent failures were caused by trying to stop and then restart server(s) with the previously used ports which are no longer free on the host machine. This PR overhauls all of PartitionedRegionSingleHopDUnitTest while also fixing flaky failures caused by underlying BindExceptions in both: testMetadataIsSameOnAllServersAndClients testSingleHopWithHAWithLocator --- .../cache/PartitionedRegionSingleHopDUnitTest.java | 1592 +++++++++----------- .../cache/client/internal/InternalClientCache.java | 2 + .../geode/internal/cache/util/UncheckedUtils.java | 6 + 3 files changed, 729 insertions(+), 871 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java index dcd5c4f..4ecba96 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopDUnitTest.java @@ -16,35 +16,40 @@ package org.apache.geode.internal.cache; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.cache.RegionShortcut.LOCAL; +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.internal.cache.util.UncheckedUtils.cast; import static org.apache.geode.internal.lang.SystemPropertyHelper.GEMFIRE_PREFIX; import static org.apache.geode.management.ManagementService.getExistingManagementService; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout; -import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS; import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; import static org.apache.geode.test.dunit.VM.getController; import static org.apache.geode.test.dunit.VM.getVM; import static org.apache.geode.test.dunit.VM.getVMId; +import static org.apache.geode.test.dunit.rules.DistributedRule.getDistributedSystemProperties; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; @@ -54,32 +59,37 @@ import org.junit.experimental.categories.Category; import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.Scope; -import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.ClientMetadataService; import org.apache.geode.cache.client.internal.ClientPartitionAdvisor; +import org.apache.geode.cache.client.internal.InternalClientCache; import org.apache.geode.cache.execute.FunctionAdapter; import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.RegionFunctionContext; import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.LocatorLauncher; +import org.apache.geode.distributed.ServerLauncher; import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile; +import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException; import org.apache.geode.management.ManagementService; import org.apache.geode.management.membership.MembershipEvent; import org.apache.geode.management.membership.UniversalMembershipListenerAdapter; import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitEnv; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.rules.CacheRule; import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.dunit.rules.DistributedRule; import org.apache.geode.test.junit.categories.ClientServerTest; @@ -89,21 +99,24 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolde @SuppressWarnings("serial") public class PartitionedRegionSingleHopDUnitTest implements Serializable { - private static final String PR_NAME = "single_hop_pr"; - private static final String ORDER = "ORDER"; - private static final String CUSTOMER = "CUSTOMER"; - private static final String SHIPMENT = "SHIPMENT"; + private static final String PARTITIONED_REGION_NAME = "single_hop_pr"; + private static final String ORDER_REGION_NAME = "ORDER"; + private static final String CUSTOMER_REGION_NAME = "CUSTOMER"; + private static final String SHIPMENT_REGION_NAME = "SHIPMENT"; + private static final String REPLICATE_REGION_NAME = "rr"; + private static final int LOCAL_MAX_MEMORY_DEFAULT = -1; - private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>(); + private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class); + private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class); + private static final InternalClientCache DUMMY_CLIENT = mock(InternalClientCache.class); + private static final InternalCache DUMMY_CACHE = mock(InternalCache.class); + private static final AtomicReference<ServerLauncher> SERVER = new AtomicReference<>(); + private static final AtomicReference<LocatorLauncher> LOCATOR = new AtomicReference<>(); + private static final AtomicReference<InternalClientCache> CLIENT = new AtomicReference<>(); + private static final AtomicReference<InternalCache> CACHE = new AtomicReference<>(); - private static volatile Region<Object, Object> testRegion; - private static volatile Region<Object, Object> customerRegion; - private static volatile Region<Object, Object> orderRegion; - private static volatile Region<Object, Object> shipmentRegion; - private static volatile Region<Object, Object> replicatedRegion; - private static volatile InternalCache cache; - private static volatile Locator locator; + private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>(); private String diskStoreName; @@ -115,8 +128,6 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Rule public DistributedRule distributedRule = new DistributedRule(); @Rule - public CacheRule cacheRule = new CacheRule(); - @Rule public DistributedRestoreSystemProperties restoreProps = new DistributedRestoreSystemProperties(); @Rule public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); @@ -131,24 +142,27 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { vm1 = getVM(1); vm2 = getVM(2); vm3 = getVM(3); + + for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { + vm.invoke(() -> { + CLIENT.set(DUMMY_CLIENT); + CACHE.set(DUMMY_CACHE); + SERVER.set(DUMMY_SERVER); + LOCATOR.set(DUMMY_LOCATOR); + }); + } } @After public void tearDown() { for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { vm.invoke(() -> { - cacheRule.closeAndNullCache(); - locator = null; - cache = null; - testRegion = null; - customerRegion = null; - orderRegion = null; - shipmentRegion = null; - replicatedRegion = null; + CLIENT.getAndSet(DUMMY_CLIENT).close(); + CACHE.getAndSet(DUMMY_CACHE).close(); + SERVER.getAndSet(DUMMY_SERVER).stop(); + LOCATOR.getAndSet(DUMMY_LOCATOR).stop(); }); } - - disconnectAllFromDS(); } /** @@ -156,13 +170,11 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { */ @Test public void testNoClient() throws Exception { - vm0.invoke(() -> createServer(1, 4)); - vm1.invoke(() -> createServer(1, 4)); - - vm2.invoke(() -> createPeer()); - vm3.invoke(() -> createPeer()); - - createAccessorServer(); + vm0.invoke(() -> createServer(-1, 1, 4)); + vm1.invoke(() -> createServer(-1, 1, 4)); + vm2.invoke(() -> createAccessorPeer(1, 4)); + vm3.invoke(() -> createAccessorPeer(1, 4)); + createAccessorServer(1, 4); for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { vm.invoke(() -> clearMetadata()); @@ -173,15 +185,17 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { } for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { - vm.invoke(() -> getFromPartitionedRegions()); + vm.invoke(() -> doGets()); } for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { - vm.invoke(() -> verifyEmptyMetadata()); - } + vm.invoke(() -> { + ClientMetadataService clientMetadataService = + getInternalCache(SERVER.get()).getClientMetadataService(); - for (VM vm : asList(getController(), vm0, vm1, vm2, vm3)) { - vm.invoke(() -> verifyEmptyStaticData()); + assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty(); + assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty(); + }); } } @@ -191,21 +205,19 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { */ @Test public void testClientConnectedToAccessors() { - int port0 = vm0.invoke(() -> createAccessorServer()); - int port1 = vm1.invoke(() -> createAccessorServer()); - - vm2.invoke(() -> createPeer()); - vm3.invoke(() -> createPeer()); - - createClient(port0, port1); + int port0 = vm0.invoke(() -> createAccessorServer(1, 4)); + int port1 = vm1.invoke(() -> createAccessorServer(1, 4)); + vm2.invoke(() -> createAccessorPeer(1, 4)); + vm3.invoke(() -> createAccessorPeer(1, 4)); + createClient(250, true, true, true, port0, port1); putIntoPartitionedRegions(); + doGets(); - getFromPartitionedRegions(); - - verifyEmptyMetadata(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); - verifyEmptyStaticData(); + assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty(); + assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty(); } /** @@ -214,22 +226,19 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { */ @Test public void testClientConnectedTo1Server() { - int port0 = vm0.invoke(() -> createServer(1, 4)); - - vm1.invoke(() -> createPeer()); - vm2.invoke(() -> createPeer()); - - vm3.invoke(() -> createAccessorServer()); - - createClient(port0); + int port0 = vm0.invoke(() -> createServer(-1, 1, 4)); + vm1.invoke(() -> createAccessorPeer(1, 4)); + vm2.invoke(() -> createAccessorPeer(1, 4)); + vm3.invoke(() -> createAccessorServer(1, 4)); + createClient(250, true, true, true, port0); putIntoPartitionedRegions(); + doGets(); - getFromPartitionedRegions(); - - verifyEmptyMetadata(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); - verifyEmptyStaticData(); + assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty(); + assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty(); } /** @@ -239,19 +248,48 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { */ @Test public void testMetadataContents() { - int port0 = vm0.invoke(() -> createServer(1, 4)); - int port1 = vm1.invoke(() -> createServer(1, 4)); - int port2 = vm2.invoke(() -> createServer(1, 4)); - int port3 = vm3.invoke(() -> createServer(1, 4)); - - createClient(port0, port1, port2, port3); + int port0 = vm0.invoke(() -> createServer(-1, 1, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 1, 4)); + int port2 = vm2.invoke(() -> createServer(-1, 1, 4)); + int port3 = vm3.invoke(() -> createServer(-1, 1, 4)); + createClient(100, true, false, true, port0, port1, port2, port3); putIntoPartitionedRegions(); + doGets(); - getFromPartitionedRegions(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); - verifyMetadata(); - updateIntoSinglePR(); + await().untilAsserted(() -> { + assertThat(clientMetadataService.getRefreshTaskCount_TEST_ONLY()).isZero(); + }); + + clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); + + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.put(0, "update0"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(1, "update1"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(2, "update2"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(3, "update3"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(0, "update00"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(1, "update11"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(2, "update22"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + + partitionedRegion.put(3, "update33"); + assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); } /** @@ -261,21 +299,22 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { */ @Test public void testMetadataServiceCallAccuracy() { - int port0 = vm0.invoke(() -> createServer(1, 4)); - int port1 = vm1.invoke(() -> createServer(1, 4)); - - vm2.invoke(() -> createClient(port0)); - createClient(port1); + int port0 = vm0.invoke(() -> createServer(-1, 1, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 1, 4)); + vm2.invoke(() -> createClient(250, true, true, true, port0)); + createClient(250, true, true, true, port1); - vm2.invoke(() -> putIntoSinglePR()); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.put(0, "create0"); + partitionedRegion.put(1, "create1"); + partitionedRegion.put(2, "create2"); + partitionedRegion.put(3, "create3"); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue(); @@ -288,10 +327,10 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); + partitionedRegion.put(0, "create0"); + partitionedRegion.put(1, "create1"); + partitionedRegion.put(2, "create2"); + partitionedRegion.put(3, "create3"); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -300,21 +339,22 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testMetadataServiceCallAccuracy_FromDestroyOp() { - int port0 = vm0.invoke(() -> createServer(0, 4)); - int port1 = vm1.invoke(() -> createServer(0, 4)); + int port0 = vm0.invoke(() -> createServer(-1, 0, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 0, 4)); + vm2.invoke(() -> createClient(250, true, true, true, port0)); + createClient(250, true, true, true, port1); - vm2.invoke(() -> createClient(port0)); - createClient(port1); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - vm2.invoke(() -> putIntoSinglePR()); - - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.destroy(0); - testRegion.destroy(1); - testRegion.destroy(2); - testRegion.destroy(3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.destroy(0); + partitionedRegion.destroy(1); + partitionedRegion.destroy(2); + partitionedRegion.destroy(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue(); @@ -323,21 +363,22 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testMetadataServiceCallAccuracy_FromGetOp() { - int port0 = vm0.invoke(() -> createServer(0, 4)); - int port1 = vm1.invoke(() -> createServer(0, 4)); + int port0 = vm0.invoke(() -> createServer(-1, 0, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 0, 4)); + vm2.invoke(() -> createClient(250, true, true, true, port0)); + createClient(250, true, true, true, port1); - vm2.invoke(() -> createClient(port0)); - createClient(port1); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - vm2.invoke(() -> putIntoSinglePR()); - - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.get(0); - testRegion.get(1); - testRegion.get(2); - testRegion.get(3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.get(0); + partitionedRegion.get(1); + partitionedRegion.get(2); + partitionedRegion.get(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isTrue(); @@ -345,10 +386,10 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.get(0); - testRegion.get(1); - testRegion.get(2); - testRegion.get(3); + partitionedRegion.get(0); + partitionedRegion.get(1); + partitionedRegion.get(2); + partitionedRegion.get(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -357,24 +398,25 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testSingleHopWithHA() { - int port0 = vm0.invoke(() -> createServer(0, 8)); - int port1 = vm1.invoke(() -> createServer(0, 8)); - int port2 = vm2.invoke(() -> createServer(0, 8)); - int port3 = vm3.invoke(() -> createServer(0, 8)); + int port0 = vm0.invoke(() -> createServer(-1, 0, 8)); + int port1 = vm1.invoke(() -> createServer(-1, 0, 8)); + int port2 = vm2.invoke(() -> createServer(-1, 0, 8)); + int port3 = vm3.invoke(() -> createServer(-1, 0, 8)); + createClient(100, true, false, true, port0, port1, port2, port3); - createClient(port0, port1, port2, port3); - - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + // put for (int i = 1; i <= 16; i++) { - testRegion.put(i, i); + partitionedRegion.put(i, i); } // update for (int i = 1; i <= 16; i++) { - testRegion.put(i, i + 1); + partitionedRegion.put(i, i + 1); } await().untilAsserted(() -> { @@ -386,64 +428,59 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { // again update for (int i = 1; i <= 16; i++) { - testRegion.put(i, i + 10); + partitionedRegion.put(i, i + 10); } } @Test public void testSingleHopWithHAWithLocator() { - int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - String locator = "localhost[" + port3 + "]"; + int locatorPort = vm3.invoke(() -> startLocator()); + String locators = "localhost[" + locatorPort + "]"; - vm3.invoke(() -> startLocatorInVM(port3)); + vm0.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8)); + vm1.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8)); + vm2.invoke(() -> createServer(locators, null, LOCAL_MAX_MEMORY_DEFAULT, 0, 8)); + createClient(250, true, true, false, locatorPort); - try { - vm0.invoke(() -> createServerWithLocator(locator)); - vm1.invoke(() -> createServerWithLocator(locator)); - vm2.invoke(() -> createServerWithLocator(locator)); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - createClientWithLocator("localhost", port3); - - // put - for (int i = 1; i <= 16; i++) { - testRegion.put(i, i); - } - - // update - for (int i = 1; i <= 16; i++) { - testRegion.put(i, i + 1); - } + // put + for (int i = 1; i <= 16; i++) { + partitionedRegion.put(i, i); + } - // kill server - vm0.invoke(() -> stopServer()); + // update + for (int i = 1; i <= 16; i++) { + partitionedRegion.put(i, i + 1); + } - // again update - for (int i = 1; i <= 16; i++) { - testRegion.put(i, i + 10); - } + // kill server + vm0.invoke(() -> stopServer()); - } finally { - vm3.invoke(() -> stopLocator()); + // again update + for (int i = 1; i <= 16; i++) { + partitionedRegion.put(i, i + 10); } } @Test public void testNoMetadataServiceCall_ForGetOp() { - int port0 = vm0.invoke(() -> createServer(0, 4)); - int port1 = vm1.invoke(() -> createServer(0, 4)); - - vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0)); - createClientWithoutPRSingleHopEnabled(port1); + int port0 = vm0.invoke(() -> createServer(-1, 0, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 0, 4)); + vm2.invoke(() -> createClient(250, false, true, true, port0)); + createClient(250, false, true, true, port1); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - vm2.invoke(() -> putIntoSinglePR()); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - testRegion.get(0); - testRegion.get(1); - testRegion.get(2); - testRegion.get(3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.get(0); + partitionedRegion.get(1); + partitionedRegion.get(2); + partitionedRegion.get(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -451,10 +488,10 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.get(0); - testRegion.get(1); - testRegion.get(2); - testRegion.get(3); + partitionedRegion.get(0); + partitionedRegion.get(1); + partitionedRegion.get(2); + partitionedRegion.get(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -463,30 +500,31 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testNoMetadataServiceCall() { - int port0 = vm0.invoke(() -> createServer(1, 4)); - int port1 = vm1.invoke(() -> createServer(1, 4)); + int port0 = vm0.invoke(() -> createServer(-1, 1, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 1, 4)); + vm2.invoke(() -> createClient(250, false, true, true, port0)); + createClient(250, false, true, true, port1); - vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0)); - createClientWithoutPRSingleHopEnabled(port1); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - vm2.invoke(() -> putIntoSinglePR()); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(0, "create0"); + partitionedRegion.put(0, "create0"); boolean metadataRefreshed_get1 = clientMetadataService.isRefreshMetadataTestOnly(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(1, "create1"); + partitionedRegion.put(1, "create1"); boolean metadataRefreshed_get2 = clientMetadataService.isRefreshMetadataTestOnly(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(2, "create2"); + partitionedRegion.put(2, "create2"); boolean metadataRefreshed_get3 = clientMetadataService.isRefreshMetadataTestOnly(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(3, "create3"); + partitionedRegion.put(3, "create3"); boolean metadataRefreshed_get4 = clientMetadataService.isRefreshMetadataTestOnly(); await().untilAsserted(() -> { @@ -498,10 +536,10 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); + partitionedRegion.put(0, "create0"); + partitionedRegion.put(1, "create1"); + partitionedRegion.put(2, "create2"); + partitionedRegion.put(3, "create3"); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -510,21 +548,22 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testNoMetadataServiceCall_ForDestroyOp() { - int port0 = vm0.invoke(() -> createServer(0, 4)); - int port1 = vm1.invoke(() -> createServer(0, 4)); - - vm2.invoke(() -> createClientWithoutPRSingleHopEnabled(port0)); - createClientWithoutPRSingleHopEnabled(port1); + int port0 = vm0.invoke(() -> createServer(-1, 0, 4)); + int port1 = vm1.invoke(() -> createServer(-1, 0, 4)); + vm2.invoke(() -> createClient(250, false, true, true, port0)); + createClient(250, false, true, true, port1); - vm2.invoke(() -> putIntoSinglePR()); + vm2.invoke(() -> doPuts(getRegion(PARTITIONED_REGION_NAME))); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); - testRegion.destroy(0); - testRegion.destroy(1); - testRegion.destroy(2); - testRegion.destroy(3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + + partitionedRegion.destroy(0); + partitionedRegion.destroy(1); + partitionedRegion.destroy(2); + partitionedRegion.destroy(3); await().untilAsserted(() -> { assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); @@ -534,36 +573,41 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { @Test public void testServerLocationRemovalThroughPing() throws Exception { LATCH.set(new CountDownLatch(2)); + int redundantCopies = 3; int totalNumberOfBuckets = 4; - int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - - createClient(port0, port1, port2, port3); + int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + createOldClient(100, true, false, true, port0, port1, port2, port3); - ManagementService managementService = getExistingManagementService(cache); + ManagementService managementService = getExistingManagementService(CACHE.get()); new MemberCrashedListener(LATCH.get()).registerMembershipListener(managementService); putIntoPartitionedRegions(); - getFromPartitionedRegions(); + doGets(); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CACHE.get().getClientMetadataService(); Map<String, ClientPartitionAdvisor> clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME); + Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME); + Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME); + await().untilAsserted(() -> { assertThat(clientPRMetadata) .hasSize(4) - .containsKey(testRegion.getFullPath()) + .containsKey(partitionedRegion.getFullPath()) .containsKey(customerRegion.getFullPath()) .containsKey(orderRegion.getFullPath()) .containsKey(shipmentRegion.getFullPath()); }); - ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets); for (Entry entry : prMetadata.getBucketServerLocationsMap_TEST_ONLY().entrySet()) { @@ -575,43 +619,45 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { LATCH.get().await(getTimeout().getValueInMS(), MILLISECONDS); - getFromPartitionedRegions(); + doGets(); verifyDeadServer(clientPRMetadata, customerRegion, port0, port1); - verifyDeadServer(clientPRMetadata, testRegion, port0, port1); + verifyDeadServer(clientPRMetadata, partitionedRegion, port0, port1); } @Test public void testMetadataFetchOnlyThroughFunctions() { // Workaround for 52004 - addIgnoredException("InternalFunctionInvocationTargetException"); + addIgnoredException(InternalFunctionInvocationTargetException.class); + int redundantCopies = 3; int totalNumberOfBuckets = 4; - int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); + int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + createClient(100, true, false, true, port0, port1, port2, port3); - createClient(port0, port1, port2, port3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - executeFunctions(); + executeFunctions(partitionedRegion); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); Map<String, ClientPartitionAdvisor> clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); await().untilAsserted(() -> { - assertThat(clientPRMetadata).hasSize(1); + assertThat(clientPRMetadata) + .hasSize(1) + .containsKey(partitionedRegion.getFullPath()); }); - assertThat(clientPRMetadata).containsKey(testRegion.getFullPath()); - - ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); await().untilAsserted(() -> { + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); }); } @@ -620,61 +666,63 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { int redundantCopies = 3; int totalNumberOfBuckets = 4; - int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); + int port0 = vm0.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port1 = vm1.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port2 = vm2.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + int port3 = vm3.invoke(() -> createServer(-1, redundantCopies, totalNumberOfBuckets)); + createClient(100, true, false, true, port0, port1, port2, port3); - createClient(port0, port1, port2, port3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - putAll(); + doPutAlls(partitionedRegion); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); Map<String, ClientPartitionAdvisor> clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); + ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); await().untilAsserted(() -> { - assertThat(clientPRMetadata).hasSize(1); - }); - - assertThat(clientPRMetadata).containsKey(testRegion.getFullPath()); - - ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath()); - - await().untilAsserted(() -> { - assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()).hasSize(totalNumberOfBuckets); + assertThat(clientPRMetadata) + .hasSize(1) + .containsKey(partitionedRegion.getFullPath()); + assertThat(prMetadata.getBucketServerLocationsMap_TEST_ONLY()) + .hasSize(totalNumberOfBuckets); }); } @Test public void testMetadataIsSameOnAllServersAndClients() { + int locatorPort = DUnitEnv.get().getLocatorPort(); + String locators = "localhost[" + locatorPort + "]"; + int redundantCopies = 3; int totalNumberOfBuckets = 4; - int port0 = vm0.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port1 = vm1.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port2 = vm2.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); - int port3 = vm3.invoke(() -> createServer(redundantCopies, totalNumberOfBuckets)); + vm0.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets)); + vm1.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets)); + vm2.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets)); + vm3.invoke(() -> createServer(locators, null, -1, redundantCopies, totalNumberOfBuckets)); + createClient(100, true, false, false, locatorPort); - createClient(port0, port1, port2, port3); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - put(); + doManyPuts(partitionedRegion); for (VM vm : asList(vm0, vm1, vm2, vm3)) { vm.invoke(() -> waitForLocalBucketsCreation()); } - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); Map<String, ClientPartitionAdvisor> clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); assertThat(clientPRMetadata) .hasSize(1) - .containsKey(testRegion.getFullPath()); + .containsKey(partitionedRegion.getFullPath()); - ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); Map<Integer, List<BucketServerLocation66>> clientBucketMap = prMetadata.getBucketServerLocationsMap_TEST_ONLY(); @@ -693,25 +741,25 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { vm0.invoke(() -> stopServer()); vm1.invoke(() -> stopServer()); - vm0.invoke(() -> startServerOnPort(port0)); - vm1.invoke(() -> startServerOnPort(port1)); + vm0.invoke(() -> startServer()); + vm1.invoke(() -> startServer()); - put(); + doManyPuts(partitionedRegion); for (VM vm : asList(vm0, vm1, vm2, vm3)) { vm.invoke(() -> waitForLocalBucketsCreation()); } - await().alias("bucket copies are not created").untilAsserted(() -> { + await().atMost(2, SECONDS).untilAsserted(() -> { Map<String, ClientPartitionAdvisor> clientPRMetadata_await = clientMetadataService.getClientPRMetadata_TEST_ONLY(); assertThat(clientPRMetadata_await) .hasSize(1) - .containsKey(testRegion.getFullPath()); + .containsKey(partitionedRegion.getFullPath()); ClientPartitionAdvisor prMetadata_await = - clientPRMetadata_await.get(testRegion.getFullPath()); + clientPRMetadata_await.get(partitionedRegion.getFullPath()); Map<Integer, List<BucketServerLocation66>> clientBucketMap_await = prMetadata_await.getBucketServerLocationsMap_TEST_ONLY(); @@ -722,20 +770,20 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { } }); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); assertThat(clientPRMetadata) .hasSize(1) - .containsKey(testRegion.getFullPath()); + .containsKey(partitionedRegion.getFullPath()); - prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); Map<Integer, List<BucketServerLocation66>> clientBucketMap2 = prMetadata.getBucketServerLocationsMap_TEST_ONLY(); - await().alias("expected no metadata to be refreshed").untilAsserted(() -> { + await().untilAsserted(() -> { assertThat(clientBucketMap2).hasSize(totalNumberOfBuckets); }); @@ -747,33 +795,34 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { vm.invoke(() -> verifyMetadata(clientBucketMap)); } - vm0.invoke(() -> cacheRule.closeAndNullCache()); - vm1.invoke(() -> cacheRule.closeAndNullCache()); + for (VM vm : asList(vm0, vm1)) { + vm.invoke(() -> { + SERVER.getAndSet(DUMMY_SERVER).stop(); + }); + } - put(); + doManyPuts(partitionedRegion); vm2.invoke(() -> { - PartitionedRegion pr = (PartitionedRegion) testRegion; + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); pr.getRegionAdvisor().getAllClientBucketProfilesTest(); }); - vm3.invoke(() -> { - PartitionedRegion pr = (PartitionedRegion) testRegion; + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); pr.getRegionAdvisor().getAllClientBucketProfilesTest(); }); vm2.invoke(() -> waitForLocalBucketsCreation()); vm3.invoke(() -> waitForLocalBucketsCreation()); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); - + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); assertThat(clientPRMetadata) .hasSize(1) - .containsKey(testRegion.getFullPath()); + .containsKey(partitionedRegion.getFullPath()); - prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); Map<Integer, List<BucketServerLocation66>> clientBucketMap3 = prMetadata.getBucketServerLocationsMap_TEST_ONLY(); @@ -795,36 +844,35 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { public void testMetadataIsSameOnAllServersAndClientsHA() { int totalNumberOfBuckets = 4; - int port0 = vm0.invoke(() -> createServer(2, totalNumberOfBuckets)); - int port1 = vm1.invoke(() -> createServer(2, totalNumberOfBuckets)); - - createClient(port0, port1, port0, port1); + int port0 = vm0.invoke(() -> createServer(-1, 2, totalNumberOfBuckets)); + int port1 = vm1.invoke(() -> createServer(-1, 2, totalNumberOfBuckets)); + createClient(100, true, false, true, port0, port1, port0, port1); - put(); + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); + doManyPuts(partitionedRegion); + ClientMetadataService clientMetadataService = CLIENT.get().getClientMetadataService(); + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); Map<String, ClientPartitionAdvisor> clientPRMetadata = clientMetadataService.getClientPRMetadata_TEST_ONLY(); await().untilAsserted(() -> { - assertThat(clientPRMetadata).hasSize(1); + assertThat(clientPRMetadata) + .hasSize(1) + .containsKey(partitionedRegion.getFullPath()); }); - assertThat(clientPRMetadata).containsKey(testRegion.getFullPath()); - vm0.invoke(() -> { - PartitionedRegion pr = (PartitionedRegion) testRegion; + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); pr.getRegionAdvisor().getAllClientBucketProfilesTest(); }); - vm1.invoke(() -> { - PartitionedRegion pr = (PartitionedRegion) testRegion; + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); pr.getRegionAdvisor().getAllClientBucketProfilesTest(); }); - ClientPartitionAdvisor prMetadata = clientPRMetadata.get(testRegion.getFullPath()); + ClientPartitionAdvisor prMetadata = clientPRMetadata.get(partitionedRegion.getFullPath()); Map<Integer, List<BucketServerLocation66>> clientBucketMap = prMetadata.getBucketServerLocationsMap_TEST_ONLY(); @@ -841,9 +889,9 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { vm0.invoke(() -> stopServer()); - put(); + doManyPuts(partitionedRegion); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); assertThat(clientBucketMap).hasSize(totalNumberOfBuckets); @@ -853,9 +901,9 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { assertThat(clientPRMetadata) .hasSize(1) - .containsKey(testRegion.getFullPath()); - - assertThat(clientBucketMap).hasSize(totalNumberOfBuckets); + .containsKey(partitionedRegion.getFullPath()); + assertThat(clientBucketMap) + .hasSize(totalNumberOfBuckets); await().untilAsserted(() -> { for (Entry<Integer, List<BucketServerLocation66>> entry : clientBucketMap.entrySet()) { @@ -868,10 +916,12 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { public void testClientMetadataForPersistentPrs() throws Exception { LATCH.set(new CountDownLatch(4)); - int port0 = vm0.invoke(() -> createPersistentPrsAndServer()); - int port1 = vm1.invoke(() -> createPersistentPrsAndServer()); - int port2 = vm2.invoke(() -> createPersistentPrsAndServer()); - int port3 = vm3.invoke(() -> createPersistentPrsAndServer()); + int locatorPort = DUnitEnv.get().getLocatorPort(); + + vm0.invoke(() -> createServer("disk", -1, 3, 4)); + vm1.invoke(() -> createServer("disk", -1, 3, 4)); + vm2.invoke(() -> createServer("disk", -1, 3, 4)); + vm3.invoke(() -> createServer("disk", -1, 3, 4)); vm3.invoke(() -> putIntoPartitionedRegions()); @@ -879,574 +929,325 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { vm.invoke(() -> waitForLocalBucketsCreation()); } - createClient(port0, port1, port2, port3); + createOldClient(100, true, false, false, locatorPort); - ManagementService managementService = getExistingManagementService(cache); - MemberCrashedListener listener = new MemberCrashedListener(LATCH.get()); - listener.registerMembershipListener(managementService); + ManagementService managementService = getExistingManagementService(CACHE.get()); + new MemberCrashedListener(LATCH.get()).registerMembershipListener(managementService); + ClientMetadataService clientMetadataService = CACHE.get().getClientMetadataService(); + + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); - await().until(() -> fetchAndValidateMetadata()); + await().until(() -> { + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); + + Map<ServerLocation, Set<Integer>> serverBucketMap = + clientMetadataService.groupByServerToAllBuckets(partitionedRegion, true); + + return serverBucketMap != null; + }); for (VM vm : asList(vm0, vm1, vm2, vm3)) { - vm.invoke(() -> cacheRule.closeAndNullCache()); + vm.invoke(() -> { + SERVER.getAndSet(DUMMY_SERVER).stop(); + }); } LATCH.get().await(getTimeout().getValueInMS(), MILLISECONDS); - AsyncInvocation<Void> createServerOnVM3 = - vm3.invokeAsync(() -> createPersistentPrsAndServerOnPort(port3)); - AsyncInvocation<Void> createServerOnVM2 = - vm2.invokeAsync(() -> createPersistentPrsAndServerOnPort(port2)); - AsyncInvocation<Void> createServerOnVM1 = - vm1.invokeAsync(() -> createPersistentPrsAndServerOnPort(port1)); - AsyncInvocation<Void> createServerOnVM0 = - vm0.invokeAsync(() -> createPersistentPrsAndServerOnPort(port0)); + AsyncInvocation<Integer> createServerOnVM3 = + vm3.invokeAsync(() -> createServer("disk", -1, 3, 4)); + AsyncInvocation<Integer> createServerOnVM2 = + vm2.invokeAsync(() -> createServer("disk", -1, 3, 4)); + AsyncInvocation<Integer> createServerOnVM1 = + vm1.invokeAsync(() -> createServer("disk", -1, 3, 4)); + AsyncInvocation<Integer> createServerOnVM0 = + vm0.invokeAsync(() -> createServer("disk", -1, 3, 4)); createServerOnVM3.await(); createServerOnVM2.await(); createServerOnVM1.await(); createServerOnVM0.await(); - fetchAndValidateMetadata(); + await().untilAsserted(() -> { + clientMetadataService.getClientPRMetadata((InternalRegion) partitionedRegion); + Map<ServerLocation, Set<Integer>> serverBucketMap = + clientMetadataService.groupByServerToAllBuckets(partitionedRegion, true); + + assertThat(serverBucketMap).hasSize(4); + }); } - private boolean fetchAndValidateMetadata() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - clientMetadataService.getClientPRMetadata((InternalRegion) testRegion); + private int createServer(int localMaxMemory, int redundantCopies, int totalNumberOfBuckets) + throws IOException { + return createServer(null, null, localMaxMemory, redundantCopies, totalNumberOfBuckets); + } - Map<ServerLocation, Set<Integer>> serverBucketMap = - clientMetadataService.groupByServerToAllBuckets(testRegion, true); + private int createServer(String diskStoreName, int localMaxMemory, int redundantCopies, + int totalNumberOfBuckets) throws IOException { + return createServer(null, diskStoreName, localMaxMemory, redundantCopies, totalNumberOfBuckets); + } - return serverBucketMap != null; + private int createServer(String locators, String diskStoreName, int localMaxMemory, + int redundantCopies, int totalNumberOfBuckets) throws IOException { + return doCreateServer(locators, 0, diskStoreName, localMaxMemory, LOCAL_MAX_MEMORY_DEFAULT, + redundantCopies, totalNumberOfBuckets); } - private void stopServer() { - for (CacheServer cacheServer : cache.getCacheServers()) { - cacheServer.stop(); - } + private int createAccessorServer(int redundantCopies, int totalNumberOfBuckets) + throws IOException { + return doCreateServer(null, 0, null, 0, 0, redundantCopies, totalNumberOfBuckets); } - private void startLocatorInVM(int locatorPort) throws IOException { - Properties properties = new Properties(); - properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); + private void createAccessorPeer(int redundantCopies, int totalNumberOfBuckets) + throws IOException { + ServerLauncher serverLauncher = new ServerLauncher.Builder() + .setDeletePidFileOnStop(true) + .setDisableDefaultServer(true) + .setWorkingDirectory(getWorkingDirectory()) + .set(getDistributedSystemProperties()) + .build(); + serverLauncher.start(); - File logFile = new File("locator-" + locatorPort + ".log"); + SERVER.set(serverLauncher); - locator = Locator.startLocatorAndDS(locatorPort, logFile, null, properties); + createRegions(null, -1, -1, redundantCopies, totalNumberOfBuckets); } - private void stopLocator() { - locator.stop(); - } + private int doCreateServer(String locators, int serverPortInput, String diskStoreName, + int localMaxMemory, int localMaxMemoryOthers, int redundantCopies, int totalNumberOfBuckets) + throws IOException { + ServerLauncher.Builder serverBuilder = new ServerLauncher.Builder() + .setDeletePidFileOnStop(true) + .setDisableDefaultServer(true) + .setWorkingDirectory(getWorkingDirectory()) + .set(getDistributedSystemProperties()); + + if (locators != null) { + serverBuilder.set(LOCATORS, locators); + } - private int createServerWithLocator(String locators) throws IOException { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, locators); + ServerLauncher serverLauncher = serverBuilder.build(); + serverLauncher.start(); - cache = cacheRule.getOrCreateCache(properties); + SERVER.set(serverLauncher); - CacheServer cacheServer = cache.addCacheServer(); + CacheServer cacheServer = serverLauncher.getCache().addCacheServer(); cacheServer.setHostnameForClients("localhost"); - cacheServer.setPort(0); + cacheServer.setPort(serverPortInput); cacheServer.start(); - int redundantCopies = 0; - int totalNumberOfBuckets = 8; - - testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); - - customerRegion = createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); - - orderRegion = createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); + int serverPort = cacheServer.getPort(); - shipmentRegion = createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); + createRegions(diskStoreName, localMaxMemory, localMaxMemoryOthers, redundantCopies, + totalNumberOfBuckets); - return cacheServer.getPort(); + return serverPort; } - private void clearMetadata() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - clientMetadataService.getClientPartitionAttributesMap().clear(); - clientMetadataService.getClientPRMetadata_TEST_ONLY().clear(); - } - - private void verifyMetadata(Map<Integer, List<BucketServerLocation66>> clientBucketMap) { - PartitionedRegion pr = (PartitionedRegion) testRegion; - Map<Integer, Set<ServerBucketProfile>> serverBucketMap = - pr.getRegionAdvisor().getAllClientBucketProfilesTest(); - - assertThat(serverBucketMap).hasSize(clientBucketMap.size()); - assertThat(clientBucketMap.keySet()).containsAll(serverBucketMap.keySet()); - - for (Entry<Integer, List<BucketServerLocation66>> entry : clientBucketMap.entrySet()) { - int bucketId = entry.getKey(); - List<BucketServerLocation66> bucketLocations = entry.getValue(); + private void createRegions(String diskStoreName, int localMaxMemory, int localMaxMemoryOthers, + int redundantCopies, int totalNumberOfBuckets) throws IOException { + createPartitionedRegion(PARTITIONED_REGION_NAME, null, diskStoreName, localMaxMemory, null, + redundantCopies, totalNumberOfBuckets); - BucketServerLocation66 primaryBucketLocation = null; - int countOfPrimaries = 0; - for (BucketServerLocation66 bucketLocation : bucketLocations) { - if (bucketLocation.isPrimary()) { - primaryBucketLocation = bucketLocation; - countOfPrimaries++; - } - } + createPartitionedRegion(CUSTOMER_REGION_NAME, null, diskStoreName, localMaxMemoryOthers, + new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets); - assertThat(countOfPrimaries).isEqualTo(1); + createPartitionedRegion(ORDER_REGION_NAME, CUSTOMER_REGION_NAME, diskStoreName, + localMaxMemoryOthers, + new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets); - Set<ServerBucketProfile> bucketProfiles = serverBucketMap.get(bucketId); - assertThat(bucketProfiles).hasSize(bucketLocations.size()); + createPartitionedRegion(SHIPMENT_REGION_NAME, ORDER_REGION_NAME, diskStoreName, + localMaxMemoryOthers, + new CustomerIdPartitionResolver<>(), redundantCopies, totalNumberOfBuckets); - countOfPrimaries = 0; - for (ServerBucketProfile bucketProfile : bucketProfiles) { - ServerLocation sl = (ServerLocation) bucketProfile.getBucketServerLocations().toArray()[0]; - assertThat(bucketLocations.contains(sl)).isTrue(); - // should be only one primary - if (bucketProfile.isPrimary) { - countOfPrimaries++; - assertThat(sl).isEqualTo(primaryBucketLocation); - } - } - assertThat(countOfPrimaries).isEqualTo(1); - } + SERVER.get().getCache().createRegionFactory().create(REPLICATE_REGION_NAME); } - private void waitForLocalBucketsCreation() { - PartitionedRegion pr = (PartitionedRegion) testRegion; - - await().untilAsserted(() -> assertThat(pr.getDataStore().getAllLocalBuckets()).hasSize(4)); + private void createClient(int pingInterval, boolean prSingleHopEnabled, + boolean subscriptionEnabled, boolean useServerPool, int... ports) { + CLIENT.set((InternalClientCache) new ClientCacheFactory().set(LOCATORS, "").create()); + String poolName = + createPool(pingInterval, prSingleHopEnabled, subscriptionEnabled, useServerPool, ports); + createRegionsInClientCache(poolName); } - private void verifyDeadServer(Map<String, ClientPartitionAdvisor> regionMetaData, Region region, - int port0, int port1) { - - ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath()); - - for (Entry<Integer, List<BucketServerLocation66>> entry : prMetaData - .getBucketServerLocationsMap_TEST_ONLY().entrySet()) { - for (BucketServerLocation66 bsl : entry.getValue()) { - assertThat(bsl.getPort()) - .isNotEqualTo(port0) - .isNotEqualTo(port1); - } - } + private void createOldClient(int pingInterval, boolean prSingleHopEnabled, + boolean subscriptionEnabled, boolean useServerPool, int... ports) { + CACHE.set((InternalCache) new CacheFactory().set(LOCATORS, "").create()); + String poolName = + createPool(pingInterval, prSingleHopEnabled, subscriptionEnabled, useServerPool, ports); + createRegionsInOldClient(poolName); } - private void createClientWithoutPRSingleHopEnabled(int port0) { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, ""); - - cache = cacheRule.getOrCreateCache(properties); - + private String createPool(long pingInterval, boolean prSingleHopEnabled, + boolean subscriptionEnabled, boolean useServerPool, int... ports) { System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true"); - - Pool pool; + String poolName = PARTITIONED_REGION_NAME; try { - pool = PoolManager.createFactory() - .addServer("localhost", port0) - .setPingInterval(250) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1) - .setReadTimeout(2000) - .setSocketBufferSize(1000) - .setMinConnections(6) + PoolFactory poolFactory = PoolManager.createFactory() .setMaxConnections(10) + .setMinConnections(6) + .setPingInterval(pingInterval) + .setPRSingleHopEnabled(prSingleHopEnabled) + .setReadTimeout(2000) .setRetryAttempts(3) - .setPRSingleHopEnabled(false) - .create(PR_NAME); + .setSocketBufferSize(1000) + .setSubscriptionEnabled(subscriptionEnabled) + .setSubscriptionRedundancy(-1); + + if (useServerPool) { + for (int port : ports) { + poolFactory.addServer("localhost", port); + } + } else { + for (int port : ports) { + poolFactory.addLocator("localhost", port); + } + } + + poolFactory.create(poolName); } finally { - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "false"); + System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); } - createRegionsInClientCache(pool.getName()); + return poolName; } - private int createAccessorServer() throws IOException { - cache = cacheRule.getOrCreateCache(); - - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.setHostnameForClients("localhost"); - cacheServer.start(); - - int redundantCopies = 1; - int totalNumberOfBuckets = 4; - int localMaxMemory = 0; - - testRegion = - createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, localMaxMemory); - - customerRegion = - createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets, - localMaxMemory); - - orderRegion = - createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets, - localMaxMemory); - - shipmentRegion = - createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets, - localMaxMemory); - - RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); - replicatedRegion = regionFactory.create("rr"); - - return cacheServer.getPort(); - } + private <K, V> Region<K, V> createPartitionedRegion(String regionName, String colocatedRegionName, + String diskStoreName, int localMaxMemory, PartitionResolver<K, V> partitionResolver, + int redundantCopies, int totalNumberOfBuckets) throws IOException { + InternalCache cache = getCache(); - private <K, V> Region<K, V> createBasicPartitionedRegion(int redundantCopies, - int totalNumberOfBuckets, - int localMaxMemory) { PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>() .setRedundantCopies(redundantCopies) .setTotalNumBuckets(totalNumberOfBuckets); - if (localMaxMemory > -1) { - paf.setLocalMaxMemory(localMaxMemory); - } - - return cache.<K, V>createRegionFactory() - .setPartitionAttributes(paf.create()) - .setConcurrencyChecksEnabled(true) - .create(PR_NAME); - } - - private <K, V> Region<K, V> createColocatedRegion(String regionName, String colocatedRegionName, - int redundantCopies, int totalNumberOfBuckets, int localMaxMemory) { - PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>() - .setRedundantCopies(redundantCopies) - .setTotalNumBuckets(totalNumberOfBuckets) - .setPartitionResolver(new CustomerIdPartitionResolver<>("CustomerIDPartitionResolver")); - if (colocatedRegionName != null) { paf.setColocatedWith(colocatedRegionName); } if (localMaxMemory > -1) { paf.setLocalMaxMemory(localMaxMemory); } - - return cache.<K, V>createRegionFactory() - .setPartitionAttributes(paf.create()) - .setConcurrencyChecksEnabled(true) - .create(regionName); - } - - private int createServer(int redundantCopies, int totalNumberOfBuckets) throws IOException { - cache = cacheRule.getOrCreateCache(); - - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.setHostnameForClients("localhost"); - cacheServer.start(); - - testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, -1); - - // creating colocated Regions - customerRegion = - createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); - - orderRegion = - createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); - - shipmentRegion = - createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets, - LOCAL_MAX_MEMORY_DEFAULT); - - replicatedRegion = cache.createRegionFactory().create("rr"); - - return cacheServer.getPort(); - } - - private int createPersistentPrsAndServer() throws IOException { - cache = cacheRule.getOrCreateCache(); - - if (cache.findDiskStore(diskStoreName) == null) { - cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName); + if (partitionResolver != null) { + paf.setPartitionResolver(partitionResolver); } - testRegion = createBasicPersistentPartitionRegion(); - - // creating colocated Regions - - int redundantCopies = 3; - int totalNumberOfBuckets = 4; - - customerRegion = createColocatedPersistentRegionForTest(CUSTOMER, null, - redundantCopies, totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); - - orderRegion = createColocatedPersistentRegionForTest(ORDER, CUSTOMER, redundantCopies, - totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); - - shipmentRegion = createColocatedPersistentRegionForTest(SHIPMENT, ORDER, redundantCopies, - totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); - - RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); - - replicatedRegion = regionFactory.create("rr"); - - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.setHostnameForClients("localhost"); - cacheServer.start(); - - return cacheServer.getPort(); - } + RegionFactory<K, V> regionFactory; + if (diskStoreName != null) { + // create DiskStore + if (cache.findDiskStore(diskStoreName) == null) { + cache.createDiskStoreFactory() + .setDiskDirs(getDiskDirs()) + .create(diskStoreName); + } - private <K, V> Region<K, V> createBasicPersistentPartitionRegion() { - PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>() - .setRedundantCopies(3) - .setTotalNumBuckets(4); + regionFactory = cache.createRegionFactory(PARTITION_PERSISTENT); + regionFactory.setDiskStoreName(diskStoreName); + } else { + regionFactory = cache.createRegionFactory(PARTITION); + } - return cache.<K, V>createRegionFactory() - .setDataPolicy(DataPolicy.PERSISTENT_PARTITION) - .setDiskStoreName("disk") + return regionFactory + .setConcurrencyChecksEnabled(true) .setPartitionAttributes(paf.create()) - .create(PR_NAME); + .create(regionName); } - private <K, V> Region<K, V> createColocatedPersistentRegionForTest(String regionName, - String colocatedRegionName, int redundantCopies, int totalNumberOfBuckets, - int localMaxMemory) { + private void createRegionsInClientCache(String poolName) { + ClientRegionFactory<Object, Object> proxyRegionFactory = + CLIENT.get().createClientRegionFactory(ClientRegionShortcut.PROXY); + proxyRegionFactory.setPoolName(poolName); - PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<K, V>() - .setRedundantCopies(redundantCopies) - .setTotalNumBuckets(totalNumberOfBuckets) - .setPartitionResolver(new CustomerIdPartitionResolver<>("CustomerIDPartitionResolver")); + proxyRegionFactory.create(PARTITIONED_REGION_NAME); - if (localMaxMemory > -1) { - paf.setLocalMaxMemory(localMaxMemory); - } - if (colocatedRegionName != null) { - paf.setColocatedWith(colocatedRegionName); - } + ClientRegionFactory<Object, Object> localRegionFactory = + CLIENT.get().createClientRegionFactory(ClientRegionShortcut.LOCAL); + localRegionFactory.setConcurrencyChecksEnabled(true); + localRegionFactory.setPoolName(poolName); - RegionFactory<K, V> regionFactory = cache.<K, V>createRegionFactory() - .setDataPolicy(DataPolicy.PERSISTENT_PARTITION) - .setDiskStoreName("disk") - .setPartitionAttributes(paf.create()); - - return regionFactory.create(regionName); + localRegionFactory.create(CUSTOMER_REGION_NAME); + localRegionFactory.create(ORDER_REGION_NAME); + localRegionFactory.create(SHIPMENT_REGION_NAME); + localRegionFactory.create(REPLICATE_REGION_NAME); } - private void createPersistentPrsAndServerOnPort(int port) throws IOException { - cache = cacheRule.getOrCreateCache(); - - if (cache.findDiskStore(diskStoreName) == null) { - cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName); - } - - testRegion = createBasicPersistentPartitionRegion(); - - // creating colocated Regions - int redundantCopies = 3; - int totalNumberOfBuckets = 4; - - customerRegion = - createColocatedPersistentRegionForTest(CUSTOMER, null, redundantCopies, - totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); - - orderRegion = - createColocatedPersistentRegionForTest(ORDER, CUSTOMER, redundantCopies, - totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); - - shipmentRegion = - createColocatedPersistentRegionForTest(SHIPMENT, ORDER, redundantCopies, - totalNumberOfBuckets, LOCAL_MAX_MEMORY_DEFAULT); + private void createRegionsInOldClient(String poolName) { + CACHE.get().createRegionFactory() + .setDataPolicy(DataPolicy.EMPTY) + .setPoolName(poolName) + .create(PARTITIONED_REGION_NAME); - replicatedRegion = cache.createRegionFactory() - .create("rr"); + RegionFactory<Object, Object> localRegionFactory = CACHE.get().createRegionFactory(LOCAL) + .setConcurrencyChecksEnabled(true) + .setPoolName(poolName); - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(port); - cacheServer.setHostnameForClients("localhost"); - cacheServer.start(); + localRegionFactory.create(CUSTOMER_REGION_NAME); + localRegionFactory.create(ORDER_REGION_NAME); + localRegionFactory.create(SHIPMENT_REGION_NAME); + localRegionFactory.create(REPLICATE_REGION_NAME); } - private void startServerOnPort(int port) throws IOException { - cache = cacheRule.getOrCreateCache(); - - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(port); + private int startServer() throws IOException { + CacheServer cacheServer = SERVER.get().getCache().addCacheServer(); cacheServer.setHostnameForClients("localhost"); + cacheServer.setPort(0); cacheServer.start(); - } - - private void createPeer() { - cache = cacheRule.getOrCreateCache(); - - int redundantCopies = 1; - int totalNumberOfBuckets = 4; - - testRegion = createBasicPartitionedRegion(redundantCopies, totalNumberOfBuckets, -1); - - customerRegion = - createColocatedRegion(CUSTOMER, null, redundantCopies, totalNumberOfBuckets, -1); - - orderRegion = - createColocatedRegion(ORDER, CUSTOMER, redundantCopies, totalNumberOfBuckets, -1); - - shipmentRegion = - createColocatedRegion(SHIPMENT, ORDER, redundantCopies, totalNumberOfBuckets, -1); - replicatedRegion = cache.createRegionFactory().create("rr"); + return cacheServer.getPort(); } - private void createClient(int port) { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, ""); + private int startLocator() throws IOException { + LocatorLauncher locatorLauncher = new LocatorLauncher.Builder() + .setDeletePidFileOnStop(true) + .setPort(0) + .setWorkingDirectory(getWorkingDirectory()) + .set(ENABLE_CLUSTER_CONFIGURATION, "false") + .build(); + locatorLauncher.start(); - cache = cacheRule.getOrCreateCache(properties); + LOCATOR.set(locatorLauncher); - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true"); - Pool pool; - try { - pool = PoolManager.createFactory() - .addServer("localhost", port) - .setPingInterval(250) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1) - .setReadTimeout(2000) - .setSocketBufferSize(1000) - .setMinConnections(6) - .setMaxConnections(10) - .setRetryAttempts(3) - .create(PR_NAME); - } finally { - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "false"); - } - - createRegionsInClientCache(pool.getName()); + return locatorLauncher.getLocator().getPort(); } - private void createClient(int port0, int port1) { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, ""); - - cache = cacheRule.getOrCreateCache(properties); - - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true"); - Pool pool; - try { - pool = PoolManager.createFactory() - .addServer("localhost", port0) - .addServer("localhost", port1) - .setPingInterval(250) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1) - .setReadTimeout(2000) - .setSocketBufferSize(1000) - .setMinConnections(6) - .setMaxConnections(10) - .setRetryAttempts(3) - .create(PR_NAME); - } finally { - System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); + private void stopServer() { + for (CacheServer cacheServer : SERVER.get().getCache().getCacheServers()) { + cacheServer.stop(); } - - createRegionsInClientCache(pool.getName()); } - private void createClientWithLocator(String host, int port0) { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, ""); - - cache = cacheRule.getOrCreateCache(properties); - - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true"); - Pool pool; - try { - pool = PoolManager.createFactory() - .addLocator(host, port0) - .setPingInterval(250) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1) - .setReadTimeout(2000) - .setSocketBufferSize(1000) - .setMinConnections(6) - .setMaxConnections(10) - .setRetryAttempts(3) - .create(PR_NAME); - } finally { - System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); - } - - createRegionsInClientCache(pool.getName()); + private void doPuts(Region<Object, Object> region) { + region.put(0, "create0"); + region.put(1, "create1"); + region.put(2, "create2"); + region.put(3, "create3"); } - private void createClient(int port0, int port1, int port2, int port3) { - Properties properties = new Properties(); - properties.setProperty(LOCATORS, ""); - - cache = cacheRule.getOrCreateCache(properties); - - System.setProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true"); - Pool pool; - try { - pool = PoolManager.createFactory() - .addServer("localhost", port0) - .addServer("localhost", port1) - .addServer("localhost", port2) - .addServer("localhost", port3) - .setPingInterval(100) - .setSubscriptionEnabled(false) - .setReadTimeout(2000) - .setSocketBufferSize(1000) - .setMinConnections(6) - .setMaxConnections(10) - .setRetryAttempts(3) - .create(PR_NAME); - } finally { - System.clearProperty(GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); + private void doManyPuts(Region<Object, Object> region) { + region.put(0, "create0"); + region.put(1, "create1"); + region.put(2, "create2"); + region.put(3, "create3"); + for (int i = 0; i < 40; i++) { + region.put(i, "create" + i); } - - createRegionsInClientCache(pool.getName()); - } - - private void createRegionsInClientCache(String poolName) { - testRegion = cache.createRegionFactory() - .setDataPolicy(DataPolicy.EMPTY) - .setPoolName(poolName) - .create(PR_NAME); - - customerRegion = cache.createRegionFactory() - .setConcurrencyChecksEnabled(true) - .setPoolName(poolName) - .setScope(Scope.LOCAL) - .create(CUSTOMER); - - orderRegion = cache.createRegionFactory() - .setConcurrencyChecksEnabled(true) - .setPoolName(poolName) - .setScope(Scope.LOCAL) - .create(ORDER); - - shipmentRegion = cache.createRegionFactory() - .setConcurrencyChecksEnabled(true) - .setPoolName(poolName) - .setScope(Scope.LOCAL) - .create(SHIPMENT); - - replicatedRegion = cache.createRegionFactory() - .setConcurrencyChecksEnabled(true) - .setPoolName(poolName) - .setScope(Scope.LOCAL) - .create("rr"); } private void putIntoPartitionedRegions() { + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME); + Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME); + Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME); + Region<Object, Object> replicateRegion = getRegion(REPLICATE_REGION_NAME); + for (int i = 0; i <= 3; i++) { - CustomerId custid = new CustomerId(i); + CustomerId customerId = new CustomerId(i); Customer customer = new Customer("name" + i, "Address" + i); - customerRegion.put(custid, customer); + customerRegion.put(customerId, customer); for (int j = 1; j <= 10; j++) { int oid = i * 10 + j; - OrderId orderId = new OrderId(oid, custid); + OrderId orderId = new OrderId(oid, customerId); Order order = new Order("Order" + oid); orderRegion.put(orderId, order); @@ -1459,88 +1260,53 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { } } - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); + partitionedRegion.put(0, "create0"); + partitionedRegion.put(1, "create1"); + partitionedRegion.put(2, "create2"); + partitionedRegion.put(3, "create3"); - testRegion.put(0, "update0"); - testRegion.put(1, "update1"); - testRegion.put(2, "update2"); - testRegion.put(3, "update3"); + partitionedRegion.put(0, "update0"); + partitionedRegion.put(1, "update1"); + partitionedRegion.put(2, "update2"); + partitionedRegion.put(3, "update3"); - testRegion.put(0, "update00"); - testRegion.put(1, "update11"); - testRegion.put(2, "update22"); - testRegion.put(3, "update33"); + partitionedRegion.put(0, "update00"); + partitionedRegion.put(1, "update11"); + partitionedRegion.put(2, "update22"); + partitionedRegion.put(3, "update33"); Map<Object, Object> map = new HashMap<>(); map.put(1, 1); - replicatedRegion.putAll(map); + replicateRegion.putAll(map); } - private File getDiskDir() { - try { - File file = new File(temporaryFolder.getRoot(), diskStoreName + getVMId()); - if (!file.exists()) { - temporaryFolder.newFolder(diskStoreName + getVMId()); - } - return file.getAbsoluteFile(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private File[] getDiskDirs() { - return new File[] {getDiskDir()}; - } - - private void executeFunctions() { - Set<Object> filter = new HashSet<>(); - filter.add(0); - FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction()) - .getResult(); - filter.add(1); - FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction()) - .getResult(); - filter.add(2); - filter.add(3); - FunctionService.onRegion(testRegion).withFilter(filter).execute(new PutFunction()) - .getResult(); - FunctionService.onRegion(testRegion).execute(new PutFunction()).getResult(); - } - - private void putAll() { + private void doPutAlls(Region<Object, Object> region) { Map<Object, Object> map = new HashMap<>(); map.put(0, 0); map.put(1, 1); map.put(2, 2); map.put(3, 3); - testRegion.putAll(map, "putAllCallback"); - testRegion.putAll(map); - testRegion.putAll(map); - testRegion.putAll(map); - } - private void put() { - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); - for (int i = 0; i < 40; i++) { - testRegion.put(i, "create" + i); - } + region.putAll(map, "putAllCallback"); + region.putAll(map); + region.putAll(map); + region.putAll(map); } - private void getFromPartitionedRegions() { + private void doGets() { + Region<Object, Object> partitionedRegion = getRegion(PARTITIONED_REGION_NAME); + Region<Object, Object> customerRegion = getRegion(CUSTOMER_REGION_NAME); + Region<Object, Object> orderRegion = getRegion(ORDER_REGION_NAME); + Region<Object, Object> shipmentRegion = getRegion(SHIPMENT_REGION_NAME); + for (int i = 0; i <= 3; i++) { - CustomerId custid = new CustomerId(i); + CustomerId customerId = new CustomerId(i); Customer customer = new Customer("name" + i, "Address" + i); - customerRegion.get(custid, customer); + customerRegion.get(customerId, customer); for (int j = 1; j <= 10; j++) { int oid = i * 10 + j; - OrderId orderId = new OrderId(oid, custid); + OrderId orderId = new OrderId(oid, customerId); Order order = new Order("Order" + oid); orderRegion.get(orderId, order); @@ -1553,75 +1319,167 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { } } - testRegion.get(0, "create0"); - testRegion.get(1, "create1"); - testRegion.get(2, "create2"); - testRegion.get(3, "create3"); + partitionedRegion.get(0, "create0"); + partitionedRegion.get(1, "create1"); + partitionedRegion.get(2, "create2"); + partitionedRegion.get(3, "create3"); - testRegion.get(0, "update0"); - testRegion.get(1, "update1"); - testRegion.get(2, "update2"); - testRegion.get(3, "update3"); + partitionedRegion.get(0, "update0"); + partitionedRegion.get(1, "update1"); + partitionedRegion.get(2, "update2"); + partitionedRegion.get(3, "update3"); - testRegion.get(0, "update00"); - testRegion.get(1, "update11"); - testRegion.get(2, "update22"); - testRegion.get(3, "update33"); + partitionedRegion.get(0, "update00"); + partitionedRegion.get(1, "update11"); + partitionedRegion.get(2, "update22"); + partitionedRegion.get(3, "update33"); } - private void putIntoSinglePR() { - testRegion.put(0, "create0"); - testRegion.put(1, "create1"); - testRegion.put(2, "create2"); - testRegion.put(3, "create3"); + private void executeFunctions(Region<Object, Object> region) { + cast(FunctionService.onRegion(region)) + .withFilter(filter(0)) + .execute(new PutFunction()) + .getResult(); + + cast(FunctionService.onRegion(region)) + .withFilter(filter(0, 1)) + .execute(new PutFunction()) + .getResult(); + + cast(FunctionService.onRegion(region)) + .withFilter(filter(0, 1, 2, 3)) + .execute(new PutFunction()) + .getResult(); + + cast(FunctionService.onRegion(region)) + .execute(new PutFunction()) + .getResult(); } - private void updateIntoSinglePR() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); + private Set<Object> filter(Object... values) { + return Arrays.stream(values).collect(Collectors.toSet()); + } - clientMetadataService.satisfyRefreshMetadata_TEST_ONLY(false); + private void clearMetadata() { + ClientMetadataService clientMetadataService = + getInternalCache(SERVER.get()).getClientMetadataService(); + clientMetadataService.getClientPartitionAttributesMap().clear(); + clientMetadataService.getClientPRMetadata_TEST_ONLY().clear(); + } - testRegion.put(0, "update0"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private String getWorkingDirectory() throws IOException { + int vmId = getVMId(); + File directory = new File(temporaryFolder.getRoot(), "VM-" + vmId); + if (!directory.exists()) { + temporaryFolder.newFolder("VM-" + vmId); + } + return directory.getAbsolutePath(); + } - testRegion.put(1, "update1"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private File getDiskDir() throws IOException { + File file = new File(temporaryFolder.getRoot(), diskStoreName + getVMId()); + if (!file.exists()) { + temporaryFolder.newFolder(diskStoreName + getVMId()); + } + return file.getAbsoluteFile(); + } - testRegion.put(2, "update2"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private File[] getDiskDirs() throws IOException { + return new File[] {getDiskDir()}; + } - testRegion.put(3, "update3"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private InternalCache getInternalCache(ServerLauncher serverLauncher) { + return cast(serverLauncher.getCache()); + } - testRegion.put(0, "update00"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private void waitForLocalBucketsCreation() { + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); - testRegion.put(1, "update11"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + await().untilAsserted(() -> assertThat(pr.getDataStore().getAllLocalBuckets()).hasSize(4)); + } - testRegion.put(2, "update22"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + private void verifyDeadServer(Map<String, ClientPartitionAdvisor> regionMetaData, Region region, + int port0, int port1) { + ClientPartitionAdvisor prMetaData = regionMetaData.get(region.getFullPath()); + Set<Entry<Integer, List<BucketServerLocation66>>> bucketLocationsMap = + prMetaData.getBucketServerLocationsMap_TEST_ONLY().entrySet(); - testRegion.put(3, "update33"); - assertThat(clientMetadataService.isRefreshMetadataTestOnly()).isFalse(); + for (Entry<Integer, List<BucketServerLocation66>> entry : bucketLocationsMap) { + for (BucketServerLocation66 bucketLocation : entry.getValue()) { + assertThat(bucketLocation.getPort()) + .isNotEqualTo(port0) + .isNotEqualTo(port1); + } + } } - private void verifyEmptyMetadata() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - assertThat(clientMetadataService.getClientPRMetadata_TEST_ONLY()).isEmpty(); + private void verifyMetadata(Map<Integer, List<BucketServerLocation66>> clientBucketMap) { + PartitionedRegion pr = (PartitionedRegion) getRegion(PARTITIONED_REGION_NAME); + Map<Integer, Set<ServerBucketProfile>> serverBucketMap = + pr.getRegionAdvisor().getAllClientBucketProfilesTest(); + + assertThat(serverBucketMap) + .hasSize(clientBucketMap.size()); + assertThat(serverBucketMap.keySet()) + .containsAll(clientBucketMap.keySet()); + + for (Entry<Integer, List<BucketServerLocation66>> entry : clientBucketMap.entrySet()) { + int bucketId = entry.getKey(); + List<BucketServerLocation66> bucketLocations = entry.getValue(); + + BucketServerLocation66 primaryBucketLocation = null; + int countOfPrimaries = 0; + for (BucketServerLocation66 bucketLocation : bucketLocations) { + if (bucketLocation.isPrimary()) { + primaryBucketLocation = bucketLocation; + countOfPrimaries++; + } + } + + assertThat(countOfPrimaries).isEqualTo(1); + + Set<ServerBucketProfile> bucketProfiles = serverBucketMap.get(bucketId); + + assertThat(bucketProfiles).hasSize(bucketLocations.size()); + + countOfPrimaries = 0; + for (ServerBucketProfile bucketProfile : bucketProfiles) { + for (BucketServerLocation66 bucketLocation : bucketProfile.getBucketServerLocations()) { + + assertThat(bucketLocations).contains(bucketLocation); + + // should be only one primary + if (bucketProfile.isPrimary) { + countOfPrimaries++; + + assertThat(bucketLocation).isEqualTo(primaryBucketLocation); + } + } + } + + assertThat(countOfPrimaries).isEqualTo(1); + } } - private void verifyEmptyStaticData() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - assertThat(clientMetadataService.getClientPartitionAttributesMap()).isEmpty(); + private InternalCache getCache() { + if (CACHE.get() != DUMMY_CACHE) { + return CACHE.get(); + } + if (SERVER.get() != DUMMY_SERVER) { + return (InternalCache) SERVER.get().getCache(); + } + if (CLIENT.get() != DUMMY_CACHE) { + return (InternalCache) CLIENT.get(); + } + return null; } - private void verifyMetadata() { - ClientMetadataService clientMetadataService = cache.getClientMetadataService(); - // make sure all fetch tasks are completed - await().untilAsserted(() -> { - assertThat(clientMetadataService.getRefreshTaskCount_TEST_ONLY()).isZero(); - }); + private Region<Object, Object> getRegion(String name) { + InternalCache cache = getCache(); + if (cache != null) { + return cache.getRegion(name); + } + throw new IllegalStateException("Cache or region not found"); } private static class PutFunction extends FunctionAdapter implements DataSerializable { @@ -1996,15 +1854,7 @@ public class PartitionedRegionSingleHopDUnitTest implements Serializable { public static class CustomerIdPartitionResolver<K, V> implements PartitionResolver<K, V> { - private String id; - - public CustomerIdPartitionResolver() { - // required - } - - public CustomerIdPartitionResolver(String id) { - this.id = id; - } + private final String id = getClass().getSimpleName(); @Override public String getName() { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java index c2d5be5..7e10090 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/InternalClientCache.java @@ -48,4 +48,6 @@ public interface InternalClientCache extends ClientCache { CachePerfStats getCachePerfStats(); MeterRegistry getMeterRegistry(); + + ClientMetadataService getClientMetadataService(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java index 195df9f..c03e990 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/util/UncheckedUtils.java @@ -14,10 +14,16 @@ */ package org.apache.geode.internal.cache.util; +import org.apache.geode.cache.execute.Execution; + @SuppressWarnings({"unchecked", "unused"}) public class UncheckedUtils { public static <T> T cast(Object object) { return (T) object; } + + public static <IN, OUT, AGG> Execution<IN, OUT, AGG> cast(Execution execution) { + return execution; + } }