This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 38139fb GEODE-8329: Fix for durable CQ registration recovery (#5360) 38139fb is described below commit 38139fbb00cbea872348d3554f9589bd7c5bfdde Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com> AuthorDate: Mon Dec 7 12:35:49 2020 +0100 GEODE-8329: Fix for durable CQ registration recovery (#5360) * GEODE-8329: Fix for durable CQ registration recovery This change solves the issue when the client without configured HA is wrongly re-registering durable CQs as non durable during the server failover. * Fix for stressTest * empty commit to re-launch CI --- .../cache/client/internal/QueueManagerImpl.java | 3 +- .../tier/sockets/DurableClientCQDUnitTest.java | 139 +++++++++++++++++++++ .../cache/tier/sockets/DurableClientTestBase.java | 52 ++++++-- 3 files changed, 183 insertions(+), 11 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java index 212d2de..145817c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java @@ -1112,7 +1112,8 @@ public class QueueManagerImpl implements QueueManager { .set(((DefaultQueryService) this.pool.getQueryService()).getUserAttributes(name)); } try { - if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT) { + if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT + && cqi.isDurable() == isDurable) { cqi.createOn(recoveredConnection, isDurable); } } finally { diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java index 4b24be9..cc171eb 100644 --- 
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java @@ -18,6 +18,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -51,10 +52,13 @@ import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.ClientServerObserverAdapter; import org.apache.geode.internal.cache.ClientServerObserverHolder; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.junit.categories.ClientSubscriptionTest; @Category({ClientSubscriptionTest.class}) @@ -145,6 +149,107 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase { } /** + * Test that durable CQ is correctly re-registered to new server after the failover and + * that the durable client functionality works as expected. + * Steps: + * 1. Start two servers + * 2. Start durable client without HA and register durable CQs + * 3. Shutdown the server that is hosting CQs subscription queue (primary server) + * 4. Wait for the durable client to perform the failover to the another server + * 5. 
Shutdown the durable client with keepAlive flag set to true + * 6. Provision remaining server with the data that should fulfil CQ condition and fill the queue + * 7. Start the durable client again and check that it receives correct events from queue + */ + @Test + public void testDurableCQServerFailoverWithoutHAConfigured() + throws Exception { + String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID > 5"; + String allQuery = "select * from " + SEPARATOR + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5"; + + // Start a server 1 + server1Port = this.server1VM + .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE)); + + // Start server 2 + server2Port = this.server2VM.invoke(CacheServerTestUtil.class, + "createCacheServer", new Object[] {regionName, Boolean.TRUE}); + + // Start a durable client that is kept alive on the server when it stops normally + durableClientId = getName() + "_client"; + CacheServerTestUtil.createCacheClient( + getClientPool(NetworkUtils.getServerHostName(), server1Port, server2Port, true, 0), + regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE); + + // register non durable cq + createCq("GreaterThan5", greaterThan5Query, false).execute(); + + // register durable cqs + createCq("All", allQuery, true).execute(); + createCq("LessThan5", lessThan5Query, true).execute(); + + // send client ready + CacheServerTestUtil.getClientCache().readyForEvents(); + + int oldPrimaryPort = getPrimaryServerPort(); + // Close the server that is hosting subscription queue + VM primary = getPrimaryServerVM(); + // Verify durable client on server + verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId, + primary); + + primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + + // Wait until failover to the another server is successfully performed + 
waitForFailoverToPerform(oldPrimaryPort); + primary = getPrimaryServerVM(); + waitForDurableClientPresence(durableClientId, primary, 1); + int primaryPort = getPrimaryServerPort(); + + // Stop the durable client + CacheServerTestUtil.closeCache(true); + + // Start normal publisher client + startClient(publisherClientVM, primaryPort, regionName); + + // Publish some entries + publishEntries(regionName, 10); + + // Restart the durable client + CacheServerTestUtil.createCacheClient( + getClientPool(NetworkUtils.getServerHostName(), primaryPort, true), + regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE); + assertThat(CacheServerTestUtil.getClientCache()).isNotNull(); + + // Re-register non durable cq + createCq("GreaterThan5", greaterThan5Query, false).execute(); + + // Re-register durable cqs + createCq("All", allQuery, true).execute(); + createCq("LessThan5", lessThan5Query, true).execute(); + + // send client ready + CacheServerTestUtil.getClientCache().readyForEvents(); + + // verify cq events for all 3 cqs + checkCqListenerEvents("GreaterThan5", 0 /* numEventsExpected */, + /* numEventsToWaitFor */ 15/* secondsToWait */); + checkCqListenerEvents("LessThan5", 5 /* numEventsExpected */, + /* numEventsToWaitFor */ 15/* secondsToWait */); + checkCqListenerEvents("All", 10 /* numEventsExpected */, + /* numEventsToWaitFor */ 15/* secondsToWait */); + + primary = getPrimaryServerVM(); + // Stop the durable client + CacheServerTestUtil.closeCache(false); + // Stop the publisher client + this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + // Stop the remaining server + primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + } + + + /** * Test functionality to close the cq and drain all events from the ha queue from the server This * draining should not affect events that still have register interest */ @@ -782,6 +887,8 @@ public class DurableClientCQDUnitTest extends 
DurableClientTestBase { @Test public void testGetAllDurableCqsFromServer() { + + // Start server 1 server1Port = this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServer", new Object[] {regionName, Boolean.TRUE}); @@ -971,6 +1078,38 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase { vm.invoke(cacheSerializableRunnable); } + public VM getPrimaryServerVM() { + if (this.server1Port == getPrimaryServerPort()) { + return server1VM; + } else { + return server2VM; + } + } + + public int getPrimaryServerPort() { + PoolImpl pool = CacheServerTestUtil.getPool(); + ServerLocation primaryServerLocation = pool.getPrimary(); + return primaryServerLocation.getPort(); + } + + public void waitForFailoverToPerform(int oldPrimaryPort) { + final PoolImpl pool = CacheServerTestUtil.getPool(); + WaitCriterion ev = new WaitCriterion() { + @Override + public boolean done() { + return pool.getPrimary() != null && pool.getPrimary().getPort() != oldPrimaryPort; + } + + @Override + public String description() { + return null; + } + }; + + GeodeAwaitility.await().untilAsserted(ev); + assertNotNull(pool.getPrimary()); + } + void registerDurableCq(final String cqName) { // Durable client registers durable cq on server this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") { diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java index 796e58f..d01d5cb 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java @@ -60,10 +60,12 @@ import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PoolFactoryImpl; import org.apache.geode.internal.cache.ha.HARegionQueue; import 
org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; public class DurableClientTestBase extends JUnit4DistributedTestCase { @@ -80,9 +82,9 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase { VM publisherClientVM; protected String regionName; int server1Port; + int server2Port; String durableClientId; - @Override public final void postSetUp() throws Exception { this.server1VM = VM.getVM(0); @@ -172,6 +174,32 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase { verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 0); } + void waitForDurableClientPresence(String durableClientId, VM serverVM, final int count) { + serverVM.invoke(() -> { + if (count > 0) { + + WaitCriterion ev = new WaitCriterion() { + @Override + public boolean done() { + checkNumberOfClientProxies(count); + CacheClientProxy proxy = getClientProxy(); + + if (proxy != null && durableClientId.equals(proxy.getDurableId())) { + return true; + } + return false; + } + + @Override + public String description() { + return null; + } + }; + GeodeAwaitility.await().untilAsserted(ev); + } + }); + } + void verifyDurableClientPresence(int durableClientTimeout, String durableClientId, VM serverVM, final int count) { serverVM.invoke(() -> { @@ -372,7 +400,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase { } } - private CqQuery createCq(String cqName, String cqQuery, boolean durable) + CqQuery createCq(String cqName, String cqQuery, boolean durable) throws CqException, CqExistsException { QueryService qs = 
CacheServerTestUtil.getCache().getQueryService(); CqAttributesFactory cqf = new CqAttributesFactory(); @@ -461,7 +489,6 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase { return bridgeServer; } - Pool getClientPool(String host, int server1Port, int server2Port, boolean establishCallbackConnection, int redundancyLevel) { PoolFactory pf = PoolManager.createFactory(); @@ -664,16 +691,21 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase { void checkCqListenerEvents(VM vm, final String cqName, final int numEvents, final int secondsToWait) { vm.invoke(() -> { - QueryService qs = CacheServerTestUtil.getCache().getQueryService(); - CqQuery cq = qs.getCq(cqName); - // Get the listener and wait for the appropriate number of events - CacheServerTestUtil.ControlCqListener listener = - (CacheServerTestUtil.ControlCqListener) cq.getCqAttributes().getCqListener(); - listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents); - assertThat(numEvents).isEqualTo(listener.events.size()); + checkCqListenerEvents(cqName, numEvents, secondsToWait); }); } + void checkCqListenerEvents(final String cqName, final int numEvents, + final int secondsToWait) { + QueryService qs = CacheServerTestUtil.getCache().getQueryService(); + CqQuery cq = qs.getCq(cqName); + // Get the listener and wait for the appropriate number of events + CacheServerTestUtil.ControlCqListener listener = + (CacheServerTestUtil.ControlCqListener) cq.getCqAttributes().getCqListener(); + listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents); + assertThat(numEvents).isEqualTo(listener.events.size()); + } + void checkListenerEvents(int numberOfEntries, final int sleepMinutes, final int eventType, final VM vm) { vm.invoke(() -> {