refactored currentTombstone code

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/66b5945f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/66b5945f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/66b5945f

Branch: refs/heads/feature/GEODE-1420
Commit: 66b5945f7e4b28b6ddcccb9cacf37e05d31d850f
Parents: cb56ade
Author: Darrel Schneider <dschnei...@pivotal.io>
Authored: Tue Jun 21 16:31:55 2016 -0700
Committer: Darrel Schneider <dschnei...@pivotal.io>
Committed: Tue Jun 21 16:31:55 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 137 ++++++++++---------
 1 file changed, 74 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/66b5945f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..5c6b1dd 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -547,7 +547,9 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
      */
     volatile boolean batchExpirationSuspended;
     /**
-     * The sweeper thread's current tombstone
+     * The sweeper thread's current tombstone.
+     * Only set by the run() thread while holding the currentTombstoneLock.
+     * Read by other threads while holding the currentTombstoneLock.
      */
     Tombstone currentTombstone;
     /**
@@ -679,13 +681,13 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        final Set<DistributedRegion> regionsAffected = new 
HashSet<DistributedRegion>();
         Set<Tombstone> expired = expiredTombstones;
-        long removalSize = 0;
-        expiredTombstones = new HashSet<Tombstone>();
-        if (expired.size() == 0) {
+        if (expired.isEmpty()) {
           return;
         }
+        expiredTombstones = new HashSet<Tombstone>();
+        final Set<DistributedRegion> regionsAffected = new 
HashSet<DistributedRegion>();
+        long removalSize = 0;
 
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
@@ -762,10 +764,10 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
     public void run() {
       long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will 
not work on something younger than this
       long maximumSleepTime = 10000;
+      Tombstone myTombstone = null;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting 
with default sleep interval={}", this.expiryTime);
       }
-      currentTombstone = null;
       // millis we need to run a scan of queue and batch set for resurrected 
tombstones
       long minimumScanTime = 100;
       // how often to perform the scan
@@ -815,64 +817,50 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
               }
             }
           }
-          if (currentTombstone == null) {
-            try {
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = tombstones.remove();
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", 
currentTombstone);
-              }
-            } catch (NoSuchElementException e) {
-              // expected
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will 
sleep");
-              }
-              forceExpirationCount = 0;
-            }
+          if (myTombstone == null) {
+            myTombstone = setCurrentToNextTombstone();
           }
-          long sleepTime;
-          if (currentTombstone == null) {
+          long sleepTime = 0;
+          boolean expireMyTombstone = false;
+          if (myTombstone == null) {
             sleepTime = expiryTime;
-          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now 
&& (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + 
expiryTime - now) <= minimumRetentionMs)) {
-            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - 
now;
           } else {
+            long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() 
+ expiryTime - now;
             if (forceExpirationCount > 0) {
-              forceExpirationCount--;
+              if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= 
minimumRetentionMs) {
+                sleepTime = msTillMyTombstoneExpires;
+              } else {
+                forceExpirationCount--;
+                expireMyTombstone = true;
+              }
+            } else if (msTillMyTombstoneExpires > 0) {
+              sleepTime = msTillMyTombstoneExpires;
+            } else {
+              expireMyTombstone = true;
             }
-            sleepTime = 0;
+          }
+          if (expireMyTombstone) {
             try {
               if (batchMode) {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", 
currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", 
myTombstone);
                 }
-                expiredTombstones.add(currentTombstone);
+                expiredTombstones.add(myTombstone);
               } else {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired 
tombstone {}", currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "removing expired 
tombstone {}", myTombstone);
                 }
-                queueSize.addAndGet(-currentTombstone.getSize());
-                
currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, 
currentTombstone, false, true);
-              }
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
+                queueSize.addAndGet(-myTombstone.getSize());
+                
myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, 
myTombstone, false, true);
               }
+              myTombstone = null;
+              clearCurrentTombstone();
             } catch (CancelException e) {
               return;
             } catch (Exception e) {
               
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR),
 e);
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
+              myTombstone = null;
+              clearCurrentTombstone();
             }
           }
           if (sleepTime > 0) {
@@ -889,20 +877,16 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
                   if 
(test.region.getRegionMap().isTombstoneNotNeeded(test.entry, 
test.getEntryVersion())) {
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
-                  } else if (batchMode && test != currentTombstone && 
(test.getVersionTimeStamp()+expiryTime) <= now) {
+                  } else if (batchMode && test != myTombstone && 
(test.getVersionTimeStamp()+expiryTime) <= now) {
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone 
{}", currentTombstone);
+                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone 
{}", test);
                     }
                     expiredTombstones.add(test);
                     sleepTime = 0;
@@ -919,13 +903,9 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
                     }
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
                   }
@@ -980,6 +960,37 @@ public class TombstoneService  implements 
ResourceListener<MemoryEvent> {
         }
       } // while()
     } // run()
+
+    private void clearCurrentTombstone() {
+      currentTombstoneLock.lock();
+      currentTombstone = null;
+      currentTombstoneLock.unlock();
+    }
+
+    /**
+     * Returns the new currentTombstone taken from the tombstones queue; null 
if no next tombstone
+     */
+    private Tombstone setCurrentToNextTombstone() {
+      Tombstone result;
+      currentTombstoneLock.lock();
+      try {
+        result = tombstones.poll();
+        if (result != null) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", 
result);
+          }
+          currentTombstone = result;
+        } else {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+          }
+          forceExpirationCount = 0;
+        }
+      } finally {
+        currentTombstoneLock.unlock();
+      }
+      return result;
+    }
     
   } // class TombstoneSweeper
 

Reply via email to