http://git-wip-us.apache.org/repos/asf/geode/blob/d393b4aa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java index 7698550..8915c55 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java @@ -94,9 +94,9 @@ import org.apache.geode.security.AuthenticationRequiredException; import org.apache.geode.security.GemFireSecurityException; /** - * <code>CacheClientUpdater</code> is a thread that processes update messages from a cache server - * and {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local - * cache based on the contents of those messages. + * {@code CacheClientUpdater} is a thread that processes update messages from a cache server and + * {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local cache + * based on the contents of those messages. * * @since GemFire 3.5 */ @@ -104,6 +104,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn private static final Logger logger = LogService.getLogger(); + private static final int DEFAULT_SOCKET_BUFFER_SIZE = 32768; + /** * true if the constructor successfully created a connection. If false, the run method for this * thread immediately exits. @@ -129,6 +131,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * The input stream of the socket */ private final InputStream in; + /** * Failed updater from the endpoint previously known as the primary */ @@ -139,12 +142,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ private final ByteBuffer commBuffer; - private boolean commBufferReleased; + private boolean commBufferReleased; // TODO: fix synchronization private final CCUStats stats; /** - * Cache for which we provide service + * Cache for which we provide service TODO: lifecycle and synchronization need work */ private /* final */ InternalCache cache; @@ -175,18 +178,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ private boolean isOpCompleted; - public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread "; + public static final String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread "; /** - * to enable test flag + * to enable test flag TODO: eliminate isUsedByTest */ public static boolean isUsedByTest; /** * Indicates if full value was requested from server as a result of failure in applying delta - * bytes. + * bytes. TODO: only used for test assertion */ - public static boolean fullValueRequested = false; + static boolean fullValueRequested = false; private final ServerLocation location; @@ -195,8 +198,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn private EndpointManager eManager = null; private Endpoint endpoint = null; - static private final long MAX_CACHE_WAIT = Long - .getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120).longValue(); // seconds + private static final long MAX_CACHE_WAIT = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120); // seconds /** * Return true if cache appears @@ -231,7 +234,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn boolean interrupted = Thread.interrupted(); try { Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -245,12 +248,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } /** - * Creates a new <code>CacheClientUpdater</code> with a given name that waits for a server to - * connect on a given port. + * Creates a new {@code CacheClientUpdater} with a given name that waits for a server to connect + * on a given port. * * @param name descriptive name, used for our ThreadGroup * @param location the endpoint we represent - * @param primary true if our endpoint is primary TODO ask the ep for this? + * @param primary true if our endpoint is primary * @param ids the system we are distributing messages through * * @throws AuthenticationRequiredException when client is not configured to send credentials using @@ -265,6 +268,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn Endpoint endpoint, int handshakeTimeout, SocketCreator socketCreator) throws AuthenticationRequiredException, AuthenticationFailedException, ServerRefusedConnectionException { + super(LoggingThreadGroup.createThreadGroup("Client update thread"), name); this.setDaemon(true); this.system = (InternalDistributedSystem) ids; @@ -276,6 +280,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn this.eManager = eManager; this.endpoint = endpoint; this.stats = new CCUStats(this.system, this.location); + // Create the connection... final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { @@ -291,7 +296,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn try { // Size of the server-to-client communication socket buffers int socketBufferSize = - Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue(); + Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", DEFAULT_SOCKET_BUFFER_SIZE); mySock = socketCreator.connectForClient(location.getHostName(), location.getPort(), handshakeTimeout, socketBufferSize); @@ -327,31 +332,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - { - int bufSize = 1024; - try { - bufSize = mySock.getSendBufferSize(); - if (bufSize < 1024) { - bufSize = 1024; - } - } catch (SocketException ignore) { + int bufSize = 1024; + try { + bufSize = mySock.getSendBufferSize(); + if (bufSize < 1024) { + bufSize = 1024; } - cb = ServerConnection.allocateCommBuffer(bufSize, mySock); - } - { - // create a "server" memberId we currently don't know much about the - // server. - // Would be nice for it to send us its member id - // TODO: change the serverId to use the endpoint's getMemberId() which - // returns a - // DistributedMember (once gfecq branch is merged to trunk). - MemberAttributes ma = - new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null); - sid = new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, - ma); + } catch (SocketException ignore) { } + cb = ServerConnection.allocateCommBuffer(bufSize, mySock); + + // create a "server" memberId we currently don't know much about the server. + // Would be nice for it to send us its member id + // TODO: change the serverId to use the endpoint's getMemberId() which returns a + // DistributedMember (once gfecq branch is merged to trunk). + MemberAttributes ma = + new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null); + sid = + new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, ma); + success = true; - } catch (ConnectException e) { + } catch (ConnectException ignore) { if (!quitting()) { logger.warn(LocalizedMessage .create(LocalizedStrings.CacheClientUpdater_0_CONNECTION_WAS_REFUSED, this)); @@ -385,20 +386,22 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn e.getMessage())); } } finally { - connected = success; + this.connected = success; if (mySock != null) { try { mySock.setSoTimeout(0); - } catch (SocketException e) { + } catch (SocketException ignore) { // ignore: nothing we can do about this } } - if (connected) { - socket = mySock; - out = tmpOut; - in = tmpIn; - serverId = sid; - commBuffer = cb; + + if (this.connected) { + this.socket = mySock; + this.out = tmpOut; + this.in = tmpIn; + this.serverId = sid; + this.commBuffer = cb; + // Don't want the timeout after handshake if (mySock != null) { try { @@ -406,12 +409,13 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } catch (SocketException ignore) { } } + } else { - socket = null; - serverId = null; - commBuffer = null; - out = null; - in = null; + this.socket = null; + this.serverId = null; + this.commBuffer = null; + this.out = null; + this.in = null; if (mySock != null) { try { @@ -439,29 +443,31 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } public boolean isConnected() { - return connected; + return this.connected; } + @Override public boolean isPrimary() { - return isPrimary; + return this.isPrimary; } public InternalLogWriter getSecurityLogger() { return this.qManager.getSecurityLogger(); } + @Override public void setFailedUpdater(ClientUpdater failedUpdater) { this.failedUpdater = failedUpdater; } /** - * Performs the work of the client update thread. Creates a <code>ServerSocket</code> and waits - * for the server to connect to it. + * Performs the work of the client update thread. Creates a {@code ServerSocket} and waits for the + * server to connect to it. */ @Override public void run() { + EntryLogger.setSource(this.serverId, "RI"); boolean addedListener = false; - EntryLogger.setSource(serverId, "RI"); try { this.system.addDisconnectListener(this); addedListener = true; @@ -472,8 +478,10 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn return; } processMessages(); - } catch (CancelException e) { - return; // just bail + + } catch (CancelException ignore) { + // just bail + } finally { if (addedListener) { this.system.removeDisconnectListener(this); @@ -486,8 +494,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Notifies this thread to stop processing */ - protected void stopProcessing() { - continueProcessing.set(false);// = false; + private void stopProcessing() { + this.continueProcessing.set(false); } /** @@ -495,39 +503,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * duplicates. Note: this method is not named stop because this is a Thread which has a deprecated * stop method. */ - public void stopUpdater() { + private void stopUpdater() { boolean isSelfDestroying = Thread.currentThread() == this; - stopProcessing(); + // need to also close the socket for this interrupt to wakeup // the thread. This fixes bug 35691. - // this.close(); // this should not be done here. if (this.isAlive()) { if (logger.isDebugEnabled()) { logger.debug("{}: Stopping {}", this.location, this); } + if (!isSelfDestroying) { interrupt(); try { - if (socket != null) { - socket.close(); + if (this.socket != null) { + this.socket.close(); } - } catch (VirtualMachineError err) { - SystemFailure.initiateFailure(err); - // If this ever returns, rethrow the error. We're poisoned - // now, so don't let this thread continue. - throw err; - } catch (Throwable t) { - // Whenever you catch Error or Throwable, you must also - // catch VirtualMachineError (see above). However, there is - // _still_ a possibility that you are dealing with a cascading - // error condition, so you also need to check to see if the JVM - // is still usable: - SystemFailure.checkFailure(); - // dont care... + } catch (IOException e) { if (logger.isDebugEnabled()) { - logger.debug(t.getMessage(), t); + logger.debug(e.getMessage(), e); } } } // !isSelfDestroying @@ -537,32 +533,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Signals the run thread to stop, closes underlying resources. */ + @Override public void close() { - this.continueProcessing.set(false);// = false; // signals we are done. + this.continueProcessing.set(false); // signals we are done. - // Close the socket - // This will also cause the underlying streams to fail. + // Close the socket. This will also cause the underlying streams to fail. try { - if (socket != null) { - socket.close(); + if (this.socket != null) { + this.socket.close(); } - } catch (Exception e) { + } catch (IOException ignore) { // ignore } - try { - this.stats.close(); - } catch (Exception e) { - // ignore - } + this.stats.close(); // close the helper - try { - if (cacheHelper != null) { - cacheHelper.close(); - } - } catch (Exception e) { - // ignore + if (this.cacheHelper != null) { + this.cacheHelper.close(); } releaseCommBuffer(); } @@ -580,22 +568,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /* refinement of method inherited from Thread */ @Override public String toString() { - return this.getName() + " (" + this.location.getHostName() + ":" + this.location.getPort() - + ")"; + return getName() + " (" + this.location.getHostName() + ':' + this.location.getPort() + ')'; } /** * Handle a marker message * - * @param m message containing the data + * @param clientMessage message containing the data */ - private void handleMarker(Message m) { + private void handleMarker(Message clientMessage) { try { final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { - logger.debug("Received marker message of length ({} bytes)", m.getPayloadLength()); + logger.debug("Received marker message of length ({} bytes)", + clientMessage.getPayloadLength()); } + this.qManager.getState().processMarker(); + if (isDebugEnabled) { logger.debug("Processed marker message"); } @@ -610,41 +600,40 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Create or update an entry * - * @param m message containing the data + * @param clientMessage message containing the data */ - private void handleUpdate(Message m) { + private void handleUpdate(Message clientMessage) { String regionName = null; Object key = null; Part valuePart = null; - Object newValue = null; - byte[] deltaBytes = null; - Object fullValue = null; - boolean isValueObject = false; - int partCnt = 0; final boolean isDebugEnabled = logger.isDebugEnabled(); + try { this.isOpCompleted = false; + // Retrieve the data from the put message parts if (isDebugEnabled) { - logger.debug("Received put message of length ({} bytes)", m.getPayloadLength()); + logger.debug("Received put message of length ({} bytes)", clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part keyPart = m.getPart(partCnt++); - boolean isDeltaSent = ((Boolean) m.getPart(partCnt++).getObject()).booleanValue(); - valuePart = m.getPart(partCnt++); - Part callbackArgumentPart = m.getPart(partCnt++); - VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject(); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part keyPart = clientMessage.getPart(partCnt++); + boolean isDeltaSent = (Boolean) clientMessage.getPart(partCnt++).getObject(); + valuePart = clientMessage.getPart(partCnt++); + Part callbackArgumentPart = clientMessage.getPart(partCnt++); + VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject(); if (versionTag != null) { versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId()); } - Part isInterestListPassedPart = m.getPart(partCnt++); - Part hasCqsPart = m.getPart(partCnt++); + Part isInterestListPassedPart = clientMessage.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); - EventID eventId = (EventID) m.getPart(m.getNumberOfParts() - 1).getObject(); + EventID eventId = + (EventID) clientMessage.getPart(clientMessage.getNumberOfParts() - 1).getObject(); - boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue(); - boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue(); + boolean withInterest = (Boolean) isInterestListPassedPart.getObject(); + boolean withCQs = (Boolean) hasCqsPart.getObject(); regionName = regionNamePart.getString(); key = keyPart.getStringOrObject(); @@ -655,30 +644,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // object, it will be stored as a CachedDeserializable and // deserialized only when requested. - boolean isCreate = (m.getMessageType() == MessageType.LOCAL_CREATE); + boolean isCreate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE; + if (isDebugEnabled) { - logger - .debug( - "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}", - regionName, key, isCreate, - (valuePart.isObject() ? new StringBuilder(" value: ") - .append(deserialize(valuePart.getSerializedForm())) : ""), - callbackArgument, withInterest, withCQs, eventId, versionTag); + logger.debug( + "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}", + regionName, key, isCreate, + valuePart.isObject() + ? new StringBuilder(" value: ").append(deserialize(valuePart.getSerializedForm())) + : "", + callbackArgument, withInterest, withCQs, eventId, versionTag); } - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); + + Object newValue = null; + byte[] deltaBytes = null; + Object fullValue = null; + boolean isValueObject; if (!isDeltaSent) { // bug #42162 - must check for a serialized null here byte[] serializedForm = valuePart.getSerializedForm(); + if (isCreate && InternalDataSerializer.isSerializedNull(serializedForm)) { // newValue = null; newValue is already null } else { newValue = valuePart.getSerializedForm(); } + if (withCQs) { fullValue = valuePart.getObject(); } + isValueObject = valuePart.isObject(); } else { deltaBytes = valuePart.getSerializedForm(); @@ -689,40 +687,49 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn if (isDebugEnabled && !quitting()) { logger.debug("{}: Region named {} does not exist", this, regionName); } + } else if (region.hasServerProxy() && ServerResponseMatrix - .checkForValidStateAfterNotification(region, key, m.getMessageType()) + .checkForValidStateAfterNotification(region, key, clientMessage.getMessageType()) && (withInterest || !withCQs)) { @Released EntryEventImpl newEvent = null; + try { // Create an event and put the entry newEvent = EntryEventImpl.create(region, - ((m.getMessageType() == MessageType.LOCAL_CREATE) ? Operation.CREATE - : Operation.UPDATE), + clientMessage.getMessageType() == MessageType.LOCAL_CREATE ? Operation.CREATE + : Operation.UPDATE, key, null /* newValue */, callbackArgument /* callbackArg */, true /* originRemote */, eventId.getDistributedMember()); + newEvent.setVersionTag(versionTag); newEvent.setFromServer(true); + region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, deltaBytes, - isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE, - qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId); + isValueObject, callbackArgument, + clientMessage.getMessageType() == MessageType.LOCAL_CREATE, + this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, + eventId); + this.isOpCompleted = true; + // bug 45520 - ConcurrentCacheModificationException is not thrown and we must check this // flag - // if (newEvent.isConcurrencyConflict()) { - // return; // this is logged elsewhere at fine level - // } if (withCQs && isDeltaSent) { fullValue = newEvent.getNewValue(); } - } catch (InvalidDeltaException ide) { + } catch (InvalidDeltaException ignore) { Part fullValuePart = requestFullValue(eventId, "Caught InvalidDeltaException."); region.getCachePerfStats().incDeltaFullValuesRequested(); - fullValue = newValue = fullValuePart.getObject(); - isValueObject = Boolean.valueOf(fullValuePart.isObject()); + fullValue = newValue = fullValuePart.getObject(); // TODO: fix this line + isValueObject = fullValuePart.isObject(); + region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, null, - isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE, - qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId); + isValueObject, callbackArgument, + clientMessage.getMessageType() == MessageType.LOCAL_CREATE, + this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, + eventId); + this.isOpCompleted = true; } finally { if (newEvent != null) @@ -737,20 +744,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // Update CQs. CQs can exist without client region. if (withCQs) { - Part numCqsPart = m.getPart(partCnt++); + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, fullValue, - deltaBytes, eventId); + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), + clientMessage.getMessageType(), key, fullValue, deltaBytes, eventId); this.isOpCompleted = true; } } catch (Exception e) { String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_PUT_ENTRY_REGION_0_KEY_1_VALUE_2 - .toLocalizedString( - new Object[] {regionName, key, deserialize(valuePart.getSerializedForm())}); + .toLocalizedString(regionName, key, deserialize(valuePart.getSerializedForm())); handleException(message, e); } } @@ -763,12 +769,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn if (isDebugEnabled) { logger.debug("{} Requesting full value...", reason); } - Part result = (Part) GetEventValueOp.executeOnPrimary(qManager.getPool(), eventId, null); + Part result = (Part) GetEventValueOp.executeOnPrimary(this.qManager.getPool(), eventId, null); if (result == null) { // Just log a warning. Do not stop CCU thread. + // TODO: throw a subclass of Exception throw new Exception("Could not retrieve full value for " + eventId); } + if (isDebugEnabled) { logger.debug("Full value received."); } @@ -778,39 +786,41 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Invalidate an entry * - * @param m message describing the entry + * @param clientMessage message describing the entry */ - private void handleInvalidate(Message m) { + private void handleInvalidate(Message clientMessage) { String regionName = null; Object key = null; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { this.isOpCompleted = false; + // Retrieve the data from the local-invalidate message parts if (isDebugEnabled) { - logger.debug("Received invalidate message of length ({} bytes)", m.getPayloadLength()); + logger.debug("Received invalidate message of length ({} bytes)", + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part keyPart = m.getPart(partCnt++); - Part callbackArgumentPart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part keyPart = clientMessage.getPart(partCnt++); + Part callbackArgumentPart = clientMessage.getPart(partCnt++); - VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject(); + VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject(); if (versionTag != null) { versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId()); } - Part isInterestListPassedPart = m.getPart(partCnt++); - Part hasCqsPart = m.getPart(partCnt++); + Part isInterestListPassedPart = clientMessage.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); regionName = regionNamePart.getString(); key = keyPart.getStringOrObject(); Object callbackArgument = callbackArgumentPart.getObject(); - boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue(); - boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue(); + boolean withInterest = (Boolean) isInterestListPassedPart.getObject(); + boolean withCQs = (Boolean) hasCqsPart.getObject(); if (isDebugEnabled) { logger.debug( @@ -818,34 +828,36 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn regionName, key, callbackArgument, withInterest, withCQs, versionTag); } - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("Region named {} does not exist", regionName); } + } else { if (region.hasServerProxy() && (withInterest || !withCQs)) { try { - Part eid = m.getPart(m.getNumberOfParts() - 1); + Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); EventID eventId = (EventID) eid.getObject(); + try { region.basicBridgeClientInvalidate(eventId.getDistributedMember(), key, callbackArgument, - qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId, + this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId, versionTag); - } catch (ConcurrentCacheModificationException e) { - // return; allow CQs to be processed + } catch (ConcurrentCacheModificationException ignore) { + // allow CQs to be processed } + this.isOpCompleted = true; // fix for 36615 - qManager.getState().incrementInvalidatedStats(); + this.qManager.getState().incrementInvalidatedStats(); if (isDebugEnabled) { logger.debug("Invalidated entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument); } - } catch (EntryNotFoundException e) { - /* ignore */ + } catch (EntryNotFoundException ignore) { if (isDebugEnabled && !quitting()) { logger.debug("Already invalidated entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument); @@ -858,19 +870,20 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn if (withCQs) { // The client may have been registered to receive invalidates for // create and updates operations. Get the actual region operation. - Part regionOpType = m.getPart(partCnt++); - Part numCqsPart = m.getPart(partCnt++); + Part regionOpType = clientMessage.getPart(partCnt++); + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), regionOpType.getInt(), key, null); + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), regionOpType.getInt(), + key, null); this.isOpCompleted = true; } } catch (Exception e) { final String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_INVALIDATE_ENTRY_REGION_0_KEY_1 - .toLocalizedString(new Object[] {regionName, key}); + .toLocalizedString(regionName, key); handleException(message, e); } } @@ -878,26 +891,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * locally destroy an entry * - * @param m message describing the entry + * @param clientMessage message describing the entry */ - private void handleDestroy(Message m) { + private void handleDestroy(Message clientMessage) { String regionName = null; Object key = null; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { this.isOpCompleted = false; // Retrieve the data from the local-destroy message parts if (isDebugEnabled) { - logger.debug("Received destroy message of length ({} bytes)", m.getPayloadLength()); + logger.debug("Received destroy message of length ({} bytes)", + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part keyPart = m.getPart(partCnt++); - Part callbackArgumentPart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part keyPart = clientMessage.getPart(partCnt++); + Part callbackArgumentPart = clientMessage.getPart(partCnt++); - VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject(); + VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject(); if (versionTag != null) { versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId()); } @@ -905,8 +919,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn regionName = regionNamePart.getString(); key = keyPart.getStringOrObject(); - Part isInterestListPassedPart = m.getPart(partCnt++); - Part hasCqsPart = m.getPart(partCnt++); + Part isInterestListPassedPart = clientMessage.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue(); boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue(); @@ -918,30 +932,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn regionName, key, callbackArgument, withInterest, withCQs, versionTag); } - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); - EventID eventId = null; + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("Region named {} does not exist", regionName); } + } else if (region.hasServerProxy() && (withInterest || !withCQs)) { + EventID eventId = null; try { - Part eid = m.getPart(m.getNumberOfParts() - 1); + Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); eventId = (EventID) eid.getObject(); + try { region.basicBridgeClientDestroy(eventId.getDistributedMember(), key, callbackArgument, - qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId, + this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId, versionTag); - } catch (ConcurrentCacheModificationException e) { - // return; allow CQs to be processed + } catch (ConcurrentCacheModificationException ignore) { + // allow CQs to be processed } + this.isOpCompleted = true; if (isDebugEnabled) { logger.debug("Destroyed entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument); } - } catch (EntryNotFoundException e) { - /* ignore */ + } catch (EntryNotFoundException ignore) { if (isDebugEnabled && !quitting()) { logger.debug( "Already destroyed entry for region: {} key: {} callbackArgument: {} eventId={}", @@ -952,18 +968,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } if (withCQs) { - Part numCqsPart = m.getPart(partCnt++); + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, null); + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), + clientMessage.getMessageType(), key, null); this.isOpCompleted = true; } } catch (Exception e) { String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_DESTROY_ENTRY_REGION_0_KEY_1 - .toLocalizedString(new Object[] {regionName, key}); + .toLocalizedString(regionName, key); handleException(message, e); } } @@ -971,44 +988,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Locally destroy a region * - * @param m message describing the region + * @param clientMessage message describing the region */ - private void handleDestroyRegion(Message m) { - Part regionNamePart = null, callbackArgumentPart = null; + private void handleDestroyRegion(Message clientMessage) { String regionName = null; - Object callbackArgument = null; - LocalRegion region = null; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { // Retrieve the data from the local-destroy-region message parts if (isDebugEnabled) { - logger.debug("Received destroy region message of length ({} bytes)", m.getPayloadLength()); + logger.debug("Received destroy region message of length ({} bytes)", + clientMessage.getPayloadLength()); } - regionNamePart = m.getPart(partCnt++); - callbackArgumentPart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part callbackArgumentPart = clientMessage.getPart(partCnt++); regionName = regionNamePart.getString(); - callbackArgument = callbackArgumentPart.getObject(); + Object callbackArgument = callbackArgumentPart.getObject(); - Part hasCqsPart = m.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Destroying region: {} callbackArgument: {}", regionName, callbackArgument); } // Handle CQs if any on this region. - if (((Boolean) hasCqsPart.getObject()).booleanValue()) { - Part numCqsPart = m.getPart(partCnt++); + if ((Boolean) hasCqsPart.getObject()) { + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null); + // TODO: partCnt is unused -- does processCqs have side effects + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), + clientMessage.getMessageType(), null, null); } // Confirm that the region exists - region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("Region named {} does not exist", regionName); @@ -1025,7 +1042,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn logger.debug("Destroyed region: {} callbackArgument: {}", regionName, callbackArgument); } } - } catch (RegionDestroyedException e) { // already destroyed + } catch (RegionDestroyedException ignore) { // already destroyed if (isDebugEnabled) { logger.debug("region already destroyed: {}", regionName); } @@ -1040,24 +1057,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Locally clear a region * - * @param m message describing the region to clear + * @param clientMessage message describing the region to clear */ - private void handleClearRegion(Message m) { + private void handleClearRegion(Message clientMessage) { String regionName = null; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { // Retrieve the data from the clear-region message parts if (isDebugEnabled) { logger.debug("{}: Received clear region message of length ({} bytes)", this, - m.getPayloadLength()); + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part callbackArgumentPart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part callbackArgumentPart = clientMessage.getPart(partCnt++); - Part hasCqsPart = m.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); regionName = regionNamePart.getString(); Object callbackArgument = callbackArgumentPart.getObject(); @@ -1065,17 +1082,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn logger.debug("Clearing region: {} callbackArgument: {}", regionName, callbackArgument); } - if (((Boolean) hasCqsPart.getObject()).booleanValue()) { - Part numCqsPart = m.getPart(partCnt++); + if ((Boolean) hasCqsPart.getObject()) { + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null); + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), + clientMessage.getMessageType(), null, null); } // Confirm that the region exists - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("Region named {} does not exist", regionName); @@ -1088,7 +1106,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn if (region.hasServerProxy()) { // Locally clear the region region.basicBridgeClientClear(callbackArgument, - qManager.getState().getProcessedMarker() || !this.isDurableClient); + this.qManager.getState().getProcessedMarker() || !this.isDurableClient); if (isDebugEnabled) { logger.debug("Cleared region: {} callbackArgument: {}", regionName, callbackArgument); @@ -1106,50 +1124,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * Locally invalidate a region NOTE: Added as part of bug#38048. The code only takes care of CQ * processing. Support needs to be added for local region invalidate. * - * @param m message describing the region to clear + * @param clientMessage message describing the region to clear */ - private void handleInvalidateRegion(Message m) { + private void handleInvalidateRegion(Message clientMessage) { String regionName = null; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { // Retrieve the data from the invalidate-region message parts if (isDebugEnabled) { logger.debug("{}: Received invalidate region message of length ({} bytes)", this, - m.getPayloadLength()); + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); partCnt++; // Part callbackArgumentPart = m.getPart(partCnt++); - Part hasCqsPart = m.getPart(partCnt++); + Part hasCqsPart = clientMessage.getPart(partCnt++); regionName = regionNamePart.getString(); - // Object callbackArgument = callbackArgumentPart.getObject(); - if (((Boolean) hasCqsPart.getObject()).booleanValue()) { - Part numCqsPart = m.getPart(partCnt++); + if ((Boolean) hasCqsPart.getObject()) { + Part numCqsPart = clientMessage.getPart(partCnt++); if (isDebugEnabled) { logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2); } - partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null); + // TODO: partCnt is unused + partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), + clientMessage.getMessageType(), null, null); } // Confirm that the region exists - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("Region named {} does not exist", regionName); } - return; - } - - // Verify that the region in question should respond to this - // message - if (region.hasServerProxy()) { - return; } } catch (Exception e) { @@ -1163,40 +1175,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Register instantiators locally * - * @param msg message describing the new instantiators + * @param clientMessage message describing the new instantiators * @param eventId eventId of the instantiators */ - private void handleRegisterInstantiator(Message msg, EventID eventId) { + private void handleRegisterInstantiator(Message clientMessage, EventID eventId) { String instantiatorClassName = null; final boolean isDebugEnabled = logger.isDebugEnabled(); + try { - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); if (isDebugEnabled) { logger.debug("{}: Received register instantiators message of parts {}", getName(), noOfParts); } + Assert.assertTrue((noOfParts - 1) % 3 == 0); - for (int i = 0; i < noOfParts - 1; i = i + 3) { + for (int i = 0; i < noOfParts - 1; i += 3) { instantiatorClassName = - (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm()); - String instantiatedClassName = - (String) CacheServerHelper.deserialize(msg.getPart(i + 1).getSerializedForm()); - int id = msg.getPart(i + 2).getInt(); + (String) CacheServerHelper.deserialize(clientMessage.getPart(i).getSerializedForm()); + String instantiatedClassName = (String) CacheServerHelper + .deserialize(clientMessage.getPart(i + 1).getSerializedForm()); + int id = clientMessage.getPart(i + 2).getInt(); InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false, - eventId, null/* context */); - // distribute is false because we don't want to propagate this to - // servers recursively + eventId, null); + // distribute is false because we don't want to propagate this to servers recursively } // CALLBACK TESTING PURPOSE ONLY if (PoolImpl.IS_INSTANTIATOR_CALLBACK) { - ClientServerObserver bo = ClientServerObserverHolder.getInstance(); - bo.afterReceivingFromServer(eventId); + ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance(); + clientServerObserver.afterReceivingFromServer(eventId); } - } - // TODO bug: can the following catch be more specific? - catch (Exception e) { + } catch (Exception e) { if (isDebugEnabled) { logger.debug("{}: Caught following exception while attempting to read Instantiator : {}", this, instantiatorClassName, e); @@ -1207,6 +1218,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn private void handleRegisterDataSerializer(Message msg, EventID eventId) { Class dataSerializerClass = null; final boolean isDebugEnabled = logger.isDebugEnabled(); + try { int noOfParts = msg.getNumberOfParts(); if (isDebugEnabled) { @@ -1220,8 +1232,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm()); int id = msg.getPart(i + 1).getInt(); InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id); - // distribute is false because we don't want to propagate this to - // servers recursively + // distribute is false because we don't want to propagate this to servers recursively int numOfClasses = msg.getPart(i + 2).getInt(); int j = 0; @@ -1230,7 +1241,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn (String) CacheServerHelper.deserialize(msg.getPart(i + 3 + j).getSerializedForm()); InternalDataSerializer.updateSupportedClassesMap(dataSerializerClassName, className); } - i = i + 3 + j; + + i += 3 + j; } catch (ClassNotFoundException e) { if (isDebugEnabled) { logger.debug( @@ -1246,9 +1258,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn bo.afterReceivingFromServer(eventId); } - } - // TODO bug: can the following catch be more specific? - catch (Exception e) { + } catch (Exception e) { if (isDebugEnabled) { logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}", this, dataSerializerClass, e); @@ -1259,93 +1269,87 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Processes message to invoke CQ listeners. */ - private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType, - Object key, Object value) { - return processCqs(m, startMessagePart, numCqParts, messageType, key, value, null, - null/* eventId */); + private int processCqs(Message clientMessage, int startMessagePart, int numCqParts, + int messageType, Object key, Object value) { + return processCqs(clientMessage, startMessagePart, numCqParts, messageType, key, value, null, + null); } - private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType, - Object key, Object value, byte[] delta, EventID eventId) { + private int processCqs(Message clientMessage, int startMessagePart, int numCqParts, + int messageType, Object key, Object value, byte[] delta, EventID eventId) { HashMap cqs = new HashMap(); final boolean isDebugEnabled = logger.isDebugEnabled(); for (int cqCnt = 0; cqCnt < numCqParts;) { - StringBuilder str = null; + StringBuilder sb = null; if (isDebugEnabled) { - str = new StringBuilder(100); - str.append("found these queries: "); + sb = new StringBuilder(100); + sb.append("found these queries: "); } try { // Get CQ Name. - Part cqNamePart = m.getPart(startMessagePart + (cqCnt++)); + Part cqNamePart = clientMessage.getPart(startMessagePart + cqCnt++); // Get CQ Op. - Part cqOpPart = m.getPart(startMessagePart + (cqCnt++)); - cqs.put(cqNamePart.getString(), Integer.valueOf(cqOpPart.getInt())); + Part cqOpPart = clientMessage.getPart(startMessagePart + cqCnt++); + cqs.put(cqNamePart.getString(), cqOpPart.getInt()); - if (str != null) { - str.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append(" "); + if (sb != null) { + sb.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append(" "); } - } catch (Exception ex) { + } catch (Exception ignore) { logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientUpdater_ERROR_WHILE_PROCESSING_THE_CQ_MESSAGE_PROBLEM_WITH_READING_MESSAGE_FOR_CQ_0, cqCnt)); } - if (isDebugEnabled && str != null) { - logger.debug(str); + if (isDebugEnabled) { + logger.debug(sb); } } - { - CqService cqService = this.cache.getCqService(); - try { - cqService.dispatchCqListeners(cqs, messageType, key, value, delta, qManager, eventId); - } catch (Exception ex) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0, - ex.getMessage())); - if (isDebugEnabled) { - logger.debug("Failed to invoke CQ Dispatcher.", ex); - } + CqService cqService = this.cache.getCqService(); + try { + cqService.dispatchCqListeners(cqs, messageType, key, value, delta, this.qManager, eventId); + } catch (Exception ex) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0, + ex.getMessage())); + if (isDebugEnabled) { + logger.debug("Failed to invoke CQ Dispatcher.", ex); } } - return (startMessagePart + numCqParts); + return startMessagePart + numCqParts; } - private void handleRegisterInterest(Message m) { + private void handleRegisterInterest(Message clientMessage) { String regionName = null; Object key = null; - int interestType; - byte interestResultPolicy; - boolean isDurable; - boolean receiveUpdatesAsInvalidates; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { // Retrieve the data from the add interest message parts if (isDebugEnabled) { logger.debug("{}: Received add interest message of length ({} bytes)", this, - m.getPayloadLength()); + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part keyPart = m.getPart(partCnt++); - Part interestTypePart = m.getPart(partCnt++); - Part interestResultPolicyPart = m.getPart(partCnt++); - Part isDurablePart = m.getPart(partCnt++); - Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++); + + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part keyPart = clientMessage.getPart(partCnt++); + Part interestTypePart = clientMessage.getPart(partCnt++); + Part interestResultPolicyPart = clientMessage.getPart(partCnt++); + Part isDurablePart = clientMessage.getPart(partCnt++); + Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++); regionName = regionNamePart.getString(); key = keyPart.getStringOrObject(); - interestType = ((Integer) interestTypePart.getObject()).intValue(); - interestResultPolicy = ((Byte) interestResultPolicyPart.getObject()).byteValue(); - isDurable = ((Boolean) isDurablePart.getObject()).booleanValue(); - receiveUpdatesAsInvalidates = - ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue(); + int interestType = (Integer) interestTypePart.getObject(); + byte interestResultPolicy = (Byte) interestResultPolicyPart.getObject(); + boolean isDurable = (Boolean) isDurablePart.getObject(); + boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject(); // Confirm that region exists - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled && !quitting()) { logger.debug("{}: Region named {} does not exist", this, regionName); @@ -1375,38 +1379,34 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - private void handleUnregisterInterest(Message m) { + private void handleUnregisterInterest(Message clientMessage) { String regionName = null; Object key = null; - int interestType; - boolean isDurable; - boolean receiveUpdatesAsInvalidates; - int partCnt = 0; - final boolean isDebugEnabled = logger.isDebugEnabled(); + try { // Retrieve the data from the remove interest message parts if (isDebugEnabled) { logger.debug("{}: Received remove interest message of length ({} bytes)", this, - m.getPayloadLength()); + clientMessage.getPayloadLength()); } - Part regionNamePart = m.getPart(partCnt++); - Part keyPart = m.getPart(partCnt++); - Part interestTypePart = m.getPart(partCnt++); - Part isDurablePart = m.getPart(partCnt++); - Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++); + int partCnt = 0; + Part regionNamePart = clientMessage.getPart(partCnt++); + Part keyPart = clientMessage.getPart(partCnt++); + Part interestTypePart = clientMessage.getPart(partCnt++); + Part isDurablePart = clientMessage.getPart(partCnt++); + Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++); // Not reading the eventId part regionName = regionNamePart.getString(); key = keyPart.getStringOrObject(); - interestType = ((Integer) interestTypePart.getObject()).intValue(); - isDurable = ((Boolean) isDurablePart.getObject()).booleanValue(); - receiveUpdatesAsInvalidates = - ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue(); + int interestType = (Integer) interestTypePart.getObject(); + boolean isDurable = (Boolean) isDurablePart.getObject(); + boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject(); // Confirm that region exists - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); if (region == null) { if (isDebugEnabled) { logger.debug("{}: Region named {} does not exist", this, regionName); @@ -1434,14 +1434,17 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - private void handleTombstoneOperation(Message msg) { + private void handleTombstoneOperation(Message clientMessage) { String regionName = "unknown"; + try { // not sure why this isn't done by the caller int partIdx = 0; + // see ClientTombstoneMessage.getGFE70Message - regionName = msg.getPart(partIdx++).getString(); - int op = msg.getPart(partIdx++).getInt(); - LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName); + regionName = clientMessage.getPart(partIdx++).getString(); + int op = clientMessage.getPart(partIdx++).getInt(); + LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName); + if (region == null) { if (!quitting()) { if (logger.isDebugEnabled()) { @@ -1450,24 +1453,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } return; } + if (logger.isDebugEnabled()) { logger.debug("{}: Received tombstone operation for region {} with operation={}", this, region, op); } + if (!region.getConcurrencyChecksEnabled()) { return; } + switch (op) { case 0: Map<VersionSource, Long> regionGCVersions = - (Map<VersionSource, Long>) msg.getPart(partIdx++).getObject(); - EventID eventID = (EventID) msg.getPart(partIdx++).getObject(); + (Map<VersionSource, Long>) clientMessage.getPart(partIdx++).getObject(); + EventID eventID = (EventID) clientMessage.getPart(partIdx++).getObject(); region.expireTombstones(regionGCVersions, eventID, null); break; + case 1: - Set<Object> removedKeys = (Set<Object>) msg.getPart(partIdx++).getObject(); + Set<Object> removedKeys = (Set<Object>) clientMessage.getPart(partIdx++).getObject(); region.expireTombstoneKeys(removedKeys); break; + default: throw new IllegalArgumentException("unknown operation type " + op); } @@ -1483,22 +1491,21 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ private boolean quitting() { if (isInterrupted()) { - // Any time an interrupt is thrown at this thread, regard it as a - // request to terminate + // Any time an interrupt is thrown at this thread, regard it as a request to terminate return true; } - if (!continueProcessing.get()) { + if (!this.continueProcessing.get()) { // de facto flag indicating we are to stop return true; } - if (cache != null && cache.getCancelCriterion().isCancelInProgress()) { + if (this.cache != null && this.cache.getCancelCriterion().isCancelInProgress()) { // System is cancelling return true; } // The pool stuff is really sick, so it's possible for us to have a distributed // system that is not the same as our cache. Check it just in case... - if (system.getCancelCriterion().isCancelInProgress()) { + if (this.system.getCancelCriterion().isCancelInProgress()) { return true; } @@ -1520,15 +1527,15 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn this.failedUpdater.join(5000); } } - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { gotInterrupted = true; - return; // just bail, because I have not done anything yet + // just bail, because I have not done anything yet } finally { if (!gotInterrupted && this.failedUpdater != null) { logger.info(LocalizedMessage.create( LocalizedStrings.CacheClientUpdater_0_HAS_COMPLETED_WAITING_FOR_1, new Object[] {this, this.failedUpdater})); - failedUpdater = null; + this.failedUpdater = null; } } } @@ -1537,6 +1544,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * Processes messages received from the server. * * Only certain types of messages are handled. + * + * TODO: Method 'processMessages' is too complex to analyze by data flow algorithm * * @see MessageType#CLIENT_MARKER * @see MessageType#LOCAL_CREATE @@ -1547,11 +1556,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * @see MessageType#CLEAR_REGION * @see ClientUpdateMessage */ - protected void processMessages() { + private void processMessages() { final boolean isDebugEnabled = logger.isDebugEnabled(); try { - Part eid = null; - Message _message = initializeMessage(); + Message clientMessage = initializeMessage(); + if (quitting()) { if (isDebugEnabled) { logger.debug("processMessages quitting early because we have stopped"); @@ -1559,11 +1568,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // our caller calls close which will notify all waiters for our init return; } + logger.info(LocalizedMessage .create(LocalizedStrings.CacheClientUpdater_0_READY_TO_PROCESS_MESSAGES, this)); - while (continueProcessing.get()) { - // SystemFailure.checkFailure(); dm will check this + while (this.continueProcessing.get()) { if (quitting()) { if (isDebugEnabled) { logger.debug("termination detected"); @@ -1583,12 +1592,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn try { // Read the message - _message.recv(); + clientMessage.recv(); // Wait for the previously failed cache client updater // to finish. This will avoid out of order messages. waitForFailedUpdater(); - cache.waitForRegisterInterestsInProgress(); + this.cache.waitForRegisterInterestsInProgress(); if (quitting()) { if (isDebugEnabled) { logger.debug("processMessages quitting before processing message"); @@ -1597,7 +1606,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } // If the message is a ping, ignore it - if (_message.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) { + if (clientMessage.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) { if (isDebugEnabled) { logger.debug("{}: Received ping", this); } @@ -1605,76 +1614,80 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } boolean isDeltaSent = false; - boolean isCreateOrUpdate = _message.getMessageType() == MessageType.LOCAL_CREATE - || _message.getMessageType() == MessageType.LOCAL_UPDATE; + boolean isCreateOrUpdate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE + || clientMessage.getMessageType() == MessageType.LOCAL_UPDATE; if (isCreateOrUpdate) { - isDeltaSent = ((Boolean) _message.getPart(2).getObject()).booleanValue(); + isDeltaSent = (Boolean) clientMessage.getPart(2).getObject(); } // extract the eventId and verify if it is a duplicate event // if it is a duplicate event, ignore // @since GemFire 5.1 - int numberOfParts = _message.getNumberOfParts(); - eid = _message.getPart(numberOfParts - 1); + int numberOfParts = clientMessage.getNumberOfParts(); + Part eid = clientMessage.getPart(numberOfParts - 1); + // TODO the message handling methods also deserialized the eventID - inefficient EventID eventId = (EventID) eid.getObject(); // no need to verify if the instantiator msg is duplicate or not - if (_message.getMessageType() != MessageType.REGISTER_INSTANTIATORS - && _message.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) { + if (clientMessage.getMessageType() != MessageType.REGISTER_INSTANTIATORS + && clientMessage.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) { if (this.qManager.getState().verifyIfDuplicate(eventId, !(this.isDurableClient || isDeltaSent))) { continue; } } + if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) { - logger.trace(LogMarker.BRIDGE_SERVER, - "Processing event with id {}" + eventId.expensiveToString()); + logger.trace(LogMarker.BRIDGE_SERVER, "Processing event with id {}", + eventId.expensiveToString()); } + this.isOpCompleted = true; + // Process the message - switch (_message.getMessageType()) { + switch (clientMessage.getMessageType()) { case MessageType.LOCAL_CREATE: case MessageType.LOCAL_UPDATE: - handleUpdate(_message); + handleUpdate(clientMessage); break; case MessageType.LOCAL_INVALIDATE: - handleInvalidate(_message); + handleInvalidate(clientMessage); break; case MessageType.LOCAL_DESTROY: - handleDestroy(_message); + handleDestroy(clientMessage); break; case MessageType.LOCAL_DESTROY_REGION: - handleDestroyRegion(_message); + handleDestroyRegion(clientMessage); break; case MessageType.CLEAR_REGION: - handleClearRegion(_message); + handleClearRegion(clientMessage); break; case MessageType.REGISTER_INSTANTIATORS: - handleRegisterInstantiator(_message, eventId); + handleRegisterInstantiator(clientMessage, eventId); break; case MessageType.REGISTER_DATASERIALIZERS: - handleRegisterDataSerializer(_message, eventId); + handleRegisterDataSerializer(clientMessage, eventId); break; case MessageType.CLIENT_MARKER: - handleMarker(_message); + handleMarker(clientMessage); break; case MessageType.INVALIDATE_REGION: - handleInvalidateRegion(_message); + handleInvalidateRegion(clientMessage); break; case MessageType.CLIENT_REGISTER_INTEREST: - handleRegisterInterest(_message); + handleRegisterInterest(clientMessage); break; case MessageType.CLIENT_UNREGISTER_INTEREST: - handleUnregisterInterest(_message); + handleUnregisterInterest(clientMessage); break; case MessageType.TOMBSTONE_OPERATION: - handleTombstoneOperation(_message); + handleTombstoneOperation(clientMessage); break; default: logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientUpdater_0_RECEIVED_AN_UNSUPPORTED_MESSAGE_TYPE_1, - new Object[] {this, MessageType.getString(_message.getMessageType())})); + new Object[] {this, MessageType.getString(clientMessage.getMessageType())})); break; } @@ -1689,7 +1702,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // likely to send pings... // and the ClientHealthMonitor will cause a disconnect - } catch (InterruptedIOException e) { + } catch (InterruptedIOException ignore) { // Per Sun's support web site, this exception seems to be peculiar // to Solaris, and may eventually not even be generated there. // @@ -1697,62 +1710,59 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // isInterrupted() is false. (How very odd!) // // We regard it the same as an InterruptedException - this.endPointDied = true; - continueProcessing.set(false);// = false; + this.continueProcessing.set(false); if (isDebugEnabled) { logger.debug("InterruptedIOException"); } + } catch (IOException e) { - this.endPointDied = true; // Either the server went away, or we caught a closing condition. if (!quitting()) { // Server departed; print a message. - String message = ": Caught the following exception and will exit: "; - String errMessage = e.getMessage(); - if (errMessage == null) { - errMessage = ""; - } - ClientServerObserver bo = ClientServerObserverHolder.getInstance(); - bo.beforeFailoverByCacheClientUpdater(this.location); - eManager.serverCrashed(this.endpoint); + ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance(); + clientServerObserver.beforeFailoverByCacheClientUpdater(this.location); + this.eManager.serverCrashed(this.endpoint); if (isDebugEnabled) { - logger.debug("" + message + e); + logger.debug("Caught the following exception and will exit", e); } } // !quitting // In any event, terminate this thread. - continueProcessing.set(false);// = false; + this.continueProcessing.set(false); if (isDebugEnabled) { logger.debug("terminated due to IOException"); } + } catch (Exception e) { if (!quitting()) { - this.endPointDied = true; - ClientServerObserver bo = ClientServerObserverHolder.getInstance(); - bo.beforeFailoverByCacheClientUpdater(this.location); - eManager.serverCrashed(this.endpoint); + ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance(); + clientServerObserver.beforeFailoverByCacheClientUpdater(this.location); + this.eManager.serverCrashed(this.endpoint); String message = ": Caught the following exception and will exit: "; handleException(message, e); } + // In any event, terminate this thread. - continueProcessing.set(false);// = false; // force termination + this.continueProcessing.set(false);// = false; // force termination if (isDebugEnabled) { logger.debug("CCU terminated due to Exception"); } + } finally { - _message.clear(); + clientMessage.clear(); } } // while + } finally { if (isDebugEnabled) { logger.debug("has stopped and cleaning the helper .."); } - this.close(); // added to fixes some race conditions associated with 38382 + close(); // added to fix some race conditions associated with 38382 // this will make sure that if this thread dies without starting QueueMgr then it will start.. // 1. above we ignore InterruptedIOException and this thread dies without informing QueueMgr - // 2. if there is some other race codition with continueProcessing flag - this.qManager.checkEndpoint(this, endpoint); + // 2. if there is some other race condition with continueProcessing flag + this.qManager.checkEndpoint(this, this.endpoint); } } @@ -1785,12 +1795,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ private Object deserialize(byte[] serializedBytes) { Object deserializedObject = serializedBytes; - // This is a debugging method so ignore all exceptions like - // ClassNotFoundException + // This is a debugging method so ignore all exceptions like ClassNotFoundException try { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes)); deserializedObject = DataSerializer.readObject(dis); - } catch (Exception e) { + } catch (ClassNotFoundException | IOException ignore) { } return deserializedObject; } @@ -1799,18 +1808,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * @return the local port of our {@link #socket} */ protected int getLocalPort() { - return socket.getLocalPort(); + return this.socket.getLocalPort(); } + @Override public void onDisconnect(InternalDistributedSystem sys) { stopUpdater(); } - /** - * true if the EndPoint represented by this updater thread has died. - */ - private volatile boolean endPointDied = false; - private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) { if (actualBufferSize < requestedBufferSize) { logger.info(LocalizedMessage.create( @@ -1826,11 +1831,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * @since GemFire 5.7 */ public static class CCUStats implements MessageStats { - // static fields + private static final StatisticsType type; - private final static int messagesBeingReceivedId; - private final static int messageBytesBeingReceivedId; - private final static int receivedBytesId; + private static final int messagesBeingReceivedId; + private static final int messageBytesBeingReceivedId; + private static final int receivedBytesId; static { StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton(); @@ -1852,7 +1857,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // instance fields private final Statistics stats; - public CCUStats(DistributedSystem ids, ServerLocation location) { + CCUStats(DistributedSystem ids, ServerLocation location) { // no need for atomic since only a single thread will be writing these this.stats = ids.createStatistics(type, "CacheClientUpdater-" + location); } @@ -1861,25 +1866,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn this.stats.close(); } + @Override public void incReceivedBytes(long v) { this.stats.incLong(receivedBytesId, v); } + @Override public void incSentBytes(long v) { // noop since we never send messages } + @Override public void incMessagesBeingReceived(int bytes) { - stats.incInt(messagesBeingReceivedId, 1); + this.stats.incInt(messagesBeingReceivedId, 1); if (bytes > 0) { - stats.incLong(messageBytesBeingReceivedId, bytes); + this.stats.incLong(messageBytesBeingReceivedId, bytes); } } + @Override public void decMessagesBeingReceived(int bytes) { - stats.incInt(messagesBeingReceivedId, -1); + this.stats.incInt(messagesBeingReceivedId, -1); if (bytes > 0) { - stats.incLong(messageBytesBeingReceivedId, -bytes); + this.stats.incLong(messageBytesBeingReceivedId, -bytes); } } @@ -1893,7 +1902,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } + @Override public boolean isProcessing() { - return continueProcessing.get(); + return this.continueProcessing.get(); } }
http://git-wip-us.apache.org/repos/asf/geode/blob/d393b4aa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java index be30061..39c2f3a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java @@ -152,7 +152,8 @@ public class ChunkedMessage extends Message { public void setLastChunkAndNumParts(boolean lastChunk, int numParts) { setLastChunk(lastChunk); - if (this.serverConnection != null && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) { + if (this.serverConnection != null + && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) { // we us e three bits for number of parts in last chunk byte // we us e three bits for number of parts in last chunk byte byte localLastChunk = (byte) (numParts << 5); @@ -240,7 +241,8 @@ public class ChunkedMessage extends Message { int totalBytesRead = 0; do { int bytesRead = 0; - bytesRead = inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead); + bytesRead = + inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead); if (bytesRead == -1) { throw new EOFException( LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString()); http://git-wip-us.apache.org/repos/asf/geode/blob/d393b4aa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java index 354ad0f..2ac6fea 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java @@ -84,7 +84,8 @@ public class Message { // Tentative workaround to avoid OOM stated in #46754. public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>(); - public static final String MAX_MESSAGE_SIZE_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"; + public static final String MAX_MESSAGE_SIZE_PROPERTY = + DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"; static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; @@ -299,8 +300,8 @@ public class Message { } else { HeapDataOutputStream hdos = new HeapDataOutputStream(str); try { - this.messageModified = true; - part.setPartState(hdos, false); + this.messageModified = true; + part.setPartState(hdos, false); } finally { close(hdos); } @@ -309,8 +310,8 @@ public class Message { } /* - * Adds a new part to this message that contains a {@code byte} array (as opposed to a - * serialized object). + * Adds a new part to this message that contains a {@code byte} array (as opposed to a serialized + * object). * * @see #addPart(byte[], boolean) */ @@ -378,7 +379,7 @@ public class Message { if (this.version.equals(Version.CURRENT)) { v = null; } - + // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources // passed to it. HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true); @@ -399,12 +400,12 @@ public class Message { if (zipValues) { throw new UnsupportedOperationException("zipValues no longer supported"); } - + Version v = this.version; if (this.version.equals(Version.CURRENT)) { v = null; } - + HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v); try { BlobHelper.serializeTo(o, hdos); @@ -520,7 +521,8 @@ public class Message { } protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { - // setting second bit of flags byte for client this is not require but this makes all changes easily at client side right now just see this bit and process security header + // setting second bit of flags byte for client this is not require but this makes all changes + // easily at client side right now just see this bit and process security header byte flagsByte = this.flags; if (isSecurityHeader) { flagsByte |= MESSAGE_HAS_SECURE_PART; @@ -529,7 +531,7 @@ public class Message { flagsByte |= MESSAGE_IS_RETRY; } getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts) - .putInt(this.transactionId).put(flagsByte); + .putInt(this.transactionId).put(flagsByte); } protected Part getSecurityPart() { @@ -601,7 +603,7 @@ public class Message { if (msgLen > this.maxMessageSize) { throw new MessageTooLargeException("Message size (" + msgLen - + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")"); + + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")"); } commBuffer.clear(); @@ -673,7 +675,7 @@ public class Message { void fetchHeader() throws IOException { final ByteBuffer cb = getCommBuffer(); cb.clear(); - + // messageType is invalidated here and can be used as an indicator // of problems reading the message this.messageType = MessageType.INVALID; @@ -693,7 +695,7 @@ public class Message { } } while (cb.remaining() > 0); cb.flip(); - + } else { int hdr = 0; do { @@ -728,7 +730,7 @@ public class Message { throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER .toLocalizedString(type)); } - + int timeToWait = 0; if (this.serverConnection != null) { // Keep track of the fact that a message is being processed. @@ -736,7 +738,7 @@ public class Message { timeToWait = this.serverConnection.getClientReadTimeout(); } this.readHeader = true; - + if (this.messageLimiter != null) { for (;;) { this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); @@ -764,15 +766,13 @@ public class Message { } } // for } - + if (len > 0) { if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) { throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1 - .toLocalizedString(new Object[] { - len, this.maxIncomingMessageLength - })); + .toLocalizedString(new Object[] {len, this.maxIncomingMessageLength})); } - + if (this.dataLimiter != null) { for (;;) { if (this.serverConnection != null) { @@ -840,7 +840,7 @@ public class Message { if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) { throw new IOException( LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT - .toLocalizedString(new Object[] { len, numParts })); + .toLocalizedString(new Object[] {len, numParts})); } Integer msgType = MESSAGE_TYPE.get(); @@ -854,7 +854,7 @@ public class Message { + MessageType.getString(msgType) + " operation."); } } - + setNumberOfParts(numParts); if (numParts <= 0) { return; @@ -872,7 +872,8 @@ public class Message { int readSecurePart = checkAndSetSecurityPart(); int bytesRemaining = len; - for (int i = 0; i < numParts + readSecurePart || readSecurePart == 1 && cb.remaining() > 0; i++) { + for (int i = 0; i < numParts + readSecurePart + || readSecurePart == 1 && cb.remaining() > 0; i++) { int bytesReadThisTime = readPartChunk(bytesRemaining); bytesRemaining -= bytesReadThisTime; @@ -887,7 +888,7 @@ public class Message { int partLen = cb.getInt(); byte partType = cb.get(); byte[] partBytes = null; - + if (partLen > 0) { partBytes = new byte[partLen]; int alreadyReadBytes = cb.remaining(); @@ -897,7 +898,7 @@ public class Message { } cb.get(partBytes, 0, alreadyReadBytes); } - + // now we need to read partLen - alreadyReadBytes off the wire int off = alreadyReadBytes; int remaining = partLen - off; @@ -965,20 +966,20 @@ public class Message { // we already have the next part header in commBuffer so just return return 0; } - + if (commBuffer.position() != 0) { commBuffer.compact(); } else { commBuffer.position(commBuffer.limit()); commBuffer.limit(commBuffer.capacity()); } - + if (this.serverConnection != null) { // Keep track of the fact that we are making progress this.serverConnection.updateProcessingMessage(); } int bytesRead = 0; - + if (this.socketChannel != null) { int remaining = commBuffer.remaining(); if (remaining > bytesRemaining) { @@ -1006,7 +1007,7 @@ public class Message { bytesToRead = bytesRemaining; } int pos = commBuffer.position(); - + while (bytesToRead > 0) { int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead); if (res != -1) { @@ -1022,7 +1023,7 @@ public class Message { .toLocalizedString()); } } - + commBuffer.position(pos); } commBuffer.flip(); http://git-wip-us.apache.org/repos/asf/geode/blob/d393b4aa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index dfda14f..485ccae 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -723,7 +723,10 @@ public class ServerConnection implements Runnable { ThreadState threadState = null; try { if (msg != null) { - // Since this thread is not interrupted when the cache server is shutdown, test again after a message has been read. This is a bit of a hack. I think this thread should be interrupted, but currently AcceptorImpl doesn't keep track of the threads that it launches. + // Since this thread is not interrupted when the cache server is shutdown, test again after + // a message has been read. This is a bit of a hack. I think this thread should be + // interrupted, but currently AcceptorImpl doesn't keep track of the threads that it + // launches. if (!this.processMessages || (crHelper.isShutdown())) { if (logger.isDebugEnabled()) { logger.debug("{} ignoring message of type {} from client {} due to shutdown.",