This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push: new edb24b8 GEODE-9596: Avoid creating multiple cq maps in ClientUpdateMessage (#6869) edb24b8 is described below commit edb24b8b2ccd498a9fb710872679c661213f8968 Author: Eric Shu <e...@pivotal.io> AuthorDate: Fri Sep 17 15:48:49 2021 -0700 GEODE-9596: Avoid creating multiple cq maps in ClientUpdateMessage (#6869) * This can occur if HAContainer enables eviction. * Also addClientCq can be accessed concurrently by initialization threads during queue registration. (cherry picked from commit 00b41562f8c4e5982190795c0426e5884f57724a) --- .../internal/cache/ha/HARegionQueueJUnitTest.java | 6 +- .../geode/internal/cache/ha/HARegionQueue.java | 2 +- .../tier/sockets/ClientUpdateMessageImpl.java | 28 ++- .../tier/sockets/ClientUpdateMessageImplTest.java | 118 +++++++++++ .../geode/cache/query/cq/dunit/CqDUnitTest.java | 228 +++++++++++++++++++++ 5 files changed, 362 insertions(+), 20 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java index 67c39a8..93b5716 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java @@ -1569,9 +1569,9 @@ public class HARegionQueueJUnitTest { regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper); regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper); - verify(mockClientUpdateMessage, times(1)).addClientCqs(mockClientProxyMembershipId, - mockCqNameToOp); - verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId, + verify(mockClientUpdateMessage).addOrSetClientCqs(mockClientProxyMembershipId, + mockClientCqConcurrentMap); + 
verify(mockClientUpdateMessage).addClientInterestList(mockClientProxyMembershipId, true); // Mock that the ClientUpdateMessage is only interested in invalidates, then do another put diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java index eab0025..593909f 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java @@ -3445,7 +3445,7 @@ public class HARegionQueue implements RegionQueue { if (haEventWrapper.getClientCqs() != null) { CqNameToOp clientCQ = haEventWrapper.getClientCqs().get(proxyID); if (clientCQ != null) { - msg.addClientCqs(proxyID, clientCQ); + msg.addOrSetClientCqs(proxyID, haEventWrapper.getClientCqs()); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index 1558409..7d67cbb 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -1038,33 +1038,29 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N return this._clientCqs; } - /** - * Add cqs for the given client. 
- * - */ - public void addClientCqs(ClientProxyMembershipID clientId, CqNameToOp filteredCqs) { - if (this._clientCqs == null) { - this._clientCqs = new ClientCqConcurrentMap(); - this._hasCqs = true; + public void addOrSetClientCqs(ClientProxyMembershipID proxyID, ClientCqConcurrentMap clientCqs) { + if (_clientCqs == null) { + _clientCqs = clientCqs; + } else { + _clientCqs.put(proxyID, clientCqs.get(proxyID)); } - this._clientCqs.put(clientId, filteredCqs); } - void addClientCq(ClientProxyMembershipID clientId, String cqName, Integer cqEvent) { - if (this._clientCqs == null) { - this._clientCqs = new ClientCqConcurrentMap(); - this._hasCqs = true; + synchronized void addClientCq(ClientProxyMembershipID clientId, String cqName, Integer cqEvent) { + if (_clientCqs == null) { + _clientCqs = new ClientCqConcurrentMap(); + _hasCqs = true; } - CqNameToOp cqInfo = this._clientCqs.get(clientId); + CqNameToOp cqInfo = _clientCqs.get(clientId); if (cqInfo == null) { cqInfo = new CqNameToOpSingleEntry(cqName, cqEvent); - this._clientCqs.put(clientId, cqInfo); + _clientCqs.put(clientId, cqInfo); } else if (!cqInfo.isFull()) { cqInfo.add(cqName, cqEvent); } else { cqInfo = new CqNameToOpHashMap((CqNameToOpSingleEntry) cqInfo); cqInfo.add(cqName, cqEvent); - this._clientCqs.put(clientId, cqInfo); + _clientCqs.put(clientId, cqInfo); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java index 3363482..6d6095d 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java @@ -22,7 +22,12 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; import java.io.Serializable; +import java.util.ArrayList; +import 
java.util.List; +import java.util.concurrent.Future; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.apache.geode.CopyHelper; @@ -32,8 +37,29 @@ import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.test.fake.Fakes; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; public class ClientUpdateMessageImplTest implements Serializable { + private final ClientProxyMembershipID client1 = mock(ClientProxyMembershipID.class); + private final ClientProxyMembershipID client2 = mock(ClientProxyMembershipID.class); + private final ClientUpdateMessageImpl.ClientCqConcurrentMap clientCqs = + new ClientUpdateMessageImpl.ClientCqConcurrentMap(); + + @Rule + public ExecutorServiceRule executorService = new ExecutorServiceRule(); + + @Before + public void setUp() { + ClientUpdateMessageImpl.CqNameToOpHashMap cqs1 = + new ClientUpdateMessageImpl.CqNameToOpHashMap(2); + cqs1.add("cqName1", 1); + cqs1.add("cqName2", 2); + clientCqs.put(client1, cqs1); + ClientUpdateMessageImpl.CqNameToOpSingleEntry cqs2 = + new ClientUpdateMessageImpl.CqNameToOpSingleEntry("cqName3", 3); + clientCqs.put(client2, cqs2); + } + @Test public void addInterestedClientTest() { ClientUpdateMessageImpl clientUpdateMessageImpl = new ClientUpdateMessageImpl(); @@ -109,4 +135,96 @@ public class ClientUpdateMessageImplTest implements Serializable { when(localRegion.getName()).thenReturn(regionName); return new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE, null, null); } + + @Test + public void addClientCqCanBeExecutedConcurrently() throws Exception { + ClientUpdateMessageImpl clientUpdateMessageImpl = new ClientUpdateMessageImpl(); + + int numOfEvents = 4; + int[] cqEvents = new int[numOfEvents]; + String[] cqNames = new String[numOfEvents]; + ClientProxyMembershipID[] clients = new ClientProxyMembershipID[numOfEvents]; 
+ prepareCqInfo(numOfEvents, cqEvents, cqNames, clients); + + addClientCqConcurrently(clientUpdateMessageImpl, numOfEvents, cqEvents, cqNames, clients); + + assertThat(clientUpdateMessageImpl.getClientCqs()).hasSize(2); + assertThat(clientUpdateMessageImpl.getClientCqs().get(client1)).isInstanceOf( + ClientUpdateMessageImpl.CqNameToOpHashMap.class); + ClientUpdateMessageImpl.CqNameToOpHashMap client1Cqs = + (ClientUpdateMessageImpl.CqNameToOpHashMap) clientUpdateMessageImpl.getClientCqs() + .get(client1); + for (int i = 0; i < 3; i++) { + assertThat(client1Cqs.get("cqName" + i)).isEqualTo(i); + } + + assertThat(clientUpdateMessageImpl.getClientCqs().get(client2)).isInstanceOf( + ClientUpdateMessageImpl.CqNameToOpSingleEntry.class); + ClientUpdateMessageImpl.CqNameToOpSingleEntry client2Cqs = + (ClientUpdateMessageImpl.CqNameToOpSingleEntry) clientUpdateMessageImpl.getClientCqs() + .get(client2); + assertThat(client2Cqs.isEmpty()).isFalse(); + } + + private void prepareCqInfo(int numOfEvents, int[] cqEvents, String[] cqNames, + ClientProxyMembershipID[] clients) { + for (int i = 0; i < numOfEvents; i++) { + cqEvents[i] = i; + cqNames[i] = "cqName" + i; + if (i < 3) { + clients[i] = client1; + } else { + clients[i] = client2; + } + } + } + + private void addClientCqConcurrently(ClientUpdateMessageImpl clientUpdateMessageImpl, + int numOfEvents, int[] cqEvents, String[] cqNames, ClientProxyMembershipID[] clients) + throws InterruptedException, java.util.concurrent.ExecutionException { + List<Future<Void>> futures = new ArrayList<>(); + for (int i = 0; i < numOfEvents; i++) { + ClientProxyMembershipID client = clients[i]; + String cqName = cqNames[i]; + int cqEvent = cqEvents[i]; + futures.add(executorService + .submit(() -> clientUpdateMessageImpl.addClientCq(client, cqName, cqEvent))); + } + for (Future<Void> future : futures) { + future.get(); + } + } + + @Test + public void addOrSetClientCqsCanSetIfCqsMapIsNull() { + ClientUpdateMessageImpl clientUpdateMessageImpl 
= new ClientUpdateMessageImpl(); + + clientUpdateMessageImpl.addOrSetClientCqs(client1, clientCqs); + + assertThat(clientUpdateMessageImpl.getClientCqs()).isEqualTo(clientCqs); + } + + @Test + public void addOrSetClientCqsCanAddCqsIfCqsMapNotNull() { + ClientUpdateMessageImpl clientUpdateMessageImpl = new ClientUpdateMessageImpl(); + ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class); + clientUpdateMessageImpl.addClientCq(clientProxyMembershipID, "cqName", 10); + + clientUpdateMessageImpl.addOrSetClientCqs(client1, clientCqs); + + assertThat(clientUpdateMessageImpl.getClientCqs()).hasSize(2); + assertThat(clientUpdateMessageImpl.getClientCqs().get(client1)).isInstanceOf( + ClientUpdateMessageImpl.CqNameToOpHashMap.class); + ClientUpdateMessageImpl.CqNameToOpHashMap client1Cqs = + (ClientUpdateMessageImpl.CqNameToOpHashMap) clientUpdateMessageImpl.getClientCqs() + .get(client1); + assertThat(client1Cqs.get("cqName1")).isEqualTo(1); + assertThat(client1Cqs.get("cqName2")).isEqualTo(2); + + assertThat(clientUpdateMessageImpl.getClientCqs().get(clientProxyMembershipID)).isInstanceOf( + ClientUpdateMessageImpl.CqNameToOpSingleEntry.class); + + assertThat(clientUpdateMessageImpl.getClientCqs().get(client2)).isNull(); + } + } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java new file mode 100644 index 0000000..cf1bdc0 --- /dev/null +++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.cq.dunit; + +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.VM.getHostName; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.query.CqAttributes; +import org.apache.geode.cache.query.CqAttributesFactory; +import org.apache.geode.cache.query.CqEvent; +import org.apache.geode.cache.query.CqListener; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.data.Portfolio; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.server.ClientSubscriptionConfig; +import org.apache.geode.internal.cache.DiskStoreAttributes; +import 
org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +public class CqDUnitTest implements Serializable { + private static final int VM_COUNT = 8; + + private final String REGION_NAME = "region"; + + private String hostName; + private VM[] clients; + private int serverPort; + private TestCqListener[] testListener; + private int totalPut; + private int totalCQInvocations; + private final String cqName = "cqName"; + + @Rule + public DistributedRule distributedRule = new DistributedRule(VM_COUNT); + + @Rule + public CacheRule cacheRule = new CacheRule(VM_COUNT); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(VM_COUNT); + + @Rule + public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + @Before + public void setUp() { + hostName = getHostName(); + VM server = getVM(0); + getVM(0).invoke(() -> cacheRule.createCache()); + testListener = new TestCqListener[VM_COUNT - 1]; + + clients = new VM[VM_COUNT - 1]; + for (int i = 0; i < VM_COUNT - 1; i++) { + clients[i] = getVM(i + 1); + clients[i].invoke(() -> clientCacheRule.createClientCache()); + testListener[i] = new TestCqListener(); + } + + serverPort = server.invoke(() -> createSubscriptionServer(cacheRule.getCache())); + + for (VM client : clients) { + 
client.invoke(this::createRegionOnClient); + } + + totalPut = 500; + totalCQInvocations = totalPut * clients.length; + } + + @Test + public void clientCanInvokeCQListenersWhenHAContainerEnablesEviction() throws Exception { + registerCqs(); + + AsyncInvocation<?>[] asyncInvocations = new AsyncInvocation[VM_COUNT - 1]; + for (int i = 0; i < VM_COUNT - 1; i++) { + asyncInvocations[i] = clients[i].invokeAsync(this::doPuts); + } + + for (int i = 0; i < VM_COUNT - 1; i++) { + asyncInvocations[i].await(); + } + + for (int i = 0; i < VM_COUNT - 1; i++) { + clients[i].invoke(this::verifyCQListenerInvocations); + } + } + + private void verifyCQListenerInvocations() { + await().untilAsserted(() -> { + QueryService cqService = clientCacheRule.getClientCache().getQueryService(); + CqListener cqListener = cqService.getCq(cqName).getCqAttributes().getCqListener(); + + assertThat(totalCQInvocations).isEqualTo(((TestCqListener) cqListener).numEvents.get()); + }); + } + + private void registerCqs() { + for (int i = 0; i < VM_COUNT - 1; i++) { + registerCq(clients[i]); + } + } + + private void registerCq(VM client) { + client.invoke(() -> { + ClientCache clientCache = clientCacheRule.getClientCache(); + + QueryService queryService = clientCache.getQueryService(); + CqAttributesFactory cqaf = new CqAttributesFactory(); + + int i = client.getId() - 1; + cqaf.addCqListener(testListener[i]); + CqAttributes cqAttributes = cqaf.create(); + + queryService.newCq(cqName, "Select * from " + SEPARATOR + REGION_NAME + " where ID > 0", + cqAttributes) + .executeWithInitialResults(); + }); + } + + private void doPuts() { + ClientCache clientCache = clientCacheRule.getClientCache(); + Region<Object, Object> region = clientCache.getRegion(REGION_NAME); + + + for (int i = totalPut; i > 0; i--) { + doPut(region, i); + } + } + + private void doPut(Region<Object, Object> region, int i) { + region.put(i, new Portfolio(i)); + } + + private static class TestCqListener implements CqListener, Serializable { 
+ AtomicInteger numEvents = new AtomicInteger(); + + @Override + public void onEvent(CqEvent aCqEvent) { + numEvents.incrementAndGet(); + } + + @Override + public void onError(CqEvent aCqEvent) {} + } + + private void createRegionOnClient() { + Pool pool = PoolManager.createFactory().addServer(hostName, serverPort) + .setSubscriptionEnabled(true).create("poolName"); + + ClientRegionFactory<Object, Object> crf = + clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY); + crf.setPoolName(pool.getName()); + crf.create(REGION_NAME); + } + + private int createSubscriptionServer(InternalCache cache) throws IOException { + initializeDiskStore(cache); + createRegionOnServer(cache); + return initializeCacheServerWithSubscription(cache); + } + + private void initializeDiskStore(InternalCache cache) throws IOException { + DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes(); + diskStoreAttributes.name = "clientQueueDS"; + diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(testName + "_dir")}; + cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS"); + } + + private void createRegionOnServer(InternalCache cache) { + cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME); + } + + private int initializeCacheServerWithSubscription(InternalCache cache) throws IOException { + CacheServer cacheServer = cache.addCacheServer(); + ClientSubscriptionConfig clientSubscriptionConfig = cacheServer.getClientSubscriptionConfig(); + clientSubscriptionConfig.setEvictionPolicy("entry"); + clientSubscriptionConfig.setCapacity(1); + clientSubscriptionConfig.setDiskStoreName("clientQueueDS"); + cacheServer.setPort(0); + cacheServer.setHostnameForClients(hostName); + cacheServer.start(); + return cacheServer.getPort(); + } +}