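Refactored currentTombstone handling in the TombstoneService sweeper: run() now tracks its tombstone in a local variable and updates the shared currentTombstone field through lock-guarded helper methods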
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/66b5945f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/66b5945f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/66b5945f Branch: refs/heads/feature/GEODE-1420 Commit: 66b5945f7e4b28b6ddcccb9cacf37e05d31d850f Parents: cb56ade Author: Darrel Schneider <dschnei...@pivotal.io> Authored: Tue Jun 21 16:31:55 2016 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Tue Jun 21 16:31:55 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/TombstoneService.java | 137 ++++++++++--------- 1 file changed, 74 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/66b5945f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java index 7036d45..5c6b1dd 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java @@ -547,7 +547,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { */ volatile boolean batchExpirationSuspended; /** - * The sweeper thread's current tombstone + * The sweeper thread's current tombstone. + * Only set by the run() thread while holding the currentTombstoneLock. + * Read by other threads while holding the currentTombstoneLock. 
*/ Tombstone currentTombstone; /** @@ -679,13 +681,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { this.batchExpirationInProgress = true; boolean batchScheduled = false; try { - final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>(); Set<Tombstone> expired = expiredTombstones; - long removalSize = 0; - expiredTombstones = new HashSet<Tombstone>(); - if (expired.size() == 0) { + if (expired.isEmpty()) { return; } + expiredTombstones = new HashSet<Tombstone>(); + final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>(); + long removalSize = 0; //Update the GC RVV for all of the affected regions. //We need to do this so that we can persist the GC RVV before @@ -762,10 +764,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { public void run() { long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this long maximumSleepTime = 10000; + Tombstone myTombstone = null; if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime); } - currentTombstone = null; // millis we need to run a scan of queue and batch set for resurrected tombstones long minimumScanTime = 100; // how often to perform the scan @@ -815,64 +817,50 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } } } - if (currentTombstone == null) { - try { - currentTombstoneLock.lock(); - try { - currentTombstone = tombstones.remove(); - } finally { - currentTombstoneLock.unlock(); - } - if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { - logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone); - } - } catch (NoSuchElementException e) { - // expected - if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { - logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep"); - } - forceExpirationCount = 0; - } + 
if (myTombstone == null) { + myTombstone = setCurrentToNextTombstone(); } - long sleepTime; - if (currentTombstone == null) { + long sleepTime = 0; + boolean expireMyTombstone = false; + if (myTombstone == null) { sleepTime = expiryTime; - } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) { - sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now; } else { + long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now; if (forceExpirationCount > 0) { - forceExpirationCount--; + if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) { + sleepTime = msTillMyTombstoneExpires; + } else { + forceExpirationCount--; + expireMyTombstone = true; + } + } else if (msTillMyTombstoneExpires > 0) { + sleepTime = msTillMyTombstoneExpires; + } else { + expireMyTombstone = true; } - sleepTime = 0; + } + if (expireMyTombstone) { try { if (batchMode) { if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { - logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone); + logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone); } - expiredTombstones.add(currentTombstone); + expiredTombstones.add(myTombstone); } else { if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { - logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone); + logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone); } - queueSize.addAndGet(-currentTombstone.getSize()); - currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true); - } - currentTombstoneLock.lock(); - try { - currentTombstone = null; - } finally { - currentTombstoneLock.unlock(); + queueSize.addAndGet(-myTombstone.getSize()); + myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true); } + 
myTombstone = null; + clearCurrentTombstone(); } catch (CancelException e) { return; } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e); - currentTombstoneLock.lock(); - try { - currentTombstone = null; - } finally { - currentTombstoneLock.unlock(); - } + myTombstone = null; + clearCurrentTombstone(); } } if (sleepTime > 0) { @@ -889,20 +877,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) { it.remove(); this.queueSize.addAndGet(-test.getSize()); - if (test == currentTombstone) { - currentTombstoneLock.lock(); - try { - currentTombstone = null; - } finally { - currentTombstoneLock.unlock(); - } + if (test == myTombstone) { + myTombstone = null; + clearCurrentTombstone(); sleepTime = 0; } - } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) { + } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) { it.remove(); this.queueSize.addAndGet(-test.getSize()); if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { - logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone); + logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test); } expiredTombstones.add(test); sleepTime = 0; @@ -919,13 +903,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } it.remove(); this.queueSize.addAndGet(-test.getSize()); - if (test == currentTombstone) { - currentTombstoneLock.lock(); - try { - currentTombstone = null; - } finally { - currentTombstoneLock.unlock(); - } + if (test == myTombstone) { + myTombstone = null; + clearCurrentTombstone(); sleepTime = 0; } } @@ -980,6 +960,37 @@ public class TombstoneService implements ResourceListener<MemoryEvent> { } } // while() } // run() + + private void clearCurrentTombstone() { + currentTombstoneLock.lock(); + currentTombstone = 
null; + currentTombstoneLock.unlock(); + } + + /** + * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone + */ + private Tombstone setCurrentToNextTombstone() { + Tombstone result; + currentTombstoneLock.lock(); + try { + result = tombstones.poll(); + if (result != null) { + if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { + logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result); + } + currentTombstone = result; + } else { + if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { + logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep"); + } + forceExpirationCount = 0; + } + } finally { + currentTombstoneLock.unlock(); + } + return result; + } } // class TombstoneSweeper