Repository: incubator-geode Updated Branches: refs/heads/develop 1ab59ffb2 -> 88b561d64
GEODE-1800: StoppableCondition has faulty code in await(). The faulty methods in StoppableCondition have been removed. I considered removing StoppableCondition entirely and just using a Condition on the lock wrapped by the Stoppable lock class, but doing so would require additional work to make the CancelCriterion available to the code that uses these StoppableConditions, so other than removing the methods, I've left the class alone. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/88b561d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/88b561d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/88b561d6 Branch: refs/heads/develop Commit: 88b561d64dc328ca2cf9d36ea4a94bc58d80ff19 Parents: 1ab59ff Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Fri Aug 19 14:29:54 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Fri Aug 19 14:31:16 2016 -0700 ---------------------------------------------------------------------- .../internal/InternalDistributedSystem.java | 13 ----- .../internal/locks/GrantorRequestProcessor.java | 5 +- .../internal/cache/ha/HARegionQueue.java | 8 +-- .../parallel/ParallelGatewaySenderQueue.java | 2 +- .../util/concurrent/StoppableCondition.java | 58 ++++---------------- 5 files changed, 20 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java index 49a4c97..b9566df 100644 --- 
a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java @@ -422,9 +422,6 @@ public class InternalDistributedSystem this.startTime = System.currentTimeMillis(); this.grc = new GrantorRequestProcessor.GrantorRequestContext(stopper); - elderLock = new StoppableReentrantLock(stopper); - elderLockCondition = elderLock.newCondition(); - this.creationStack = TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(this.originalConfig); // if (DistributionConfigImpl.multicastTest) { @@ -1504,16 +1501,6 @@ public class InternalDistributedSystem return sb.toString(); } - private final StoppableReentrantLock elderLock; - private final StoppableCondition elderLockCondition; - - public StoppableReentrantLock getElderLock() { - return elderLock; - } - public StoppableCondition getElderLockCondition() { - return elderLockCondition; - } - /** * Returns the current configuration of this distributed system. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java index e22a8c0..5a6b867 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java @@ -21,6 +21,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; import org.apache.logging.log4j.Logger; @@ -210,8 +212,7 @@ public class GrantorRequestProcessor extends ReplyProcessor21 { logger.info(LogMarker.DLS, message); boolean interrupted = Thread.interrupted(); try { - grc.elderLockCondition.await(); - break; + grc.elderLockCondition.await(sys.getConfig().getMemberTimeout()); } catch (InterruptedException ignore) { interrupted = true; sys.getCancelCriterion().checkCancelInProgress(ignore); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java index 6ac56ff..2b8934e 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java @@ -18,6 +18,7 @@ package 
com.gemstone.gemfire.internal.cache.ha; import com.gemstone.gemfire.*; import com.gemstone.gemfire.cache.*; +import com.gemstone.gemfire.cache.TimeoutException; import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats; import com.gemstone.gemfire.cache.query.internal.cq.CqService; import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; @@ -48,11 +49,10 @@ import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.*; /** * An implementation of Queue using Gemfire Region as the underlying @@ -2539,7 +2539,7 @@ public class HARegionQueue implements RegionQueue region.getCache().getCancelCriterion().checkCancelInProgress(null); boolean interrupted = Thread.currentThread().isInterrupted(); try { - blockCond.await(); + blockCond.await(StoppableCondition.TIME_TO_WAIT); } catch (InterruptedException ie) { interrupted = true; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 1b5c11f..444a493 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1728,7 +1728,7 @@ public class ParallelGatewaySenderQueue 
implements RegionQueue { try { boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); while (regionToDispatchedKeysMap.isEmpty()) { - regionToDispatchedKeysMapEmpty.await(); + regionToDispatchedKeysMapEmpty.await(StoppableCondition.TIME_TO_WAIT); } if (wasEmpty) continue; // TODO: This should be optimized. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java index 5c33498..04b958a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java @@ -27,6 +27,8 @@ package com.gemstone.gemfire.internal.util.concurrent; +import static com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME; + import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -36,25 +38,19 @@ import com.gemstone.gemfire.internal.Assert; /** * This class is functionally equivalent to {@link java.util.concurrent.locks.Condition}; - * however, it does not implement the interface, in an attempt to encourage - * GemFire API writers to refer to this "stoppable" version instead. - * <p> - * It is implemented as a strict "cover" for a genuine {@link java.util.concurrent.locks.Condition}. - * + * however it only implements the acquire(long) method. 
Its purpose is to perform a + * cancellation check */ -public class StoppableCondition implements /* Condition, */ java.io.Serializable { - private static final long serialVersionUID = -7091681525970431937L; +public class StoppableCondition implements java.io.Serializable { + private static final long serialVersionUID = -7091681525970431937L; + + /** The underlying condition **/ + private final Condition condition; - /** The underlying condition **/ - private final Condition condition; - - /** The cancellation object */ - private final CancelCriterion stopper; + /** The cancellation object */ + private final CancelCriterion stopper; - /** - * This is how often waiters will wake up to check for cancellation - */ - private static final long RETRY_TIME = 15 * 1000; // milliseconds + public static final long TIME_TO_WAIT = 15000; /** * Create a new StoppableCondition based on given condition and @@ -67,41 +63,11 @@ public class StoppableCondition implements /* Condition, */ java.io.Serializable this.stopper = stopper; } - public void awaitUninterruptibly() { - for (;;) { - boolean interrupted = Thread.interrupted(); - try { - await(); - break; - } - catch (InterruptedException e) { - interrupted = true; - } - finally { - if (interrupted) Thread.currentThread().interrupt(); - } - } - } - - public void await() throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - for (;;) { - stopper.checkCancelInProgress(null); - if (await(RETRY_TIME)) - break; - } - } - public boolean await(long timeoutMs) throws InterruptedException { stopper.checkCancelInProgress(null); return condition.await(timeoutMs, TimeUnit.MILLISECONDS); } - public boolean awaitUntil(Date deadline) throws InterruptedException { - stopper.checkCancelInProgress(null); - return condition.awaitUntil(deadline); - } - public synchronized void signal() { condition.signal(); }