sweeper no longer depends on GemFireCacheImpl making it easier to unit test
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c325a15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c325a15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c325a15 Branch: refs/heads/feature/GEODE-1420 Commit: 7c325a15690eaf9f5e5ec9615d3096629d4354c4 Parents: 2e4d4d2 Author: Darrel Schneider <dschnei...@pivotal.io> Authored: Thu Jun 30 14:59:50 2016 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Thu Jun 30 14:59:50 2016 -0700 ---------------------------------------------------------------------- .../gemfire/distributed/internal/CacheTime.java | 29 +++++++++++ .../gemfire/distributed/internal/DSClock.java | 7 +-- .../internal/cache/GemFireCacheImpl.java | 3 +- .../internal/cache/TombstoneService.java | 53 ++++++++++++-------- 4 files changed, 66 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java new file mode 100644 index 0000000..08c1400 --- /dev/null +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal; + +/** + * Provides a method to get the system millisecond clock time + * adjusted for the distributed cache. + */ +public interface CacheTime { + /** + * Returns the system millisecond clock time with adjustments from the distributed cache + * @return the current time + */ + public long cacheTimeMillis(); +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java index d13610a..d96e7c3 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; * */ -public class DSClock { +public class DSClock implements CacheTime { private static final Logger logger = LogService.getLogger(); @@ -76,10 +76,7 @@ public class DSClock { this.isLoner = lonerDS; } - /** - * Returns the system millisecond clock time with adjustments from the distributed system - * @return the current time - */ + @Override public long cacheTimeMillis() { long result; final long offset = getCacheTimeOffset(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index 186ebbc..c40a0c3 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -176,7 +176,7 @@ import com.sun.jna.Platform; * */ @SuppressWarnings("deprecation") -public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee { +public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime { private static final Logger logger = LogService.getLogger(); // moved *SERIAL_NUMBER stuff to DistributionAdvisor @@ -2791,6 +2791,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer * * @return distributed cache time. */ + @Override public long cacheTimeMillis() { if (this.system != null) { return this.system.getClock().cacheTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java index 234454b..3e92c16 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java @@ -16,9 +16,11 @@ */ package com.gemstone.gemfire.internal.cache; +import com.gemstone.gemfire.CancelCriterion; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.util.ObjectSizer; +import com.gemstone.gemfire.distributed.internal.CacheTime; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder; import com.gemstone.gemfire.internal.cache.versions.VersionSource; @@ -35,6 +37,7 @@ import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; @@ -121,8 +124,8 @@ public class TombstoneService { } private TombstoneService(GemFireCacheImpl cache) { - this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache); - this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache); + this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool()); + this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion()); this.replicatedTombstoneSweeper.start(); this.nonReplicatedTombstoneSweeper.start(); } @@ -338,8 +341,8 @@ public class TombstoneService { } } private static class NonReplicateTombstoneSweeper extends TombstoneSweeper { - NonReplicateTombstoneSweeper(GemFireCacheImpl cache) { - super(cache, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector"); + NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) { + super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector"); } @Override @@ -347,7 +350,7 @@ public class TombstoneService { return false; } @Override protected void updateStatistics() { - cache.getCachePerfStats().setNonReplicatedTombstonesSize(getMemoryEstimate()); + stats.setNonReplicatedTombstonesSize(getMemoryEstimate()); } @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) { return msTillHeadTombstoneExpires <= 0; @@ -380,6 +383,10 @@ public class TombstoneService { private static class ReplicateTombstoneSweeper extends TombstoneSweeper { /** + * Used to execute batch gc message execution in the background. + */ + private final ExecutorService executor; + /** * tombstones that have expired and are awaiting batch removal. This * variable is only accessed by the sweeper thread and so is not guarded */ @@ -413,9 +420,10 @@ public class TombstoneService { */ private int testHook_forceExpirationCount = 0; - ReplicateTombstoneSweeper(GemFireCacheImpl cache) { - super(cache, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector"); + ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) { + super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector"); this.expiredTombstones = new ArrayList<Tombstone>(); + this.executor = executor; } public int decrementGCBlockCount() { @@ -532,7 +540,7 @@ public class TombstoneService { // do messaging in a pool so this thread is not stuck trying to // communicate with other members - cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() { + executor.execute(new Runnable() { public void run() { try { // this thread should not reference other sweeper state, which is not synchronized @@ -586,7 +594,7 @@ public class TombstoneService { } } @Override protected void updateStatistics() { - cache.getCachePerfStats().setReplicatedTombstonesSize(getMemoryEstimate()); + stats.setReplicatedTombstonesSize(getMemoryEstimate()); } private void checkIfBatchExpirationShouldBeForced() { if (testHook_forceExpirationCount > 0) { @@ -704,26 +712,27 @@ public class TombstoneService { private final StoppableReentrantLock queueHeadLock; - /** - * the cache that owns all of the tombstones in this sweeper - */ - protected final GemFireCacheImpl cache; + protected final CacheTime cacheTime; + protected final CachePerfStats stats; + private final CancelCriterion cancelCriterion; private volatile boolean isStopped; - TombstoneSweeper(GemFireCacheImpl cache, + TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, long expiryTime, String threadName) { - this.cache = cache; + this.cacheTime = cacheTime; + this.stats = stats; + this.cancelCriterion = cancelCriterion; this.EXPIRY_TIME = expiryTime; this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime); this.tombstones = new ConcurrentLinkedQueue<Tombstone>(); this.memoryUsedEstimate = new AtomicLong(); - this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion()); + this.queueHeadLock = new StoppableReentrantLock(cancelCriterion); this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this); this.sweeperThread.setDaemon(true); this.sweeperThread.setName(threadName); - this.lastPurgeTimestamp = this.cache.cacheTimeMillis(); + this.lastPurgeTimestamp = getNow(); } public void unscheduleTombstones(final LocalRegion r) { @@ -823,11 +832,11 @@ public class TombstoneService { if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.EXPIRY_TIME); } - while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) { + while (!isStopped && cancelCriterion.cancelInProgress() == null) { try { updateStatistics(); SystemFailure.checkFailure(); - final long now = this.cache.cacheTimeMillis(); + final long now = getNow(); checkExpiredTombstones(); checkOldestUnexpired(now); purgeObsoleteTombstones(now); @@ -846,6 +855,10 @@ public class TombstoneService { } // while() } // run() + private long getNow() { + return cacheTime.cacheTimeMillis(); + } + private void doSleep() { if (sleepTime <= 0) { return; @@ -894,7 +907,7 @@ public class TombstoneService { if (removedObsoleteTombstone) { sleepTime = 0; } else { - long elapsed = this.cache.cacheTimeMillis() - start; + long elapsed = getNow() - start; sleepTime = sleepTime - elapsed; if (sleepTime <= 0) { minimumPurgeTime = elapsed;