This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new edb24b8  GEODE-9596: Avoid creating multiple cq maps in 
ClientUpdateMessage (#6869)
edb24b8 is described below

commit edb24b8b2ccd498a9fb710872679c661213f8968
Author: Eric Shu <e...@pivotal.io>
AuthorDate: Fri Sep 17 15:48:49 2021 -0700

    GEODE-9596: Avoid creating multiple cq maps in ClientUpdateMessage (#6869)
    
      * This can occur if HAContainer enables eviction.
      * Also addClientCq can be accessed concurrently by intialization threads 
during queue registration.
    
    (cherry picked from commit 00b41562f8c4e5982190795c0426e5884f57724a)
---
 .../internal/cache/ha/HARegionQueueJUnitTest.java  |   6 +-
 .../geode/internal/cache/ha/HARegionQueue.java     |   2 +-
 .../tier/sockets/ClientUpdateMessageImpl.java      |  28 ++-
 .../tier/sockets/ClientUpdateMessageImplTest.java  | 118 +++++++++++
 .../geode/cache/query/cq/dunit/CqDUnitTest.java    | 228 +++++++++++++++++++++
 5 files changed, 362 insertions(+), 20 deletions(-)

diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 67c39a8..93b5716 100755
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -1569,9 +1569,9 @@ public class HARegionQueueJUnitTest {
     regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
     regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
 
-    verify(mockClientUpdateMessage, 
times(1)).addClientCqs(mockClientProxyMembershipId,
-        mockCqNameToOp);
-    verify(mockClientUpdateMessage, 
times(1)).addClientInterestList(mockClientProxyMembershipId,
+    
verify(mockClientUpdateMessage).addOrSetClientCqs(mockClientProxyMembershipId,
+        mockClientCqConcurrentMap);
+    
verify(mockClientUpdateMessage).addClientInterestList(mockClientProxyMembershipId,
         true);
 
     // Mock that the ClientUpdateMessage is only interested in invalidates, 
then do another put
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index eab0025..593909f 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -3445,7 +3445,7 @@ public class HARegionQueue implements RegionQueue {
       if (haEventWrapper.getClientCqs() != null) {
         CqNameToOp clientCQ = haEventWrapper.getClientCqs().get(proxyID);
         if (clientCQ != null) {
-          msg.addClientCqs(proxyID, clientCQ);
+          msg.addOrSetClientCqs(proxyID, haEventWrapper.getClientCqs());
         }
       }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 1558409..7d67cbb 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -1038,33 +1038,29 @@ public class ClientUpdateMessageImpl implements 
ClientUpdateMessage, Sizeable, N
     return this._clientCqs;
   }
 
-  /**
-   * Add cqs for the given client.
-   *
-   */
-  public void addClientCqs(ClientProxyMembershipID clientId, CqNameToOp 
filteredCqs) {
-    if (this._clientCqs == null) {
-      this._clientCqs = new ClientCqConcurrentMap();
-      this._hasCqs = true;
+  public void addOrSetClientCqs(ClientProxyMembershipID proxyID, 
ClientCqConcurrentMap clientCqs) {
+    if (_clientCqs == null) {
+      _clientCqs = clientCqs;
+    } else {
+      _clientCqs.put(proxyID, clientCqs.get(proxyID));
     }
-    this._clientCqs.put(clientId, filteredCqs);
   }
 
-  void addClientCq(ClientProxyMembershipID clientId, String cqName, Integer 
cqEvent) {
-    if (this._clientCqs == null) {
-      this._clientCqs = new ClientCqConcurrentMap();
-      this._hasCqs = true;
+  synchronized void addClientCq(ClientProxyMembershipID clientId, String 
cqName, Integer cqEvent) {
+    if (_clientCqs == null) {
+      _clientCqs = new ClientCqConcurrentMap();
+      _hasCqs = true;
     }
-    CqNameToOp cqInfo = this._clientCqs.get(clientId);
+    CqNameToOp cqInfo = _clientCqs.get(clientId);
     if (cqInfo == null) {
       cqInfo = new CqNameToOpSingleEntry(cqName, cqEvent);
-      this._clientCqs.put(clientId, cqInfo);
+      _clientCqs.put(clientId, cqInfo);
     } else if (!cqInfo.isFull()) {
       cqInfo.add(cqName, cqEvent);
     } else {
       cqInfo = new CqNameToOpHashMap((CqNameToOpSingleEntry) cqInfo);
       cqInfo.add(cqName, cqEvent);
-      this._clientCqs.put(clientId, cqInfo);
+      _clientCqs.put(clientId, cqInfo);
     }
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java
index 3363482..6d6095d 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImplTest.java
@@ -22,7 +22,12 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.geode.CopyHelper;
@@ -32,8 +37,29 @@ import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class ClientUpdateMessageImplTest implements Serializable {
+  private final ClientProxyMembershipID client1 = 
mock(ClientProxyMembershipID.class);
+  private final ClientProxyMembershipID client2 = 
mock(ClientProxyMembershipID.class);
+  private final ClientUpdateMessageImpl.ClientCqConcurrentMap clientCqs =
+      new ClientUpdateMessageImpl.ClientCqConcurrentMap();
+
+  @Rule
+  public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+  @Before
+  public void setUp() {
+    ClientUpdateMessageImpl.CqNameToOpHashMap cqs1 =
+        new ClientUpdateMessageImpl.CqNameToOpHashMap(2);
+    cqs1.add("cqName1", 1);
+    cqs1.add("cqName2", 2);
+    clientCqs.put(client1, cqs1);
+    ClientUpdateMessageImpl.CqNameToOpSingleEntry cqs2 =
+        new ClientUpdateMessageImpl.CqNameToOpSingleEntry("cqName3", 3);
+    clientCqs.put(client2, cqs2);
+  }
+
   @Test
   public void addInterestedClientTest() {
     ClientUpdateMessageImpl clientUpdateMessageImpl = new 
ClientUpdateMessageImpl();
@@ -109,4 +135,96 @@ public class ClientUpdateMessageImplTest implements 
Serializable {
     when(localRegion.getName()).thenReturn(regionName);
     return new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE, null, 
null);
   }
+
+  @Test
+  public void addClientCqCanBeExecutedConcurrently() throws Exception {
+    ClientUpdateMessageImpl clientUpdateMessageImpl = new 
ClientUpdateMessageImpl();
+
+    int numOfEvents = 4;
+    int[] cqEvents = new int[numOfEvents];
+    String[] cqNames = new String[numOfEvents];
+    ClientProxyMembershipID[] clients = new 
ClientProxyMembershipID[numOfEvents];
+    prepareCqInfo(numOfEvents, cqEvents, cqNames, clients);
+
+    addClientCqConcurrently(clientUpdateMessageImpl, numOfEvents, cqEvents, 
cqNames, clients);
+
+    assertThat(clientUpdateMessageImpl.getClientCqs()).hasSize(2);
+    
assertThat(clientUpdateMessageImpl.getClientCqs().get(client1)).isInstanceOf(
+        ClientUpdateMessageImpl.CqNameToOpHashMap.class);
+    ClientUpdateMessageImpl.CqNameToOpHashMap client1Cqs =
+        (ClientUpdateMessageImpl.CqNameToOpHashMap) 
clientUpdateMessageImpl.getClientCqs()
+            .get(client1);
+    for (int i = 0; i < 3; i++) {
+      assertThat(client1Cqs.get("cqName" + i)).isEqualTo(i);
+    }
+
+    
assertThat(clientUpdateMessageImpl.getClientCqs().get(client2)).isInstanceOf(
+        ClientUpdateMessageImpl.CqNameToOpSingleEntry.class);
+    ClientUpdateMessageImpl.CqNameToOpSingleEntry client2Cqs =
+        (ClientUpdateMessageImpl.CqNameToOpSingleEntry) 
clientUpdateMessageImpl.getClientCqs()
+            .get(client2);
+    assertThat(client2Cqs.isEmpty()).isFalse();
+  }
+
+  private void prepareCqInfo(int numOfEvents, int[] cqEvents, String[] cqNames,
+      ClientProxyMembershipID[] clients) {
+    for (int i = 0; i < numOfEvents; i++) {
+      cqEvents[i] = i;
+      cqNames[i] = "cqName" + i;
+      if (i < 3) {
+        clients[i] = client1;
+      } else {
+        clients[i] = client2;
+      }
+    }
+  }
+
+  private void addClientCqConcurrently(ClientUpdateMessageImpl 
clientUpdateMessageImpl,
+      int numOfEvents, int[] cqEvents, String[] cqNames, 
ClientProxyMembershipID[] clients)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    List<Future<Void>> futures = new ArrayList<>();
+    for (int i = 0; i < numOfEvents; i++) {
+      ClientProxyMembershipID client = clients[i];
+      String cqName = cqNames[i];
+      int cqEvent = cqEvents[i];
+      futures.add(executorService
+          .submit(() -> clientUpdateMessageImpl.addClientCq(client, cqName, 
cqEvent)));
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+  }
+
+  @Test
+  public void addOrSetClientCqsCanSetIfCqsMapIsNull() {
+    ClientUpdateMessageImpl clientUpdateMessageImpl = new 
ClientUpdateMessageImpl();
+
+    clientUpdateMessageImpl.addOrSetClientCqs(client1, clientCqs);
+
+    assertThat(clientUpdateMessageImpl.getClientCqs()).isEqualTo(clientCqs);
+  }
+
+  @Test
+  public void addOrSetClientCqsCanAddCqsIfCqsMapNotNull() {
+    ClientUpdateMessageImpl clientUpdateMessageImpl = new 
ClientUpdateMessageImpl();
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    clientUpdateMessageImpl.addClientCq(clientProxyMembershipID, "cqName", 10);
+
+    clientUpdateMessageImpl.addOrSetClientCqs(client1, clientCqs);
+
+    assertThat(clientUpdateMessageImpl.getClientCqs()).hasSize(2);
+    
assertThat(clientUpdateMessageImpl.getClientCqs().get(client1)).isInstanceOf(
+        ClientUpdateMessageImpl.CqNameToOpHashMap.class);
+    ClientUpdateMessageImpl.CqNameToOpHashMap client1Cqs =
+        (ClientUpdateMessageImpl.CqNameToOpHashMap) 
clientUpdateMessageImpl.getClientCqs()
+            .get(client1);
+    assertThat(client1Cqs.get("cqName1")).isEqualTo(1);
+    assertThat(client1Cqs.get("cqName2")).isEqualTo(2);
+
+    
assertThat(clientUpdateMessageImpl.getClientCqs().get(clientProxyMembershipID)).isInstanceOf(
+        ClientUpdateMessageImpl.CqNameToOpSingleEntry.class);
+
+    assertThat(clientUpdateMessageImpl.getClientCqs().get(client2)).isNull();
+  }
+
 }
diff --git 
a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java
 
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java
new file mode 100644
index 0000000..cf1bdc0
--- /dev/null
+++ 
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDUnitTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class CqDUnitTest implements Serializable {
+  private static final int VM_COUNT = 8;
+
+  private final String REGION_NAME = "region";
+
+  private String hostName;
+  private VM[] clients;
+  private int serverPort;
+  private TestCqListener[] testListener;
+  private int totalPut;
+  private int totalCQInvocations;
+  private final String cqName = "cqName";
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(VM_COUNT);
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule(VM_COUNT);
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule(VM_COUNT);
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new 
SerializableTemporaryFolder();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Before
+  public void setUp() {
+    hostName = getHostName();
+    VM server = getVM(0);
+    getVM(0).invoke(() -> cacheRule.createCache());
+    testListener = new TestCqListener[VM_COUNT - 1];
+
+    clients = new VM[VM_COUNT - 1];
+    for (int i = 0; i < VM_COUNT - 1; i++) {
+      clients[i] = getVM(i + 1);
+      clients[i].invoke(() -> clientCacheRule.createClientCache());
+      testListener[i] = new TestCqListener();
+    }
+
+    serverPort = server.invoke(() -> 
createSubscriptionServer(cacheRule.getCache()));
+
+    for (VM client : clients) {
+      client.invoke(this::createRegionOnClient);
+    }
+
+    totalPut = 500;
+    totalCQInvocations = totalPut * clients.length;
+  }
+
+  @Test
+  public void clientCanInvokeCQListenersWhenHAContainerEnablesEviction() 
throws Exception {
+    registerCqs();
+
+    AsyncInvocation<?>[] asyncInvocations = new AsyncInvocation[VM_COUNT - 1];
+    for (int i = 0; i < VM_COUNT - 1; i++) {
+      asyncInvocations[i] = clients[i].invokeAsync(this::doPuts);
+    }
+
+    for (int i = 0; i < VM_COUNT - 1; i++) {
+      asyncInvocations[i].await();
+    }
+
+    for (int i = 0; i < VM_COUNT - 1; i++) {
+      clients[i].invoke(this::verifyCQListenerInvocations);
+    }
+  }
+
+  private void verifyCQListenerInvocations() {
+    await().untilAsserted(() -> {
+      QueryService cqService = 
clientCacheRule.getClientCache().getQueryService();
+      CqListener cqListener = 
cqService.getCq(cqName).getCqAttributes().getCqListener();
+
+      assertThat(totalCQInvocations).isEqualTo(((TestCqListener) 
cqListener).numEvents.get());
+    });
+  }
+
+  private void registerCqs() {
+    for (int i = 0; i < VM_COUNT - 1; i++) {
+      registerCq(clients[i]);
+    }
+  }
+
+  private void registerCq(VM client) {
+    client.invoke(() -> {
+      ClientCache clientCache = clientCacheRule.getClientCache();
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+
+      int i = client.getId() - 1;
+      cqaf.addCqListener(testListener[i]);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq(cqName, "Select * from " + SEPARATOR + REGION_NAME + 
" where ID > 0",
+          cqAttributes)
+          .executeWithInitialResults();
+    });
+  }
+
+  private void doPuts() {
+    ClientCache clientCache = clientCacheRule.getClientCache();
+    Region<Object, Object> region = clientCache.getRegion(REGION_NAME);
+
+
+    for (int i = totalPut; i > 0; i--) {
+      doPut(region, i);
+    }
+  }
+
+  private void doPut(Region<Object, Object> region, int i) {
+    region.put(i, new Portfolio(i));
+  }
+
+  private static class TestCqListener implements CqListener, Serializable {
+    AtomicInteger numEvents = new AtomicInteger();
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+      numEvents.incrementAndGet();
+    }
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+  }
+
+  private void createRegionOnClient() {
+    Pool pool = PoolManager.createFactory().addServer(hostName, serverPort)
+        .setSubscriptionEnabled(true).create("poolName");
+
+    ClientRegionFactory<Object, Object> crf =
+        
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY);
+    crf.setPoolName(pool.getName());
+    crf.create(REGION_NAME);
+  }
+
+  private int createSubscriptionServer(InternalCache cache) throws IOException 
{
+    initializeDiskStore(cache);
+    createRegionOnServer(cache);
+    return initializeCacheServerWithSubscription(cache);
+  }
+
+  private void initializeDiskStore(InternalCache cache) throws IOException {
+    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+    diskStoreAttributes.name = "clientQueueDS";
+    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(testName + 
"_dir")};
+    cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
+  }
+
+  private void createRegionOnServer(InternalCache cache) {
+    cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME);
+  }
+
+  private int initializeCacheServerWithSubscription(InternalCache cache) 
throws IOException {
+    CacheServer cacheServer = cache.addCacheServer();
+    ClientSubscriptionConfig clientSubscriptionConfig = 
cacheServer.getClientSubscriptionConfig();
+    clientSubscriptionConfig.setEvictionPolicy("entry");
+    clientSubscriptionConfig.setCapacity(1);
+    clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
+    cacheServer.setPort(0);
+    cacheServer.setHostnameForClients(hostName);
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+}

Reply via email to