sweeper now used instead of repl vs non-repl variables
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9f7baaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9f7baaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9f7baaa Branch: refs/heads/feature/GEODE-1420 Commit: b9f7baaac3681b5a3f8ed6157d9210a740db9111 Parents: 66b5945 Author: Darrel Schneider <dschnei...@pivotal.io> Authored: Wed Jun 22 09:24:45 2016 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Wed Jun 22 09:24:45 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/TombstoneService.java | 167 ++++++++----------- 1 file changed, 74 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9f7baaa/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java index 5c6b1dd..7f6140f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java @@ -107,24 +107,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration /** - * tasks for cleaning up tombstones - */ - private TombstoneSweeper replicatedTombstoneSweeper; - private TombstoneSweeper nonReplicatedTombstoneSweeper; - - /** a tombstone service is tied to a cache */ - private GemFireCacheImpl cache; - - /** - * two queues, one for replicated regions (including PR buckets) and one for + * two sweepers, one for replicated regions (including PR buckets) and one for * other regions. They have different timeout intervals. */ - private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>(); - private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>(); + private final TombstoneSweeper replicatedTombstoneSweeper; + private final TombstoneSweeper nonReplicatedTombstoneSweeper; - private AtomicLong replicatedTombstoneQueueSize = new AtomicLong(); - private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong(); - public Object blockGCLock = new Object(); private int progressingDeltaGIICount; @@ -135,11 +123,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } private TombstoneService(GemFireCacheImpl cache) { - this.cache = cache; - this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones, - REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize); - this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones, - CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize); + this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(), + REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong()); + this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(), + CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong()); startSweeper(this.replicatedTombstoneSweeper); startSweeper(this.nonReplicatedTombstoneSweeper); } @@ -200,20 +187,17 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace")); return; } - boolean useReplicated = useReplicatedQueue(r); Tombstone ts = new Tombstone(entry, r, destroyedVersion); - if (useReplicated) { - this.replicatedTombstones.add(ts); - this.replicatedTombstoneQueueSize.addAndGet(ts.getSize()); - } else { - this.nonReplicatedTombstones.add(ts); - this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize()); - } + this.getSweeper(r).scheduleTombstone(ts); } - private boolean useReplicatedQueue(LocalRegion r) { - return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication(); + private TombstoneSweeper getSweeper(LocalRegion r) { + if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) { + return this.replicatedTombstoneSweeper; + } else { + return this.nonReplicatedTombstoneSweeper; + } } @@ -223,8 +207,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { * @param r */ public void unscheduleTombstones(LocalRegion r) { - Queue<Tombstone> queue = - r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones; + TombstoneSweeper sweeper = this.getSweeper(r); + Queue<Tombstone> queue = sweeper.getQueue(); long removalSize = 0; for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) { Tombstone t = it.next(); @@ -233,11 +217,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { removalSize += t.getSize(); } } - if (queue == replicatedTombstones) { - replicatedTombstoneQueueSize.addAndGet(-removalSize); - } else { - nonReplicatedTombstoneQueueSize.addAndGet(-removalSize); - } + sweeper.incQueueSize(-removalSize); } public int getGCBlockCount() { @@ -272,34 +252,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } return null; } - Queue<Tombstone> queue; - boolean replicated = false; long removalSize = 0; - Tombstone currentTombstone; - StoppableReentrantLock lock = null; - boolean locked = false; if (logger.isDebugEnabled()) { logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions); } Set<Tombstone> removals = new HashSet<Tombstone>(); VersionSource myId = r.getVersionMember(); boolean isBucket = r.isUsedForPartitionedRegionBucket(); + final TombstoneSweeper sweeper = this.getSweeper(r); + Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone(); try { - locked = false; - if (r.getServerProxy() != null) { - queue = this.nonReplicatedTombstones; - lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock; - lock.lock(); - locked = true; - currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone; - } else { - queue = this.replicatedTombstones; - replicated = true; - lock = this.replicatedTombstoneSweeper.currentTombstoneLock; - lock.lock(); - locked = true; - currentTombstone = this.replicatedTombstoneSweeper.currentTombstone; - } if (currentTombstone != null && currentTombstone.region == r) { VersionSource destroyingMember = currentTombstone.getMemberID(); if (destroyingMember == null) { @@ -308,9 +270,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { Long maxReclaimedRV = regionGCVersions.get(destroyingMember); if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) { removals.add(currentTombstone); + removalSize += currentTombstone.getSize(); + // TODO call sweeper.clearCurrentTombstone } } - for (Tombstone t: queue) { + for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) { + Tombstone t = it.next(); if (t.region == r) { VersionSource destroyingMember = t.getMemberID(); if (destroyingMember == null) { @@ -318,22 +283,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } Long maxReclaimedRV = regionGCVersions.get(destroyingMember); if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) { + it.remove(); removals.add(t); removalSize += t.getSize(); } } } - - queue.removeAll(removals); - if (replicated) { - this.replicatedTombstoneQueueSize.addAndGet(-removalSize); - } else { - this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize); - } + sweeper.incQueueSize(-removalSize); } finally { - if (locked) { - lock.unlock(); - } + sweeper.unlock(); } //Record the GC versions now, so that we can persist them @@ -374,11 +332,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { * @param tombstoneKeys the keys removed on the server */ public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) { - Queue<Tombstone> queue = this.nonReplicatedTombstones; + if (r.getServerProxy() == null) { + // if the region does not have a server proxy + // then it will not have any tombstones to gc for the server. + return; + } + final TombstoneSweeper sweeper = this.getSweeper(r); Set<Tombstone> removals = new HashSet<Tombstone>(); - this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock(); + Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone(); try { - Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone; long removalSize = 0; VersionSource myId = r.getVersionMember(); if (logger.isDebugEnabled()) { @@ -391,26 +353,27 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } if (tombstoneKeys.contains(currentTombstone.entry.getKey())) { removals.add(currentTombstone); + removalSize += currentTombstone.getSize(); + // TODO: shouldn't we call sweeper.clearTombstone()? } } - for (Tombstone t: queue) { + for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) { + Tombstone t = it.next(); if (t.region == r) { VersionSource destroyingMember = t.getMemberID(); if (destroyingMember == null) { destroyingMember = myId; } if (tombstoneKeys.contains(t.entry.getKey())) { + it.remove(); removals.add(t); removalSize += t.getSize(); } } } - - queue.removeAll(removals); - nonReplicatedTombstoneQueueSize.addAndGet(removalSize); - + sweeper.incQueueSize(-removalSize); } finally { - this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock(); + sweeper.unlock(); } for (Tombstone t: removals) { @@ -448,16 +411,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { * verify whether a tombstone is scheduled for expiration */ public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) { - Queue<Tombstone> queue; - if (r.getDataPolicy().withReplication()) { - queue = this.replicatedTombstones; - } else { - queue = this.nonReplicatedTombstones; - } + TombstoneSweeper sweeper = this.getSweeper(r); VersionSource myId = r.getVersionMember(); VersionTag entryTag = re.getVersionStamp().asVersionTag(); int entryVersion = entryTag.getEntryVersion(); - for (Tombstone t: queue) { + for (Tombstone t: sweeper.getQueue()) { if (t.region == r) { VersionSource destroyingMember = t.getMemberID(); if (destroyingMember == null) { @@ -470,16 +428,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } } } - if (this.replicatedTombstoneSweeper != null) { - return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag); - } - return false; + return sweeper.hasExpiredTombstone(r, re, entryTag); } @Override public String toString() { - return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstones.toString() - + " Non-replicate Queue=" + this.nonReplicatedTombstones + return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString() + + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString() + (this.replicatedTombstoneSweeper.expiredTombstones != null? " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : ""); } @@ -526,11 +481,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { * are resurrected they are left in this queue and the sweeper thread * figures out that they are no longer valid tombstones. */ - Queue<Tombstone> tombstones; + final Queue<Tombstone> tombstones; /** * The size, in bytes, of the queue */ - AtomicLong queueSize = new AtomicLong(); + final AtomicLong queueSize; /** * the thread that handles tombstone expiration. It reads from the * tombstone queue. @@ -586,7 +541,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { /** * the cache that owns all of the tombstones in this sweeper */ - private GemFireCacheImpl cache; + private final GemFireCacheImpl cache; private volatile boolean isStopped; @@ -606,6 +561,24 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion()); } + + public Tombstone lockAndGetCurrentTombstone() { + this.currentTombstoneLock.lock(); + return this.currentTombstone; + } + + public void unlock() { + this.currentTombstoneLock.unlock(); + } + + public void incQueueSize(long delta) { + this.queueSize.addAndGet(delta); + } + + public Queue<Tombstone> getQueue() { + return this.tombstones; + } + /** stop tombstone removal for sweepers that have batchMode==true */ @SuppressWarnings("unused") void suspendBatchExpiration() { @@ -627,6 +600,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1; } + void scheduleTombstone(Tombstone ts) { + this.tombstones.add(ts); + this.queueSize.addAndGet(ts.getSize()); + } + /** if we should GC the batched tombstones, this method will initiate the operation */ private void processBatch() { if ((!batchExpirationSuspended && @@ -639,6 +617,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { /** test hook - unsafe since not synchronized */ boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) { + if (this.expiredTombstones == null) { + return false; + } int entryVersion = tag.getEntryVersion(); boolean retry; do {