This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3b32fd569b196f7ad1d3e9460f3d5debf67d3ae0 Author: Kirk Lund <[email protected]> AuthorDate: Fri Mar 23 16:38:33 2018 -0700 GEODE-1279: Rename Bug36853EventsExpiryDUnitTest as ClientSubscriptionExpiryDataLossRegressionTest * Rewrite test with a spy CacheListener. --- .../cache/ha/Bug36853EventsExpiryDUnitTest.java | 268 --------------------- ...ntSubscriptionExpiryDataLossRegressionTest.java | 195 +++++++++++++++ 2 files changed, 195 insertions(+), 268 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java deleted file mode 100755 index 8dee70b..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.geode.internal.cache.ha; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.util.Properties; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionAttributes; -import org.apache.geode.cache.Scope; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.cache30.CacheTestCase; -import org.apache.geode.cache30.ClientServerTestCase; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.AvailablePort; -import org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.NetworkUtils; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; -import org.apache.geode.test.junit.categories.DistributedTest; - -/** - * This is a bug test for 36853 (Expiry logic in HA is used to expire early data that a secondary - * picks up that is not in the primary. But it is also possible that it would cause data that is in - * the primary queue to be expired. And this can cause a data loss. This issue is mostly related to - * Expiry mechanism and not HA, but it affects HA functionality). - * - * This test has a cache-client connected to one cache-server. 
The expiry-time of events in the - * queue for the client at the server is set low and dispatcher is set for delayed start. This will - * make some of the events in the queue expire before dispatcher can start picking them up for - * delivery to the client. - */ -@Category({DistributedTest.class, ClientSubscriptionTest.class}) -public class Bug36853EventsExpiryDUnitTest extends JUnit4CacheTestCase { - - /** Cache-server */ - private VM server = null; - - /** Client , connected to Cache-server */ - private VM client = null; - - /** Name of the test region */ - private static final String REGION_NAME = - Bug36853EventsExpiryDUnitTest.class.getSimpleName() + "_region"; - - /** The cache instance for test cases */ - private static Cache cache = null; - - /** Boolean to indicate the client to proceed for validation */ - private static volatile boolean proceedForValidation = false; - - /** Counter to indicate number of puts recieved by client */ - private static volatile int putsRecievedByClient; - - /** The last key for operations, to notify for proceeding to validation */ - private static final String LAST_KEY = "LAST_KEY"; - - /** The time in milliseconds by which the start of dispatcher will be delayed */ - private static final int DISPATCHER_SLOWSTART_TIME = 10000; - - /** Number of puts done for the test */ - private static final int TOTAL_PUTS = 5; - - @Override - public final void preSetUp() throws Exception { - disconnectAllFromDS(); - } - - @Override - public final void postSetUp() throws Exception { - final Host host = Host.getHost(0); - server = host.getVM(0); - client = host.getVM(1); - server.invoke(() -> ConflationDUnitTest.setIsSlowStart()); - int PORT2 = ((Integer) server.invoke(() -> Bug36853EventsExpiryDUnitTest.createServerCache())) - .intValue(); - - client.invoke(() -> Bug36853EventsExpiryDUnitTest - .createClientCache(NetworkUtils.getServerHostName(host), new Integer(PORT2))); - } - - /** - * Creates the cache - * - * @param props - distributed 
system props - * @throws Exception - thrown in any problem occurs in creating cache - */ - private void createCache(Properties props) throws Exception { - DistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - assertNotNull(cache); - } - - /** - * Creates cache and starts the bridge-server - */ - private static Integer createServerCache() throws Exception { - System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, "1"); - System.setProperty("slowStartTimeForTesting", String.valueOf(DISPATCHER_SLOWSTART_TIME)); - new Bug36853EventsExpiryDUnitTest().createCache(new Properties()); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(DataPolicy.REPLICATE); - RegionAttributes attrs = factory.create(); - cache.createRegion(REGION_NAME, attrs); - - CacheServer server = cache.addCacheServer(); - assertNotNull(server); - int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - server.setPort(port); - server.setNotifyBySubscription(true); - server.start(); - return new Integer(server.getPort()); - } - - /** - * Creates the client cache - * - * @param hostName the name of the server's machine - * @param port - bridgeserver port - * @throws Exception - thrown if any problem occurs in setting up the client - */ - private static void createClientCache(String hostName, Integer port) throws Exception { - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - new Bug36853EventsExpiryDUnitTest().createCache(props); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - ClientServerTestCase.configureConnectionPool(factory, hostName, port.intValue(), -1, true, -1, - 2, null); - - factory.addCacheListener(new CacheListenerAdapter() { - public void afterCreate(EntryEvent event) { - String key = (String) event.getKey(); - LogWriterUtils.getLogWriter().info("client2 : afterCreate 
: key =" + key); - if (key.equals(LAST_KEY)) { - - synchronized (Bug36853EventsExpiryDUnitTest.class) { - LogWriterUtils.getLogWriter().info("Notifying client2 to proceed for validation"); - proceedForValidation = true; - Bug36853EventsExpiryDUnitTest.class.notify(); - } - } else { - putsRecievedByClient++; - } - } - }); - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(REGION_NAME, attrs); - - region.registerInterest("ALL_KEYS"); - } - - /** - * First generates some events, then waits for the time equal to that of delayed start of the - * dispatcher and then does put on the last key for few iterations. The idea is to let the events - * added, before waiting, to expire before the dispatcher to pick them up and then do a put on a - * LAST_KEY couple of times so that atleast one of these is dispatched to client and when client - * recieves this in the listener, the test is notified to proceed for validation. - * - * @throws Exception - thrown if any problem occurs in put operation - */ - private static void generateEvents() throws Exception { - String regionName = Region.SEPARATOR + REGION_NAME; - Region region = cache.getRegion(regionName); - for (int i = 0; i < TOTAL_PUTS; i++) { - - region.put("key" + i, "val-" + i); - } - Thread.sleep(DISPATCHER_SLOWSTART_TIME + 1000); - for (int i = 0; i < 25; i++) { - - region.put(LAST_KEY, "LAST_VALUE"); - } - } - - /** - * First generates some events, then waits for the time equal to that of delayed start of the - * dispatcher and then does put on the last key for few iterations. Whenever the client the create - * corresponding to the LAST_KEY in the listener, the test is notified to proceed for validation. - * Then, it is validated that all the events that were added prior to the LAST_KEY are dispatched - * to the client. Due to the bug#36853, those events will expire and validation will fail. 
- * - * @throws Exception - thrown if any exception occurs in test - */ - @Test - public void testEventsExpiryBug() throws Exception { - IgnoredException.addIgnoredException("Unexpected IOException"); - IgnoredException.addIgnoredException("Connection reset"); - server.invoke(() -> Bug36853EventsExpiryDUnitTest.generateEvents()); - client.invoke(() -> Bug36853EventsExpiryDUnitTest.validateEventCountAtClient()); - } - - /** - * Waits for the listener to receive all events and validates that no exception occurred in client - */ - private static void validateEventCountAtClient() throws Exception { - if (!proceedForValidation) { - synchronized (Bug36853EventsExpiryDUnitTest.class) { - if (!proceedForValidation) - try { - LogWriterUtils.getLogWriter().info("Client2 going in wait before starting validation"); - Bug36853EventsExpiryDUnitTest.class.wait(5000); - } catch (InterruptedException e) { - fail("interrupted"); - } - } - } - LogWriterUtils.getLogWriter().info("Starting validation on client2"); - assertEquals("Puts recieved by client not equal to the puts done at server.", TOTAL_PUTS, - putsRecievedByClient); - LogWriterUtils.getLogWriter().info("putsRecievedByClient = " + putsRecievedByClient); - LogWriterUtils.getLogWriter().info("Validation complete on client2"); - - } - - /** - * Closes the cache - * - */ - private static void unSetExpiryTimeAndCloseCache() { - System.clearProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME); - CacheTestCase.closeCache(); - } - - /** - * Closes the caches on clients and servers - * - * @throws Exception - thrown if any problem occurs in closing client and server caches. 
- */ - @Override - public final void preTearDownCacheTestCase() throws Exception { - // close client - client.invoke(() -> Bug36853EventsExpiryDUnitTest.unSetExpiryTimeAndCloseCache()); - // close server - server.invoke(() -> Bug36853EventsExpiryDUnitTest.unSetExpiryTimeAndCloseCache()); - - } - -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java new file mode 100755 index 0000000..668ea8b --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.ha; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.geode.cache30.ClientServerTestCase.configureConnectionPool; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest.setIsSlowStart; +import static org.apache.geode.test.dunit.Host.getHost; +import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; +import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.CacheTestCase; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +/** + * This is a bug test for 36853 (Expiry logic in HA is used to expire early data that a secondary + * picks up that is not in the primary. But it is also possible that it would cause data that is in + * the primary queue to be expired. 
And this can cause a data loss. This issue is mostly related to + * Expiry mechanism and not HA, but it affects HA functionality). + * + * <p> + * This test has a cache-client connected to one cache-server. The expiry-time of events in the + * queue for the client at the server is set low and dispatcher is set for delayed start. This will + * make some of the events in the queue expire before dispatcher can start picking them up for + * delivery to the client. + * + * <p> + * TRAC #36853: HA events can expire on primary server and this can cause data loss. + */ +@Category({DistributedTest.class, ClientSubscriptionTest.class}) +public class ClientSubscriptionExpiryDataLossRegressionTest extends CacheTestCase { + + /** The time in milliseconds by which the start of dispatcher will be delayed */ + private static final int DISPATCHER_SLOWSTART_TIME = 10_000; + private static final int PUT_COUNT = 5; + + private static CacheListener<String, String> spyCacheListener; + + private String uniqueName; + private String hostName; + private int serverPort; + + private VM server; + private VM client; + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() throws Exception { + server = getHost(0).getVM(0); + client = getHost(0).getVM(1); + + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); + hostName = getServerHostName(getHost(0)); + + server.invoke(() -> setIsSlowStart()); + serverPort = server.invoke(() -> createServerCache()); + + client.invoke(() -> createClientCache()); + + addIgnoredException("Unexpected IOException"); + addIgnoredException("Connection reset"); + } + + @After + public void tearDown() throws Exception { + disconnectAllFromDS(); + } + + /** + * First generate some events, then wait for some time to let the initial events expire before + * the dispatcher 
sends them to the client. Then do one final put so that the client knows when + * to begin validation. + * + * <p> + * Client is waiting for afterCreate to be invoked number of PUT_COUNT times before proceeding + * with validation. + * + * <p> + * If the bug exists or is reintroduced, then the events will expire without reaching the client. + */ + @Test + public void allEventsShouldReachClientWithoutExpiring() throws Exception { + server.invoke(() -> generateEvents()); + client.invoke(() -> validateEventCountAtClient()); + } + + private int createServerCache() throws IOException { + System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, String.valueOf(1)); + System.setProperty("slowStartTimeForTesting", String.valueOf(DISPATCHER_SLOWSTART_TIME)); + + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(DataPolicy.REPLICATE); + + getCache().createRegion(uniqueName, factory.create()); + + CacheServer server = getCache().addCacheServer(); + server.setPort(0); + server.setNotifyBySubscription(true); + server.start(); + return server.getPort(); + } + + private void createClientCache() { + Properties config = new Properties(); + config.setProperty(MCAST_PORT, "0"); + config.setProperty(LOCATORS, ""); + + getCache(config); + + spyCacheListener = spy(CacheListener.class); + + AttributesFactory factory = new AttributesFactory(); + factory.addCacheListener(spyCacheListener); + factory.setScope(Scope.DISTRIBUTED_ACK); + + configureConnectionPool(factory, hostName, serverPort, -1, true, -1, 2, null); + + Region region = getCache().createRegion(uniqueName, factory.create()); + + region.registerInterest("ALL_KEYS"); + } + + /** + * First generate some events, then wait for some time to let the initial events expire before + * the dispatcher sends them to the client. Then do one final put so that the client knows when + * to begin validation. 
+ * + * <p> + * Client is waiting for afterCreate to be invoked number of PUT_COUNT times before proceeding + * with validation. + */ + private void generateEvents() throws InterruptedException { + Region<String, String> region = getCache().getRegion(uniqueName); + for (int i = 0; i < PUT_COUNT - 1; i++) { + region.put("key" + i, "val-" + i); + } + + Thread.sleep(DISPATCHER_SLOWSTART_TIME + 1000); + + region.put("key" + PUT_COUNT, "LAST_VALUE"); + } + + /** + * Waits for the listener to receive all events + */ + private void validateEventCountAtClient() { + await().atMost(1, MINUTES) + .until(() -> verify(spyCacheListener, times(PUT_COUNT)).afterCreate(any())); + } +} -- To stop receiving notification emails like this one, please contact [email protected].
