Fix BlockingHARegionJUnitTest
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/4eaf17af Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/4eaf17af Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/4eaf17af Branch: refs/heads/feature/GEODE-2632-18 Commit: 4eaf17affca640798a1df6f00cbbc363e49e0a48 Parents: 33b8e05 Author: Kirk Lund <kl...@apache.org> Authored: Wed May 31 09:57:42 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Wed May 31 13:25:45 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/ha/HARegionQueue.java | 155 +-- .../cache/tier/sockets/CacheClientNotifier.java | 1037 +++++++++++------- 2 files changed, 682 insertions(+), 510 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/4eaf17af/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java index 66e34b9..f75a912 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java @@ -2057,21 +2057,6 @@ public class HARegionQueue implements RegionQueue { * a single peek thread. */ private static class BlockingHARegionQueue extends HARegionQueue { - - private static final String EVENT_ENQUEUE_WAIT_TIME_NAME = - DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME"; - - private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; - - /** - * System property name for indicating how much frequently the "Queue full" message should be - * logged. - */ - private static final String MAX_QUEUE_LOG_FREQUENCY = - DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit"; - - private static final long DEFAULT_LOG_FREQUENCY = 1000; - /** * Guards the Put permits */ @@ -2094,25 +2079,14 @@ public class HARegionQueue implements RegionQueue { */ private final Object permitMon = new Object(); - /** - * Lock on which the take & remove threads block awaiting data from put operations - */ + // Lock on which the take & remove threads block awaiting data from put + // operations private final StoppableReentrantLock lock; /** * Condition object on which peek & take threads will block */ - final StoppableCondition blockCond; - - /** - * System property value denoting the time in milliseconds. Any thread putting an event into a - * subscription queue, which is full, will wait this much time for the queue to make space. - * It'll then enqueue the event possibly causing the queue to grow beyond its capacity/max-size. - * See #51400. - */ - private final int enqueueEventWaitTime; - - private final long logFrequency; + protected final StoppableCondition blockCond; /** * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can @@ -2123,42 +2097,16 @@ public class HARegionQueue implements RegionQueue { HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { - super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary); this.capacity = hrqa.getBlockingQueueCapacity(); this.putPermits = this.capacity; this.lock = new StoppableReentrantLock(this.region.getCancelCriterion()); - this.blockCond = this.lock.newCondition(); + this.blockCond = lock.newCondition(); super.putGIIDataInRegion(); - - if (getClass() == BlockingHARegionQueue.class) { - this.initialized.set(true); + if (this.getClass() == BlockingHARegionQueue.class) { + initialized.set(true); } - - this.enqueueEventWaitTime = calcEnqueueEventWaitTime(); - this.logFrequency = calcLogFrequency(); - } - - private static int calcEnqueueEventWaitTime() { - int value = Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); - if (value < 0) { - value = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; - } - return value; - } - - private static long calcLogFrequency() { - long value; - try { - value = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); - if (value <= 0) { - value = DEFAULT_LOG_FREQUENCY; - } - } catch (NumberFormatException ignore) { - value = DEFAULT_LOG_FREQUENCY; - } - return value; } @Override @@ -2186,55 +2134,56 @@ public class HARegionQueue implements RegionQueue { * in the HARegionQueue. */ @Override - @SuppressWarnings("TLW_TWO_LOCK_WAIT") + @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT") void checkQueueSizeConstraint() throws InterruptedException { - if (!(this.haContainer instanceof HAContainerMap && isPrimary())) { - // Fix for bug 39413 - return; - } - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - synchronized (this.putGuard) { - if (this.putPermits <= 0) { - synchronized (this.permitMon) { - if (reconcilePutPermits() <= 0) { - if (this.region.getSystem().getConfig().getRemoveUnresponsiveClient()) { - this.isClientSlowReciever = true; - } else { - try { - if ((this.maxQueueSizeHitCount % this.logFrequency) == 0) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL, - new Object[] {this.region.getName()})); - this.maxQueueSizeHitCount = 0; - } - ++this.maxQueueSizeHitCount; - this.region.checkReadiness(); // fix for bug 37581 - // TODO: wait called while holding two locks - this.permitMon.wait(this.enqueueEventWaitTime); - this.region.checkReadiness(); // fix for bug 37581 - // Fix for #51400. Allow the queue to grow beyond its - // capacity/maxQueueSize, if it is taking a long time to - // drain the queue, either due to a slower client or the - // deadlock scenario mentioned in the ticket. - reconcilePutPermits(); - if (this.maxQueueSizeHitCount % this.logFrequency == 1) { - logger.info(LocalizedMessage - .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS)); + if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 + if (Thread.interrupted()) + throw new InterruptedException(); + synchronized (this.putGuard) { + if (putPermits <= 0) { + synchronized (this.permitMon) { + if (reconcilePutPermits() <= 0) { + if (region.getSystem().getConfig().getRemoveUnresponsiveClient()) { + isClientSlowReciever = true; + } else { + try { + long logFrequency = CacheClientNotifier.DEFAULT_LOG_FREQUENCY; + CacheClientNotifier ccn = CacheClientNotifier.getInstance(); + if (ccn != null) { // check needed for junit tests + logFrequency = ccn.getLogFrequency(); + } + if ((this.maxQueueSizeHitCount % logFrequency) == 0) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL, + new Object[] {region.getName()})); + this.maxQueueSizeHitCount = 0; + } + ++this.maxQueueSizeHitCount; + this.region.checkReadiness(); // fix for bug 37581 + // TODO: wait called while holding two locks + this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime); + this.region.checkReadiness(); // fix for bug 37581 + // Fix for #51400. Allow the queue to grow beyond its + // capacity/maxQueueSize, if it is taking a long time to + // drain the queue, either due to a slower client or the + // deadlock scenario mentioned in the ticket. + reconcilePutPermits(); + if ((this.maxQueueSizeHitCount % logFrequency) == 1) { + logger.info(LocalizedMessage + .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS)); + } + } catch (InterruptedException ex) { + // TODO: The line below is meaningless. Comment it out later + this.permitMon.notifyAll(); + throw ex; } - } catch (InterruptedException ex) { - // TODO: The line below is meaningless. Comment it out later - this.permitMon.notifyAll(); - throw ex; } } - } - } // synchronized (this.permitMon) - } // if (putPermits <= 0) - --this.putPermits; - } // synchronized (this.putGuard) + } // synchronized (this.permitMon) + } // if (putPermits <= 0) + --putPermits; + } // synchronized (this.putGuard) + } } /**