This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d48607d GEODE-4868: depose primary should reduce brq's size in
deposePrimaryForColocatedChildren (#1625)
d48607d is described below
commit d48607d51f718a6c497d7e6b14a8dc9b87fe2e67
Author: Xiaojian Zhou <[email protected]>
AuthorDate: Fri Mar 16 11:07:30 2018 -0700
GEODE-4868: depose primary should reduce brq's size in
deposePrimaryForColocatedChildren (#1625)
---
.../org/apache/geode/internal/cache/BucketAdvisor.java | 8 ++++----
.../internal/cache/wan/AsyncEventQueueTestBase.java | 3 +++
.../wan/asyncqueue/AsyncEventListenerDUnitTest.java | 17 +++++++++++++++++
3 files changed, 24 insertions(+), 4 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index fc14773..074a60d 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -279,10 +279,6 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
} finally {
this.activePrimaryMoveLock.unlock();
if (needToSendProfileUpdate) {
- if (this.getBucket() instanceof BucketRegionQueue) {
- BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
- brq.decQueueSize(brq.size());
- }
sendProfileUpdate();
}
}
@@ -316,6 +312,10 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
if (b != null) {
BucketAdvisor ba = b.getBucketAdvisor();
deposedChildPrimaries = ba.deposePrimary() && deposedChildPrimaries;
+ if (b instanceof BucketRegionQueue) {
+ BucketRegionQueue brq = (BucketRegionQueue) b;
+ brq.decQueueSize(brq.size());
+ }
}
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 545d0ca..7a956c8 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -717,6 +717,9 @@ public class AsyncEventQueueTestBase extends
JUnit4DistributedTestCase {
}
}
final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)
queue).getStatistics();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> assertEquals("Expected queue entries: " + queueSize + "
but actual entries: "
+ + statistics.getEventQueueSize(), queueSize,
statistics.getEventQueueSize()));
assertEquals(queueSize, statistics.getEventQueueSize());
assertEquals(eventsReceived, statistics.getEventsReceived());
assertEquals(eventsQueued, statistics.getEventsQueued());
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 465f35a..aa1db53 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -1519,6 +1519,11 @@ public class AsyncEventListenerDUnitTest extends
AsyncEventQueueTestBase {
() ->
AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(getTestMethodName() +
"_PR"));
LogWriterUtils.getLogWriter().info("Primary buckets on vm2: " +
primaryBucketsvm2);
+
+ // before shutting down vm2, both vm1 and vm2 should have 40 events in their primary
queues
+ vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln",
40, 80, 80, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln",
40, 80, 80, 0));
+
// ---------------------------- Kill vm2 --------------------------
vm2.invoke(() -> AsyncEventQueueTestBase.killSender());
// ----------------------------------------------------------------
@@ -1527,6 +1532,8 @@ public class AsyncEventListenerDUnitTest extends
AsyncEventQueueTestBase {
vm3.invoke(createCacheRunnable(lnPort));
vm3.invoke(() ->
AsyncEventQueueTestBase.createAsyncEventQueueWithListener2("ln", true, 100, 5,
false, null));
+ // vm3 will take over some primary buckets from vm1, but vm1's primary queue
size was not reduced
+ vm3.invoke(pauseAsyncEventQueueRunnable());
vm3.invoke(() ->
AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue(
getTestMethodName() + "_PR", "ln", isOffHeap()));
@@ -1535,7 +1542,17 @@ public class AsyncEventListenerDUnitTest extends
AsyncEventQueueTestBase {
String regionName = getTestMethodName() + "_PR";
Set<Integer> primaryBucketsvm3 = (Set<Integer>) vm3
.invoke(() ->
AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(regionName));
+ LogWriterUtils.getLogWriter().info("Primary buckets on vm3: " +
primaryBucketsvm3);
+ Set<Integer> primaryBucketsvm1 = (Set<Integer>) vm1.invoke(
+ () ->
AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(getTestMethodName() +
"_PR"));
+ LogWriterUtils.getLogWriter()
+ .info("After shutdown vm2, started vm3, Primary buckets on vm1: " +
primaryBucketsvm1);
+
+ // vm1.invoke(()->AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln",
80, 80, 80, 0));
+ vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln",
40, 80, 80, 0));
+ vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln",
40, 0, 0, 0));
+ vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
vm1.invoke(() ->
AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
--
To stop receiving notification emails like this one, please contact
[email protected].