http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index d44a4ad..704f2da 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -79,14 +79,14 @@ public class GatewayReceiverCommand extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null; String regionName = null; Object callbackArg = null, key = null; int partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - GatewayReceiverStats stats = (GatewayReceiverStats) servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats(); EventID eventId = null; LocalRegion region = null; List<BatchException70> exceptions = new ArrayList<BatchException70>(); @@ -102,20 +102,20 @@ public class GatewayReceiverCommand extends BaseCommand { // statement so that all messages can take advantage of it. boolean earlyAck = false;// msg.getEarlyAck(); - stats.incBatchSize(msg.getPayloadLength()); + stats.incBatchSize(clientMessage.getPayloadLength()); // Retrieve the number of events - Part numberOfEventsPart = msg.getPart(0); + Part numberOfEventsPart = clientMessage.getPart(0); int numberOfEvents = numberOfEventsPart.getInt(); stats.incEventsReceived(numberOfEvents); // Retrieve the batch id - Part batchIdPart = msg.getPart(1); + Part batchIdPart = clientMessage.getPart(1); int batchId = batchIdPart.getInt(); // If this batch has already been seen, do not reply. // Instead, drop the batch and continue. - if (batchId <= servConn.getLatestBatchIdReplied()) { + if (batchId <= serverConnection.getLatestBatchIdReplied()) { if (GatewayReceiver.APPLY_RETRIES) { // Do nothing!!! logger.warn(LocalizedMessage.create( @@ -125,17 +125,17 @@ public class GatewayReceiverCommand extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_THAT_HAS_ALREADY_BEEN_OR_IS_BEING_PROCESSED__THIS_PROCESS_BATCH_REQUEST_IS_BEING_IGNORED, batchId)); - writeReply(msg, servConn, batchId, numberOfEvents); + writeReply(clientMessage, serverConnection, batchId, numberOfEvents); return; } stats.incDuplicateBatchesReceived(); } // Verify the batches arrive in order - if (batchId != servConn.getLatestBatchIdReplied() + 1) { + if (batchId != serverConnection.getLatestBatchIdReplied() + 1) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_OUT_OF_ORDER_THE_ID_OF_THE_LAST_BATCH_PROCESSED_WAS_1_THIS_BATCH_REQUEST_WILL_BE_PROCESSED_BUT_SOME_MESSAGES_MAY_HAVE_BEEN_LOST, - new Object[] {batchId, servConn.getLatestBatchIdReplied()})); + new Object[] {batchId, serverConnection.getLatestBatchIdReplied()})); stats.incOutoforderBatchesReceived(); } @@ -146,7 +146,7 @@ public class GatewayReceiverCommand extends BaseCommand { // If early ack mode, acknowledge right away // Not sure if earlyAck makes sense with sliding window if (earlyAck) { - servConn.incrementLatestBatchIdReplied(batchId); + serverConnection.incrementLatestBatchIdReplied(batchId); // writeReply(msg, servConn); // servConn.setAsTrue(RESPONDED); @@ -162,13 +162,13 @@ public class GatewayReceiverCommand extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); if (earlyAck) { logger.debug( "{}: Sent process batch early response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); } } // logger.warn("Received process batch request " + batchId + " containing @@ -185,10 +185,10 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the events from the message parts. The '2' below // represents the number of events (part0) and the batchId (part1) partNumber = 2; - int dsid = msg.getPart(partNumber++).getInt(); + int dsid = clientMessage.getPart(partNumber++).getInt(); boolean removeOnException = - msg.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false; + clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false; // Keep track of whether a response has been written for // exceptions @@ -202,7 +202,7 @@ public class GatewayReceiverCommand extends BaseCommand { indexWithoutPDXEvent++; // System.out.println("Processing event " + i + " in batch " + batchId + " // starting with part number " + partNumber); - Part actionTypePart = msg.getPart(partNumber); + Part actionTypePart = clientMessage.getPart(partNumber); int actionType = actionTypePart.getInt(); long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP; @@ -211,14 +211,15 @@ public class GatewayReceiverCommand extends BaseCommand { boolean callbackArgExists = false; try { - Part possibleDuplicatePart = msg.getPart(partNumber + 1); + Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1); byte[] possibleDuplicatePartBytes; try { possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -231,7 +232,7 @@ public class GatewayReceiverCommand extends BaseCommand { callbackArg = null; // Retrieve the region name from the message parts - regionNamePart = msg.getPart(partNumber + 2); + regionNamePart = clientMessage.getPart(partNumber + 2); regionName = regionNamePart.getString(); if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) { indexWithoutPDXEvent--; @@ -243,28 +244,30 @@ public class GatewayReceiverCommand extends BaseCommand { // duplication of events, but it is unused now. In // fact the event id is overridden by the FROM_GATEWAY // token. - Part eventIdPart = msg.getPart(partNumber + 3); - eventIdPart.setVersion(servConn.getClientVersion()); + Part eventIdPart = clientMessage.getPart(partNumber + 3); + eventIdPart.setVersion(serverConnection.getClientVersion()); // String eventId = eventIdPart.getString(); try { eventId = (EventID) eventIdPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } // Retrieve the key from the message parts - keyPart = msg.getPart(partNumber + 4); + keyPart = clientMessage.getPart(partNumber + 4); try { key = keyPart.getStringOrObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -281,7 +284,7 @@ public class GatewayReceiverCommand extends BaseCommand { */ // Retrieve the value from the message parts (do not deserialize it) - valuePart = msg.getPart(partNumber + 5); + valuePart = clientMessage.getPart(partNumber + 5); // try { // logger.warn(getName() + ": Creating key " + key + " value " + // valuePart.getObject()); @@ -289,18 +292,19 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the callbackArg from the message parts if necessary int index = partNumber + 6; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -309,14 +313,14 @@ public class GatewayReceiverCommand extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, + serverConnection.getName(), batchId, serverConnection.getSocketString(), regionName, key, valuePart, callbackArg, eventId); } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); // Process the create request if (key == null || regionName == null) { StringId message = null; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = new Object[] { serverConnection.getName(), Integer.valueOf(batchId)}; if (key == null) { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL; @@ -331,7 +335,7 @@ public class GatewayReceiverCommand extends BaseCommand { } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -348,7 +352,7 @@ public class GatewayReceiverCommand extends BaseCommand { boolean isObject = valuePart.isObject(); // [sumedh] This should be done on client while sending // since that is the WAN gateway - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg); @@ -361,29 +365,31 @@ public class GatewayReceiverCommand extends BaseCommand { result = addPdxType(crHelper, key, value); } else { result = region.basicBridgeCreate(key, value, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent, false); + serverConnection.getProxyID(), false, clientEvent, false); // If the create fails (presumably because it already exists), // attempt to update the entry if (!result) { result = region.basicBridgePut(key, value, null, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent); + serverConnection.getProxyID(), false, clientEvent); } } if (result || clientEvent.isConcurrencyConflict()) { - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incCreateRequest(); } else { // This exception will be logged in the catch block below throw new Exception( LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4 - .toLocalizedString(new Object[] {servConn.getName(), regionName, key, + .toLocalizedString(new Object[] { + serverConnection.getName(), regionName, key, valuePart, callbackArg})); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -400,7 +406,7 @@ public class GatewayReceiverCommand extends BaseCommand { */ // Retrieve the value from the message parts (do not deserialize it) - valuePart = msg.getPart(partNumber + 5); + valuePart = clientMessage.getPart(partNumber + 5); // try { // logger.warn(getName() + ": Updating key " + key + " value " + // valuePart.getObject()); @@ -408,34 +414,35 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the callbackArg from the message parts if necessary index = partNumber + 6; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, + serverConnection.getName(), batchId, serverConnection.getSocketString(), regionName, key, valuePart, callbackArg); } // Process the update request if (key == null || regionName == null) { StringId message = null; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = new Object[] { serverConnection.getName(), Integer.valueOf(batchId)}; if (key == null) { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL; @@ -450,7 +457,7 @@ public class GatewayReceiverCommand extends BaseCommand { } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -465,7 +472,7 @@ public class GatewayReceiverCommand extends BaseCommand { try { byte[] value = valuePart.getSerializedForm(); boolean isObject = valuePart.isObject(); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg, PutOperationContext.UPDATE); @@ -477,14 +484,14 @@ public class GatewayReceiverCommand extends BaseCommand { result = addPdxType(crHelper, key, value); } else { result = region.basicBridgePut(key, value, null, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent); + serverConnection.getProxyID(), false, clientEvent); } if (result || clientEvent.isConcurrencyConflict()) { - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incUpdateRequest(); } else { final Object[] msgArgs = - new Object[] {servConn.getName(), regionName, key, valuePart, callbackArg}; + new Object[] { serverConnection.getName(), regionName, key, valuePart, callbackArg}; final StringId message = LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4; String s = message.toLocalizedString(msgArgs); @@ -493,16 +500,16 @@ public class GatewayReceiverCommand extends BaseCommand { } } catch (CancelException e) { // FIXME better exception hierarchy would avoid this check - if (servConn.getCachedRegionHelper().getCache().getCancelCriterion() - .isCancelInProgress()) { + if (serverConnection.getCachedRegionHelper().getCache().getCancelCriterion() + .isCancelInProgress()) { if (logger.isDebugEnabled()) { logger.debug( "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", - servConn.getName(), MessageType.getString(msg.getMessageType()), - servConn.getProxyID()); + serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()), + serverConnection.getProxyID()); } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); } else { throw e; } @@ -511,7 +518,8 @@ public class GatewayReceiverCommand extends BaseCommand { // Preserve the connection under all circumstances logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -521,28 +529,29 @@ public class GatewayReceiverCommand extends BaseCommand { case 2: // Destroy // Retrieve the callbackArg from the message parts if necessary index = partNumber + 5; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key); + serverConnection.getName(), batchId, serverConnection.getSocketString(), regionName, key); } // Process the destroy request @@ -556,14 +565,14 @@ public class GatewayReceiverCommand extends BaseCommand { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL; } - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = new Object[] { serverConnection.getName(), Integer.valueOf(batchId)}; String s = message.toLocalizedString(messageArgs); logger.warn(s); throw new Exception(s); } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -576,20 +585,20 @@ public class GatewayReceiverCommand extends BaseCommand { handleMessageRetry(region, clientEvent); // Destroy the entry try { - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { DestroyOperationContext destroyContext = authzRequest.destroyAuthorize(regionName, key, callbackArg); callbackArg = destroyContext.getCallbackArg(); } - region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), false, + region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false, clientEvent); - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incDestroyRequest(); } catch (EntryNotFoundException e) { logger.info(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] { serverConnection.getName(), key})); // throw new Exception(e); } } @@ -598,43 +607,44 @@ public class GatewayReceiverCommand extends BaseCommand { try { // Region name - regionNamePart = msg.getPart(partNumber + 2); + regionNamePart = clientMessage.getPart(partNumber + 2); regionName = regionNamePart.getString(); // Retrieve the event id from the message parts - eventIdPart = msg.getPart(partNumber + 3); + eventIdPart = clientMessage.getPart(partNumber + 3); eventId = (EventID) eventIdPart.getObject(); // Retrieve the key from the message parts - keyPart = msg.getPart(partNumber + 4); + keyPart = clientMessage.getPart(partNumber + 4); key = keyPart.getStringOrObject(); // Retrieve the callbackArg from the message parts if necessary index = partNumber + 5; - callbackArgExistsPart = msg.getPart(index++); + callbackArgExistsPart = clientMessage.getPart(index++); byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); callbackArg = callbackArgPart.getObject(); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, + serverConnection.getName(), batchId, serverConnection.getSocketString(), regionName, key, valuePart, callbackArg); } // Process the update time-stamp request @@ -642,7 +652,8 @@ public class GatewayReceiverCommand extends BaseCommand { StringId message = LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId), + Object[] messageArgs = new Object[] { + serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}; String s = message.toLocalizedString(messageArgs); logger.warn(s); @@ -652,7 +663,7 @@ public class GatewayReceiverCommand extends BaseCommand { region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); @@ -668,13 +679,13 @@ public class GatewayReceiverCommand extends BaseCommand { // Update the version tag try { - region.basicBridgeUpdateVersionStamp(key, callbackArg, servConn.getProxyID(), + region.basicBridgeUpdateVersionStamp(key, callbackArg, serverConnection.getProxyID(), false, clientEvent); } catch (EntryNotFoundException e) { logger.info(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] { serverConnection.getName(), key})); // throw new Exception(e); } } @@ -684,29 +695,30 @@ public class GatewayReceiverCommand extends BaseCommand { default: logger.fatal(LocalizedMessage.create( LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2, - new Object[] {servConn.getName(), Integer.valueOf(actionType), - servConn.getSocketString()})); + new Object[] { + serverConnection.getName(), Integer.valueOf(actionType), + serverConnection.getSocketString()})); stats.incUnknowsOperationsReceived(); } } catch (CancelException e) { if (logger.isDebugEnabled()) { logger.debug( "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", - servConn.getName(), MessageType.getString(msg.getMessageType()), - servConn.getProxyID()); + serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()), + serverConnection.getProxyID()); } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); return; } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If we have an issue with the PDX registry, stop processing more data if (e.getCause() instanceof PdxRegistryMismatchException) { fatalException = e.getCause(); logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_PDX_CONFIGURATION, - new Object[] {servConn.getMembershipID()}), e.getCause()); + new Object[] { serverConnection.getMembershipID()}), e.getCause()); break; } @@ -772,26 +784,26 @@ public class GatewayReceiverCommand extends BaseCommand { stats.incProcessBatchTime(start - oldStart); } if (fatalException != null) { - servConn.incrementLatestBatchIdReplied(batchId); - writeFatalException(msg, fatalException, servConn, batchId); - servConn.setAsTrue(RESPONDED); + serverConnection.incrementLatestBatchIdReplied(batchId); + writeFatalException(clientMessage, fatalException, serverConnection, batchId); + serverConnection.setAsTrue(RESPONDED); } else if (!exceptions.isEmpty()) { - servConn.incrementLatestBatchIdReplied(batchId); - writeBatchException(msg, exceptions, servConn, batchId); - servConn.setAsTrue(RESPONDED); + serverConnection.incrementLatestBatchIdReplied(batchId); + writeBatchException(clientMessage, exceptions, serverConnection, batchId); + serverConnection.setAsTrue(RESPONDED); } else if (!wroteResponse) { // Increment the batch id unless the received batch id is -1 (a failover // batch) - servConn.incrementLatestBatchIdReplied(batchId); + serverConnection.incrementLatestBatchIdReplied(batchId); - writeReply(msg, servConn, batchId, numberOfEvents); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection, batchId, numberOfEvents); + serverConnection.setAsTrue(RESPONDED); stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime() - start); if (logger.isDebugEnabled()) { logger.debug( "{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); } // logger.warn("Sent process batch normal response for batch " + // batchId + " containing " + numberOfEvents + " events (" +
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java index 5cb1e41..7017aa8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java @@ -54,17 +54,17 @@ public class Get70 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long startparam) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam) throws IOException { long start = startparam; Part regionNamePart = null, keyPart = null, valuePart = null; String regionName = null; Object callbackArg = null, key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // requiresResponse = true; { long oldStart = start; @@ -72,18 +72,18 @@ public class Get70 extends BaseCommand { stats.incReadGetRequestTime(start - oldStart); } // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + int parts = clientMessage.getNumberOfParts(); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); // valuePart = null; (redundant assignment) if (parts > 2) { - valuePart = msg.getPart(2); + valuePart = clientMessage.getPart(2); try { callbackArg = valuePart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -91,15 +91,15 @@ public class Get70 extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received 7.0 get request ({} bytes) from {} for region {} key {} txId {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key, - msg.getTransactionId()); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key, + clientMessage.getTransactionId()); } // Process the get request @@ -113,18 +113,18 @@ public class Get70 extends BaseCommand { errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - Region region = servConn.getCache().getRegion(regionName); + Region region = serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -133,14 +133,14 @@ public class Get70 extends BaseCommand { // for integrated security this.securityService.authorizeRegionRead(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { getContext = authzRequest.getAuthorize(regionName, key, callbackArg); callbackArg = getContext.getCallbackArg(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -148,10 +148,10 @@ public class Get70 extends BaseCommand { // the value if it is a byte[]. Entry entry; try { - entry = getEntry(region, key, callbackArg, servConn); + entry = getEntry(region, key, callbackArg, serverConnection); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -164,7 +164,7 @@ public class Get70 extends BaseCommand { boolean keyNotPresent = entry.keyNotPresent; try { - AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); + AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest(); if (postAuthzRequest != null) { try { getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext); @@ -182,8 +182,8 @@ public class Get70 extends BaseCommand { } } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -197,23 +197,23 @@ public class Get70 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr, + writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject, serverConnection, pr, pr.getNetworkHopType(), versionTag, keyNotPresent); pr.clearNetworkHopData(); } else { - writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent, serverConnection); } } else { - writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent, serverConnection); } } finally { OffHeapHelper.release(originalData); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Wrote get response back to {} for region {} {}", servConn.getName(), - servConn.getSocketString(), regionName, entry); + logger.debug("{}: Wrote get response back to {} for region {} {}", serverConnection.getName(), + serverConnection.getSocketString(), regionName, entry); } stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start); @@ -379,12 +379,12 @@ public class Get70 extends BaseCommand { } @Override - protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException { + protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { throw new UnsupportedOperationException(); } @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java index 22e63c6..5f7cb29 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java @@ -44,33 +44,33 @@ public class GetAll extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the region name from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(1); + keysPart = clientMessage.getPart(1); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ").append(serverConnection.getSocketString()) + .append(" for region ").append(regionName).append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -91,37 +91,37 @@ public class GetAll extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -142,14 +142,14 @@ public class GetAll extends BaseCommand { numKeys = allKeys.size(); } - ObjectPartList values = new ObjectPartList(maximumChunkSize, keys == null); + ObjectPartList values = new ObjectPartList(MAXIMUM_CHUNK_SIZE, keys == null); AuthorizeRequest authzRequest = servConn.getAuthzRequest(); AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); Request request = (Request) Request.getCommand(); Object[] valueAndIsObject = new Object[3]; for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendGetAllResponseChunk(region, values, false, servConn); values.clear(); @@ -246,7 +246,7 @@ public class GetAll extends BaseCommand { ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); + chunkedResponseMsg.addObjPart(list, false); if (logger.isDebugEnabled()) { logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>", http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java index a19d540..b0a1915 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java @@ -21,7 +21,6 @@ import java.util.Set; import org.apache.geode.cache.Region; import org.apache.geode.cache.operations.GetOperationContext; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -45,33 +44,33 @@ public class GetAll651 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the region name from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(1); + keysPart = clientMessage.getPart(1); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ").append(serverConnection.getSocketString()) + .append(" for region ").append(regionName).append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -90,37 +89,37 @@ public class GetAll651 extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -148,7 +147,7 @@ public class GetAll651 extends BaseCommand { final boolean isDebugEnabled = logger.isDebugEnabled(); for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendGetAllResponseChunk(region, values, false, servConn); values.clear(); @@ -253,7 +252,7 @@ public class GetAll651 extends BaseCommand { * @param includeKeys if the part list should include the keys */ protected ObjectPartList651 getObjectPartsList(boolean includeKeys) { - ObjectPartList651 values = new ObjectPartList651(maximumChunkSize, includeKeys); + ObjectPartList651 values = new ObjectPartList651(MAXIMUM_CHUNK_SIZE, includeKeys); return values; } @@ -262,7 +261,7 @@ public class GetAll651 extends BaseCommand { ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); + chunkedResponseMsg.addObjPart(list, false); if (logger.isDebugEnabled()) { logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>", http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java index 154e800..579593f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java @@ -23,7 +23,6 @@ import org.apache.geode.cache.operations.GetOperationContext; import org.apache.geode.cache.operations.internal.GetOperationContextImpl; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -40,7 +39,6 @@ import org.apache.geode.internal.offheap.OffHeapHelper; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.security.AuthorizeRequest; import org.apache.geode.internal.security.AuthorizeRequestPP; -import org.apache.geode.internal.security.SecurityService; import org.apache.geode.security.NotAuthorizedException; public class GetAll70 extends BaseCommand { @@ -52,36 +50,36 @@ public class GetAll70 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); int partIdx = 0; // Retrieve the region name from the message parts - regionNamePart = msg.getPart(partIdx++); + regionNamePart = clientMessage.getPart(partIdx++); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(partIdx++); + keysPart = clientMessage.getPart(partIdx++); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean requestSerializedValues; - requestSerializedValues = msg.getPart(partIdx++).getInt() == 1; + requestSerializedValues = clientMessage.getPart(partIdx++).getInt() == 1; if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ").append(serverConnection.getSocketString()) + .append(" for region ").append(regionName).append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -100,37 +98,37 @@ public class GetAll70 extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn, requestSerializedValues); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection, requestSerializedValues); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -163,7 +161,7 @@ public class GetAll70 extends BaseCommand { // in the old mode (which may be impossible since we only used that mode pre 7.0) in which the // client told us // to get and return all the keys and values. I think this was used for register interest. - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, keys == null, + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, keys == null, region.getAttributes().getConcurrencyChecksEnabled(), requestSerializedValues); try { AuthorizeRequest authzRequest = servConn.getAuthzRequest(); @@ -172,7 +170,7 @@ public class GetAll70 extends BaseCommand { final boolean isDebugEnabled = logger.isDebugEnabled(); for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list values.setKeys(null); sendGetAllResponseChunk(region, values, false, servConn); http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java index d380beb..43d3348 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java @@ -38,7 +38,7 @@ public class GetAllForRI extends GetAll651 { @Override protected ObjectPartList651 getObjectPartsList(boolean includeKeys) { - return new SerializedObjectPartList(maximumChunkSize, includeKeys); + return new SerializedObjectPartList(MAXIMUM_CHUNK_SIZE, includeKeys); } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllWithCallback.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllWithCallback.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllWithCallback.java index 2fb860d..c6663de 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllWithCallback.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllWithCallback.java @@ -22,7 +22,6 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.operations.GetOperationContext; import org.apache.geode.cache.operations.internal.GetOperationContextImpl; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -57,44 +56,44 @@ public class GetAllWithCallback extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null, callbackPart = null; String regionName = null; Object[] keys = null; Object callback = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); int partIdx = 0; // Retrieve the region name from the message parts - regionNamePart = msg.getPart(partIdx++); + regionNamePart = clientMessage.getPart(partIdx++); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(partIdx++); + keysPart = clientMessage.getPart(partIdx++); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - callbackPart = msg.getPart(partIdx++); + callbackPart = clientMessage.getPart(partIdx++); try { callback = callbackPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" with callback ").append(callback) - .append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ").append(serverConnection.getSocketString()) + .append(" for region ").append(regionName).append(" with callback ").append(callback) + .append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -114,35 +113,35 @@ public class GetAllWithCallback extends BaseCommand { .toLocalizedString(); } logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), message})); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + new Object[] { serverConnection.getName(), message})); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn, callback); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection, callback); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -154,7 +153,7 @@ public class GetAllWithCallback extends BaseCommand { assert keys != null; int numKeys = keys.length; - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, false, + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, false, region.getAttributes().getConcurrencyChecksEnabled(), false); try { AuthorizeRequest authzRequest = servConn.getAuthzRequest(); @@ -162,7 +161,7 @@ public class GetAllWithCallback extends BaseCommand { Get70 request = (Get70) Get70.getCommand(); for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendGetAllResponseChunk(region, values, false, servConn); values.clear(); http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand.java index a3e565d..bcdbd08 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand.java @@ -49,20 +49,19 @@ public class GetClientPRMetadataCommand extends BaseCommand { private GetClientPRMetadataCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { String regionFullPath = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - regionFullPath = msg.getPart(0).getString(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + regionFullPath = clientMessage.getPart(0).getString(); String errMessage = ""; if (regionFullPath == null) { logger.warn(LocalizedMessage .create(LocalizedStrings.GetClientPRMetadata_THE_INPUT_REGION_PATH_IS_NULL)); errMessage = LocalizedStrings.GetClientPRMetadata_THE_INPUT_REGION_PATH_IS_NULL.toLocalizedString(); - writeErrorResponse(msg, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { Region region = crHelper.getRegion(regionFullPath); if (region == null) { @@ -71,13 +70,12 @@ public class GetClientPRMetadataCommand extends BaseCommand { regionFullPath)); errMessage = LocalizedStrings.GetClientPRMetadata_REGION_NOT_FOUND.toLocalizedString() + regionFullPath; - writeErrorResponse(msg, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { try { - Message responseMsg = servConn.getResponseMessage(); - responseMsg.setTransactionId(msg.getTransactionId()); + Message responseMsg = serverConnection.getResponseMessage(); + responseMsg.setTransactionId(clientMessage.getTransactionId()); responseMsg.setMessageType(MessageType.RESPONSE_CLIENT_PR_METADATA); PartitionedRegion prRgion = (PartitionedRegion) region; @@ -93,11 +91,11 @@ public class GetClientPRMetadataCommand extends BaseCommand { } } responseMsg.send(); - msg.clearParts(); + clientMessage.clearParts(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } finally { - servConn.setAsTrue(Command.RESPONDED); + serverConnection.setAsTrue(Command.RESPONDED); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand66.java index 3961b19..4c519a9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPRMetadataCommand66.java @@ -47,20 +47,19 @@ public class GetClientPRMetadataCommand66 extends BaseCommand { private GetClientPRMetadataCommand66() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { String regionFullPath = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - regionFullPath = msg.getPart(0).getString(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + regionFullPath = clientMessage.getPart(0).getString(); String errMessage = ""; if (regionFullPath == null) { logger.warn(LocalizedMessage .create(LocalizedStrings.GetClientPRMetadata_THE_INPUT_REGION_PATH_IS_NULL)); errMessage = LocalizedStrings.GetClientPRMetadata_THE_INPUT_REGION_PATH_IS_NULL.toLocalizedString(); - writeErrorResponse(msg, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { Region region = crHelper.getRegion(regionFullPath); if (region == null) { @@ -69,13 +68,12 @@ public class GetClientPRMetadataCommand66 extends BaseCommand { regionFullPath)); errMessage = LocalizedStrings.GetClientPRMetadata_REGION_NOT_FOUND.toLocalizedString() + regionFullPath; - writeErrorResponse(msg, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PR_METADATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { try { - Message responseMsg = servConn.getResponseMessage(); - responseMsg.setTransactionId(msg.getTransactionId()); + Message responseMsg = serverConnection.getResponseMessage(); + responseMsg.setTransactionId(clientMessage.getTransactionId()); responseMsg.setMessageType(MessageType.RESPONSE_CLIENT_PR_METADATA); PartitionedRegion prRgion = (PartitionedRegion) region; @@ -86,11 +84,11 @@ public class GetClientPRMetadataCommand66 extends BaseCommand { responseMsg.addObjPart(serverLocations); } responseMsg.send(); - msg.clearParts(); + clientMessage.clearParts(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } finally { - servConn.setAsTrue(Command.RESPONDED); + serverConnection.setAsTrue(Command.RESPONDED); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand.java index 7d5c251..6be9353 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand.java @@ -45,22 +45,22 @@ public class GetClientPartitionAttributesCommand extends BaseCommand { @SuppressWarnings("unchecked") @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { String regionFullPath = null; - regionFullPath = msg.getPart(0).getString(); + regionFullPath = clientMessage.getPart(0).getString(); String errMessage = ""; if (regionFullPath == null) { logger.warn(LocalizedMessage .create(LocalizedStrings.GetClientPartitionAttributes_THE_INPUT_REGION_PATH_IS_NULL)); errMessage = LocalizedStrings.GetClientPartitionAttributes_THE_INPUT_REGION_PATH_IS_NULL .toLocalizedString(); - writeErrorResponse(msg, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, - errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, + errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - Region region = servConn.getCache().getRegion(regionFullPath); + Region region = serverConnection.getCache().getRegion(regionFullPath); if (region == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.GetClientPartitionAttributes_REGION_NOT_FOUND_FOR_SPECIFIED_REGION_PATH, @@ -68,15 +68,15 @@ public class GetClientPartitionAttributesCommand extends BaseCommand { errMessage = LocalizedStrings.GetClientPartitionAttributes_REGION_NOT_FOUND.toLocalizedString() + regionFullPath; - writeErrorResponse(msg, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, - errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, + errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { - Message responseMsg = servConn.getResponseMessage(); - responseMsg.setTransactionId(msg.getTransactionId()); + Message responseMsg = serverConnection.getResponseMessage(); + responseMsg.setTransactionId(clientMessage.getTransactionId()); responseMsg.setMessageType(MessageType.RESPONSE_CLIENT_PARTITION_ATTRIBUTES); PartitionedRegion prRgion = (PartitionedRegion) region; @@ -113,11 +113,11 @@ public class GetClientPartitionAttributesCommand extends BaseCommand { } responseMsg.addObjPart(leaderRegionPath); responseMsg.send(); - msg.clearParts(); + clientMessage.clearParts(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } finally { - servConn.setAsTrue(Command.RESPONDED); + serverConnection.setAsTrue(Command.RESPONDED); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand66.java index 209c40c..251f4da 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetClientPartitionAttributesCommand66.java @@ -50,22 +50,22 @@ public class GetClientPartitionAttributesCommand66 extends BaseCommand { @SuppressWarnings("unchecked") @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { String regionFullPath = null; - regionFullPath = msg.getPart(0).getString(); + regionFullPath = clientMessage.getPart(0).getString(); String errMessage = ""; if (regionFullPath == null) { logger.warn(LocalizedMessage .create(LocalizedStrings.GetClientPartitionAttributes_THE_INPUT_REGION_PATH_IS_NULL)); errMessage = LocalizedStrings.GetClientPartitionAttributes_THE_INPUT_REGION_PATH_IS_NULL .toLocalizedString(); - writeErrorResponse(msg, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, - errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, + errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - Region region = servConn.getCache().getRegion(regionFullPath); + Region region = serverConnection.getCache().getRegion(regionFullPath); if (region == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.GetClientPartitionAttributes_REGION_NOT_FOUND_FOR_SPECIFIED_REGION_PATH, @@ -73,15 +73,15 @@ public class GetClientPartitionAttributesCommand66 extends BaseCommand { errMessage = LocalizedStrings.GetClientPartitionAttributes_REGION_NOT_FOUND.toLocalizedString() + regionFullPath; - writeErrorResponse(msg, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, - errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR, + errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { - Message responseMsg = servConn.getResponseMessage(); - responseMsg.setTransactionId(msg.getTransactionId()); + Message responseMsg = serverConnection.getResponseMessage(); + responseMsg.setTransactionId(clientMessage.getTransactionId()); responseMsg.setMessageType(MessageType.RESPONSE_CLIENT_PARTITION_ATTRIBUTES); if (!(region instanceof PartitionedRegion)) { @@ -138,11 +138,11 @@ public class GetClientPartitionAttributesCommand66 extends BaseCommand { } } responseMsg.send(); - msg.clearParts(); + clientMessage.clearParts(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } finally { - servConn.setAsTrue(Command.RESPONDED); + serverConnection.setAsTrue(Command.RESPONDED); } }