This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f117a59548 GEODE-10260: make sure message is added before they are
processed. (#7628)
f117a59548 is described below
commit f117a59548fe392725616bf9f354748e1ba785d9
Author: Jinmei Liao <[email protected]>
AuthorDate: Thu Apr 28 11:58:49 2022 -0700
GEODE-10260: make sure message is added before they are processed. (#7628)
---
.../apache/geode/internal/cache/FilterProfile.java | 31 ++++++----
.../geode/cache/query/cq/CQDistributedTest.java | 71 ++++++++++++++++++----
2 files changed, 78 insertions(+), 24 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 3ae0185302..ed52e24190 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1846,18 +1846,27 @@ public class FilterProfile implements
DataSerializableFixedID {
}
CacheDistributionAdvisor cda = (CacheDistributionAdvisor)
r.getDistributionAdvisor();
- CacheDistributionAdvisor.CacheProfile cp =
- (CacheDistributionAdvisor.CacheProfile)
cda.getProfile(getSender());
- if (cp == null) { // PR accessors do not keep filter profiles around
- if (logger.isDebugEnabled()) {
- logger.debug(
- "No cache profile to update, adding filter profile message to
queue. Message :{}",
- this);
+ CacheDistributionAdvisor.CacheProfile cp;
+
+ // prevent adding the message to queue after we have processed the
queue
+ // in CreateRegionReplyProcessor.process
+ synchronized (cda) {
+ cp = (CacheDistributionAdvisor.CacheProfile)
cda.getProfile(getSender());
+ if (cp == null) {
+ // only need to hold the lock if cda doesn't have the profile yet.
This makes sure
+ // we add the message to the queue before they are processed
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "No cache profile to update, adding filter profile message
to queue. Message :{}",
+ this);
+ }
+ FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
+ localFP.addToFilterProfileQueue(getSender(), this);
+ dm.getCancelCriterion().checkCancelInProgress(null);
}
- FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
- localFP.addToFilterProfileQueue(getSender(), this);
- dm.getCancelCriterion().checkCancelInProgress(null);
- } else {
+ }
+
+ if (cp != null) {
cp.hasCacheServer = true;
FilterProfile fp = cp.filterProfile;
if (fp == null) { // PR accessors do not keep filter profiles around
diff --git
a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index 4bb7f2fa21..8c35047951 100644
---
a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Rule;
@@ -41,9 +42,11 @@ import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category({ClientSubscriptionTest.class})
@@ -58,24 +61,26 @@ public class CQDistributedTest implements Serializable {
private TestCqListener testListener;
private TestCqListener2 testListener2;
- private Region region;
+ private Region<Integer, Portfolio> region;
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@Before
public void before() throws Exception {
- locator = clusterStartupRule.startLocatorVM(1, new Properties());
+ locator = clusterStartupRule.startLocatorVM(0, new Properties());
Integer locator1Port = locator.getPort();
- server = clusterStartupRule.startServerVM(3, locator1Port);
+ server = clusterStartupRule.startServerVM(1, locator1Port);
createServerRegion(server, RegionShortcut.PARTITION);
- server2 = clusterStartupRule.startServerVM(4, locator1Port);
+ server2 = clusterStartupRule.startServerVM(2, locator1Port);
createServerRegion(server2, RegionShortcut.PARTITION);
ClientCache clientCache = createClientCache(locator1Port);
region =
-
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+ clientCache
+ .<Integer,
Portfolio>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create("region");
qs = clientCache.getQueryService();
CqAttributesFactory cqaf = new CqAttributesFactory();
@@ -87,12 +92,52 @@ public class CQDistributedTest implements Serializable {
cqa = cqaf.create();
}
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @Test
+ // Before the fix, this test will reproduce pretty consistently if we put a
sleep statement before
+ // we do localFP.addToFilterProfileQueue in
FilterProfile$OperationMessage.process().
+ public void filterProfileUpdate() throws Exception {
+ MemberVM newServer = clusterStartupRule.startServerVM(3,
locator.getPort());
+
+ // create 10 cqs to begin with
+ for (int i = 0; i < 10; i++) {
+ qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where
r.ID = " + i, cqa)
+ .execute();
+ }
+
+ AsyncInvocation regionCreate = newServer.invokeAsync(() -> {
+ ClusterStartupRule.memberStarter.createRegion(RegionShortcut.PARTITION,
"region");
+ });
+
+ Future<Void> createCqs = executor.submit(() -> {
+ for (int i = 10; i < 100; i++) {
+ qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where
r.ID = " + i, cqa)
+ .execute();
+ }
+ });
+
+ regionCreate.await();
+ createCqs.get();
+
+ newServer.invoke(() -> {
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ for (int i = 0; i < 100; i++) {
+ regionOnServer.put(i, new Portfolio(i));
+ }
+ });
+
+ // make sure all cq's will get its own event, so total events = total # of
cqs.
+ await().untilAsserted(() ->
assertThat(testListener.onEventCalls).isEqualTo(100));
+ }
+
@Test
public void cqUsingModShouldFireEventsWhenFilterCriteriaIsMet() throws
Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID % 2 = 1",
cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -108,7 +153,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingPlusShouldFireEventsWhenFilterCriteriaIsMet() throws
Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID + 3 > 4",
cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -124,7 +169,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingSubtractShouldFireEventsWhenFilterCriteriaIsMet() throws
Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID - 3 < 0",
cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -140,7 +185,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingDivideShouldFireEventsWhenFilterCriteriaIsMet() throws
Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID / 2 = 1",
cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -156,7 +201,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingMultiplyShouldFireEventsWhenFilterCriteriaIsMet() throws
Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID * 2 > 3",
cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -172,7 +217,7 @@ public class CQDistributedTest implements Serializable {
public void
cqExecuteWithInitialResultsWithValuesMatchingPrimaryKeyShouldNotThrowClassCastException()
throws Exception {
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
ClusterStartupRule.getCache().getQueryService().createKeyIndex("PrimaryKeyIndex",
"ID",
SEPARATOR + "region");
regionOnServer.put(0, new Portfolio(0));
@@ -193,7 +238,7 @@ public class CQDistributedTest implements Serializable {
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
final CacheTransactionManager txMgr =
ClusterStartupRule.getCache().getCacheTransactionManager();
@@ -222,7 +267,7 @@ public class CQDistributedTest implements Serializable {
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer =
ClusterStartupRule.getCache().getRegion("region");
// CREATE new entry
regionOnServer.put(0, new Portfolio(1));