Cleanup BaseCommand
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/fec1be92 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/fec1be92 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/fec1be92 Branch: refs/heads/feature/GEODE-2632-17 Commit: fec1be9246ce2791841a8e8ebbf3dd1f7a8699f9 Parents: c3d4687 Author: Kirk Lund <kl...@apache.org> Authored: Fri May 19 14:57:20 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Tue May 23 14:47:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/geode/GemFireException.java | 93 +-- .../org/apache/geode/cache/CacheException.java | 14 +- .../geode/cache/CacheRuntimeException.java | 15 +- .../org/apache/geode/internal/DSFIDFactory.java | 2 +- .../geode/internal/cache/PartitionedRegion.java | 3 +- .../geode/internal/cache/ha/HARegionQueue.java | 157 ++-- .../geode/internal/cache/tier/Command.java | 17 +- .../cache/tier/sockets/BaseCommand.java | 720 ++++++++----------- .../cache/tier/sockets/BaseCommandQuery.java | 20 +- .../cache/tier/sockets/CacheClientNotifier.java | 231 ++---- .../ServerInterestRegistrationMessage.java | 120 ++++ .../cache/tier/sockets/command/AddPdxEnum.java | 22 +- .../cache/tier/sockets/command/AddPdxType.java | 22 +- .../cache/tier/sockets/command/ClearRegion.java | 52 +- .../cache/tier/sockets/command/ClientReady.java | 20 +- .../tier/sockets/command/CloseConnection.java | 28 +- .../tier/sockets/command/CommitCommand.java | 24 +- .../cache/tier/sockets/command/ContainsKey.java | 46 +- .../tier/sockets/command/ContainsKey66.java | 48 +- .../tier/sockets/command/CreateRegion.java | 44 +- .../cache/tier/sockets/command/Default.java | 9 +- .../cache/tier/sockets/command/Destroy.java | 74 +- .../cache/tier/sockets/command/Destroy65.java | 102 +-- .../cache/tier/sockets/command/Destroy70.java | 4 +- .../tier/sockets/command/DestroyRegion.java | 75 +- .../tier/sockets/command/ExecuteFunction.java | 26 +- .../tier/sockets/command/ExecuteFunction65.java | 30 +- .../tier/sockets/command/ExecuteFunction66.java | 40 +- .../tier/sockets/command/ExecuteFunction70.java | 4 +- .../sockets/command/ExecuteRegionFunction.java | 36 +- .../command/ExecuteRegionFunction61.java | 40 +- .../command/ExecuteRegionFunction65.java | 40 +- .../command/ExecuteRegionFunction66.java | 44 +- .../command/ExecuteRegionFunctionSingleHop.java | 46 +- .../sockets/command/GatewayReceiverCommand.java | 214 +++--- .../cache/tier/sockets/command/Get70.java | 74 +- .../cache/tier/sockets/command/GetAll.java | 52 +- .../cache/tier/sockets/command/GetAll651.java | 53 +- .../cache/tier/sockets/command/GetAll70.java | 54 +- .../cache/tier/sockets/command/GetAllForRI.java | 2 +- .../sockets/command/GetAllWithCallback.java | 59 +- .../command/GetClientPRMetadataCommand.java | 26 +- .../command/GetClientPRMetadataCommand66.java | 26 +- .../GetClientPartitionAttributesCommand.java | 28 +- .../GetClientPartitionAttributesCommand66.java | 28 +- .../sockets/command/GetFunctionAttribute.java | 16 +- .../tier/sockets/command/GetPDXEnumById.java | 24 +- .../tier/sockets/command/GetPDXIdForEnum.java | 22 +- .../tier/sockets/command/GetPDXIdForType.java | 24 +- .../tier/sockets/command/GetPDXTypeById.java | 24 +- .../tier/sockets/command/GetPdxEnums70.java | 22 +- .../tier/sockets/command/GetPdxTypes70.java | 22 +- .../cache/tier/sockets/command/Invalid.java | 9 +- .../cache/tier/sockets/command/Invalidate.java | 74 +- .../tier/sockets/command/Invalidate70.java | 4 +- .../cache/tier/sockets/command/KeySet.java | 57 +- .../cache/tier/sockets/command/MakePrimary.java | 22 +- .../tier/sockets/command/ManagementCommand.java | 2 +- .../cache/tier/sockets/command/PeriodicAck.java | 32 +- .../cache/tier/sockets/command/Ping.java | 28 +- .../cache/tier/sockets/command/Put.java | 86 +-- .../cache/tier/sockets/command/Put61.java | 106 +-- .../cache/tier/sockets/command/Put65.java | 152 ++-- .../cache/tier/sockets/command/Put70.java | 4 +- .../cache/tier/sockets/command/PutAll.java | 84 +-- .../cache/tier/sockets/command/PutAll70.java | 98 +-- .../cache/tier/sockets/command/PutAll80.java | 112 ++- .../sockets/command/PutUserCredentials.java | 32 +- .../cache/tier/sockets/command/Query.java | 26 +- .../cache/tier/sockets/command/Query651.java | 41 +- .../command/RegisterDataSerializers.java | 30 +- .../sockets/command/RegisterInstantiators.java | 36 +- .../tier/sockets/command/RegisterInterest.java | 88 +-- .../sockets/command/RegisterInterest61.java | 98 +-- .../sockets/command/RegisterInterestList.java | 88 +-- .../sockets/command/RegisterInterestList61.java | 94 +-- .../sockets/command/RegisterInterestList66.java | 92 +-- .../cache/tier/sockets/command/RemoveAll.java | 103 ++- .../tier/sockets/command/RemoveUserAuth.java | 32 +- .../cache/tier/sockets/command/Request.java | 68 +- .../tier/sockets/command/RequestEventValue.java | 52 +- .../tier/sockets/command/RollbackCommand.java | 20 +- .../cache/tier/sockets/command/Size.java | 34 +- .../tier/sockets/command/TXFailoverCommand.java | 28 +- .../command/TXSynchronizationCommand.java | 51 +- .../sockets/command/UnregisterInterest.java | 50 +- .../sockets/command/UnregisterInterestList.java | 50 +- .../command/UpdateClientNotification.java | 4 +- .../cache/tier/sockets/command/CloseCQ.java | 34 +- .../cache/tier/sockets/command/ExecuteCQ.java | 42 +- .../cache/tier/sockets/command/ExecuteCQ61.java | 53 +- .../cache/tier/sockets/command/GetCQStats.java | 29 +- .../tier/sockets/command/GetDurableCQs.java | 40 +- .../cache/tier/sockets/command/MonitorCQ.java | 31 +- .../cache/tier/sockets/command/StopCQ.java | 34 +- 95 files changed, 2549 insertions(+), 2739 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/GemFireException.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/GemFireException.java b/geode-core/src/main/java/org/apache/geode/GemFireException.java index 02bf025..3a69307 100644 --- a/geode-core/src/main/java/org/apache/geode/GemFireException.java +++ b/geode-core/src/main/java/org/apache/geode/GemFireException.java @@ -12,74 +12,55 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode; /** * This is the abstract superclass of exceptions that are thrown to indicate incorrect usage of * GemFire. - * + * <p> * Since these exceptions are unchecked, this class really <em>ought</em> to be called - * <code>GemFireRuntimeException</code>; however, the current name is retained for compatibility's + * {@code GemFireRuntimeException}; however, the current name is retained for compatibility's * sake. - * - * @see org.apache.geode.GemFireCheckedException + * <p> + * This class is abstract to enforce throwing more specific exception types. Please avoid using + * GemFireException to describe an arbitrary error condition + * + * @see GemFireCheckedException * @see org.apache.geode.cache.CacheRuntimeException */ -// Implementation note: This class is abstract so that we are forced -// to have more specific exception types. We want to avoid using -// GemFireException to describe an arbitrary error condition (think -// GsError). public abstract class GemFireException extends RuntimeException { - public static final long serialVersionUID = -6972360779789402295L; - - /** The cause of this <code>GemFireException</code> */ - // private Throwable cause; - - ////////////////////// Constructors ////////////////////// + private static final long serialVersionUID = -6972360779789402295L; /** - * Creates a new <code>GemFireException</code> with no detailed message. + * Creates a new {@code GemFireException} with no detailed message. */ public GemFireException() { super(); } /** - * Creates a new <code>GemFireException</code> with the given detail message. + * Creates a new {@code GemFireException} with the given detail message. */ public GemFireException(String message) { super(message); } /** - * Creates a new <code>GemFireException</code> with the given detail message and cause. + * Creates a new {@code GemFireException} with the given detail message and cause. */ public GemFireException(String message, Throwable cause) { super(message, cause); - // this.cause = cause; } /** - * Creates a new <code>GemFireException</code> with the given cause and no detail message + * Creates a new {@code GemFireException} with the given cause and no detail message */ public GemFireException(Throwable cause) { super(cause); - // this.cause = cause; } - //////////////////// Instance Methods //////////////////// - - /** - * Returns the cause of this <code>GemFireException</code> or <code>null</code> if the cause is - * nonexistent or unknown. - */ - // public Throwable getCause() { - // return this.cause; - // } - /** - * Returns the root cause of this <code>GemFireException</code> or <code>null</code> if the cause + * Returns the root cause of this {@code GemFireException} or {@code null} if the cause * is nonexistent or unknown. */ public Throwable getRootCause() { @@ -93,52 +74,4 @@ public abstract class GemFireException extends RuntimeException { return root; } - // public void printStackTrace() { - // super.printStackTrace(); - // if (this.cause != null) { - // System.err.println("Caused by:"); - // this.cause.printStackTrace(); - // } - // } - - // public void printStackTrace(java.io.PrintWriter pw) { - // super.printStackTrace(pw); - // - // if (this.cause != null) { - // pw.println("Caused by:"); - // this.cause.printStackTrace(pw); - // } - // } - // - // public String getMessage() { - // if (this.cause != null) { - // String ourMsg = super.getMessage(); - // if (ourMsg == null || ourMsg.length() == 0) { - // //ourMsg = super.toString(); //causes inifinite recursion - // ourMsg = ""; - // } - // StringBuffer sb = new StringBuffer(ourMsg); - // sb.append(" Caused by: "); - // String causeMsg = this.cause.getMessage(); - // if (causeMsg == null || causeMsg.length() == 0) { - // causeMsg = this.cause.toString(); - // } - // sb.append(causeMsg); - // return sb.toString(); - // } else { - // return super.getMessage(); - // } - // } - - /** - * Represent the receiver as well as the cause - */ - // public String toString() { - // String result = super.toString(); - // if (cause != null) { - // result = result + ", caused by " + cause.toString(); - // } - // return result; - // } - } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/cache/CacheException.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/CacheException.java b/geode-core/src/main/java/org/apache/geode/cache/CacheException.java index 79591d6..6309ad1 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/CacheException.java +++ b/geode-core/src/main/java/org/apache/geode/cache/CacheException.java @@ -16,36 +16,34 @@ package org.apache.geode.cache; import org.apache.geode.GemFireException; - /** * A generic exception, which indicates a cache error has occurred. All the other cache exceptions * are subclasses of this class. This class is abstract and therefore only subclasses are * instantiated. * - * * @since GemFire 2.0 */ public abstract class CacheException extends GemFireException { - public static final long serialVersionUID = 7699432887938858940L; + private static final long serialVersionUID = 7699432887938858940L; - /** Constructs a new <code>CacheException</code>. */ + /** Constructs a new {@code CacheException}. */ public CacheException() { super(); } - /** Constructs a new <code>CacheException</code> with a message string. */ + /** Constructs a new {@code CacheException} with a message string. */ public CacheException(String s) { super(s); } /** - * Constructs a <code>CacheException</code> with a message string and a base exception + * Constructs a {@code CacheException} with a message string and a base exception */ public CacheException(String s, Throwable cause) { super(s, cause); } - /** Constructs a <code>CacheException</code> with a cause */ + /** Constructs a {@code CacheException} with a cause */ public CacheException(Throwable cause) { super(cause); } @@ -57,7 +55,7 @@ public abstract class CacheException extends GemFireException { if (cause != null) { String causeStr = cause.toString(); final String glue = ", caused by "; - StringBuffer sb = new StringBuffer(result.length() + causeStr.length() + glue.length()); + StringBuilder sb = new StringBuilder(result.length() + causeStr.length() + glue.length()); sb.append(result).append(glue).append(causeStr); result = sb.toString(); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/cache/CacheRuntimeException.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/CacheRuntimeException.java b/geode-core/src/main/java/org/apache/geode/cache/CacheRuntimeException.java index a723b32..89b596f 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/CacheRuntimeException.java +++ b/geode-core/src/main/java/org/apache/geode/cache/CacheRuntimeException.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.cache; import org.apache.geode.GemFireException; @@ -22,20 +21,18 @@ import org.apache.geode.GemFireException; * cache exceptions are the subclass of this class. This class is abstract so only subclasses can be * instantiated * - * * @since GemFire 3.0 */ public abstract class CacheRuntimeException extends GemFireException { - public static final long serialVersionUID = 6750107573015376688L; + private static final long serialVersionUID = 6750107573015376688L; /** - * Creates a new instance of <code>CacheRuntimeException</code> without detail message. + * Creates a new instance of {@code CacheRuntimeException} without detail message. */ public CacheRuntimeException() {} - /** - * Constructs an instance of <code>CacheRuntimeException</code> with the specified detail message. + * Constructs an instance of {@code CacheRuntimeException} with the specified detail message. * * @param msg the detail message */ @@ -44,7 +41,7 @@ public abstract class CacheRuntimeException extends GemFireException { } /** - * Constructs an instance of <code>CacheRuntimeException</code> with the specified detail message + * Constructs an instance of {@code CacheRuntimeException} with the specified detail message * and cause. * * @param msg the detail message @@ -55,7 +52,7 @@ public abstract class CacheRuntimeException extends GemFireException { } /** - * Constructs an instance of <code>CacheRuntimeException</code> with the specified cause. + * Constructs an instance of {@code CacheRuntimeException} with the specified cause. * * @param cause the causal Throwable */ @@ -70,7 +67,7 @@ public abstract class CacheRuntimeException extends GemFireException { if (cause != null) { String causeStr = cause.toString(); final String glue = ", caused by "; - StringBuffer sb = new StringBuffer(result.length() + causeStr.length() + glue.length()); + StringBuilder sb = new StringBuilder(result.length() + causeStr.length() + glue.length()); sb.append(result).append(glue).append(causeStr); result = sb.toString(); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index ac500e6..5b0d86b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -372,7 +372,7 @@ import org.apache.geode.internal.cache.snapshot.FlowController.FlowControlAbortM import org.apache.geode.internal.cache.snapshot.FlowController.FlowControlAckMessage; import org.apache.geode.internal.cache.snapshot.SnapshotPacket; import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord; -import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.ServerInterestRegistrationMessage; +import org.apache.geode.internal.cache.tier.sockets.ServerInterestRegistrationMessage; import org.apache.geode.internal.cache.tier.sockets.ClientBlacklistProcessor.ClientBlacklistMessage; import org.apache.geode.internal.cache.tier.sockets.ClientDataSerializerMessage; import org.apache.geode.internal.cache.tier.sockets.ClientInstantiatorMessage; http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 8d4eaf7..02d04b3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -183,7 +183,6 @@ import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; import org.apache.geode.internal.cache.partitioned.DestroyMessage; import org.apache.geode.internal.cache.partitioned.DestroyMessage.DestroyResponse; -import org.apache.geode.internal.cache.partitioned.DestroyRegionOnDataStoreMessage; import org.apache.geode.internal.cache.partitioned.DumpAllPRConfigMessage; import org.apache.geode.internal.cache.partitioned.DumpB2NRegion; import org.apache.geode.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse; @@ -4479,7 +4478,7 @@ public class PartitionedRegion extends LocalRegion values.addObjectPart(key, ge.value, ge.isObject, ge.versionTag); } - if (values.size() == BaseCommand.maximumChunkSize) { + if (values.size() == BaseCommand.MAXIMUM_CHUNK_SIZE) { BaseCommand.sendNewRegisterInterestResponseChunk(this, "keyList", values, false, servConn); values.clear(); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java index f75a912..c0d3342 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java @@ -2057,6 +2057,21 @@ public class HARegionQueue implements RegionQueue { * a single peek thread. */ private static class BlockingHARegionQueue extends HARegionQueue { + + private static final String EVENT_ENQUEUE_WAIT_TIME_NAME = + DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME"; + + private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; + + /** + * System property name for indicating how much frequently the "Queue full" message should be + * logged. + */ + private static final String MAX_QUEUE_LOG_FREQUENCY = + DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit"; + + private static final long DEFAULT_LOG_FREQUENCY = 1000; + /** * Guards the Put permits */ @@ -2079,14 +2094,26 @@ public class HARegionQueue implements RegionQueue { */ private final Object permitMon = new Object(); - // Lock on which the take & remove threads block awaiting data from put - // operations + /** + * Lock on which the take & remove threads block awaiting data from put + * operations + */ private final StoppableReentrantLock lock; /** * Condition object on which peek & take threads will block */ - protected final StoppableCondition blockCond; + final StoppableCondition blockCond; + + /** + * System property value denoting the time in milliseconds. Any thread putting an event into a + * subscription queue, which is full, will wait this much time for the queue to make space. It'll + * then enqueue the event possibly causing the queue to grow beyond its capacity/max-size. See + * #51400. + */ + private final int enqueueEventWaitTime; + + private final long logFrequency; /** * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can @@ -2097,16 +2124,43 @@ public class HARegionQueue implements RegionQueue { HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { + super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary); this.capacity = hrqa.getBlockingQueueCapacity(); this.putPermits = this.capacity; this.lock = new StoppableReentrantLock(this.region.getCancelCriterion()); - this.blockCond = lock.newCondition(); + this.blockCond = this.lock.newCondition(); super.putGIIDataInRegion(); - if (this.getClass() == BlockingHARegionQueue.class) { - initialized.set(true); + + if (getClass() == BlockingHARegionQueue.class) { + this.initialized.set(true); } + + this.enqueueEventWaitTime = calcEnqueueEventWaitTime(); + this.logFrequency = calcLogFrequency(); + } + + private static int calcEnqueueEventWaitTime() { + int value = + Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); + if (value < 0) { + value = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; + } + return value; + } + + private static long calcLogFrequency() { + long value; + try { + value = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); + if (value <= 0) { + value = DEFAULT_LOG_FREQUENCY; + } + } catch (NumberFormatException ignore) { + value = DEFAULT_LOG_FREQUENCY; + } + return value; } @Override @@ -2134,56 +2188,55 @@ public class HARegionQueue implements RegionQueue { * in the HARegionQueue. */ @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT") + @SuppressWarnings("TLW_TWO_LOCK_WAIT") void checkQueueSizeConstraint() throws InterruptedException { - if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 - if (Thread.interrupted()) - throw new InterruptedException(); - synchronized (this.putGuard) { - if (putPermits <= 0) { - synchronized (this.permitMon) { - if (reconcilePutPermits() <= 0) { - if (region.getSystem().getConfig().getRemoveUnresponsiveClient()) { - isClientSlowReciever = true; - } else { - try { - long logFrequency = CacheClientNotifier.DEFAULT_LOG_FREQUENCY; - CacheClientNotifier ccn = CacheClientNotifier.getInstance(); - if (ccn != null) { // check needed for junit tests - logFrequency = ccn.getLogFrequency(); - } - if ((this.maxQueueSizeHitCount % logFrequency) == 0) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL, - new Object[] {region.getName()})); - this.maxQueueSizeHitCount = 0; - } - ++this.maxQueueSizeHitCount; - this.region.checkReadiness(); // fix for bug 37581 - // TODO: wait called while holding two locks - this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime); - this.region.checkReadiness(); // fix for bug 37581 - // Fix for #51400. Allow the queue to grow beyond its - // capacity/maxQueueSize, if it is taking a long time to - // drain the queue, either due to a slower client or the - // deadlock scenario mentioned in the ticket. - reconcilePutPermits(); - if ((this.maxQueueSizeHitCount % logFrequency) == 1) { - logger.info(LocalizedMessage - .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS)); - } - } catch (InterruptedException ex) { - // TODO: The line below is meaningless. Comment it out later - this.permitMon.notifyAll(); - throw ex; + if (!(this.haContainer instanceof HAContainerMap && isPrimary())) { + // Fix for bug 39413 + return; + } + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + synchronized (this.putGuard) { + if (this.putPermits <= 0) { + synchronized (this.permitMon) { + if (reconcilePutPermits() <= 0) { + if (this.region.getSystem().getConfig().getRemoveUnresponsiveClient()) { + this.isClientSlowReciever = true; + } else { + try { + if ((this.maxQueueSizeHitCount % this.logFrequency) == 0) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.HARegionQueue_CLIENT_QUEUE_FOR_0_IS_FULL, + new Object[] { this.region.getName() })); + this.maxQueueSizeHitCount = 0; + } + ++this.maxQueueSizeHitCount; + this.region.checkReadiness(); // fix for bug 37581 + // TODO: wait called while holding two locks + this.permitMon.wait(this.enqueueEventWaitTime); + this.region.checkReadiness(); // fix for bug 37581 + // Fix for #51400. Allow the queue to grow beyond its + // capacity/maxQueueSize, if it is taking a long time to + // drain the queue, either due to a slower client or the + // deadlock scenario mentioned in the ticket. + reconcilePutPermits(); + if (this.maxQueueSizeHitCount % this.logFrequency == 1) { + logger.info(LocalizedMessage + .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS)); } + } catch (InterruptedException ex) { + // TODO: The line below is meaningless. Comment it out later + this.permitMon.notifyAll(); + throw ex; } } - } // synchronized (this.permitMon) - } // if (putPermits <= 0) - --putPermits; - } // synchronized (this.putGuard) - } + } + } // synchronized (this.permitMon) + } // if (putPermits <= 0) + --this.putPermits; + } // synchronized (this.putGuard) } /** http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Command.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Command.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Command.java index 0c1c42a..d7f7c7b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Command.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Command.java @@ -12,22 +12,17 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier; -import org.apache.geode.internal.cache.tier.sockets.*; +import org.apache.geode.internal.cache.tier.sockets.Message; +import org.apache.geode.internal.cache.tier.sockets.ServerConnection; -/** - * - */ public interface Command { - public void execute(Message msg, ServerConnection servConn); + void execute(Message message, ServerConnection serverConnection); - public final int RESPONDED = 1; + int RESPONDED = 1; - public final int REQUIRES_RESPONSE = 2; + int REQUIRES_RESPONSE = 2; - public final int REQUIRES_CHUNKED_RESPONSE = 3; + int REQUIRES_CHUNKED_RESPONSE = 3; }