http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java index 52a929f..afb0f2c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java @@ -46,51 +46,51 @@ public class RegisterInterest extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); InterestResultPolicy policy = null; // Retrieve the interest type - int interestType = msg.getPart(1).getInt(); + int interestType = clientMessage.getPart(1).getInt(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(2).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(2).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(3); + Part durablePart = clientMessage.getPart(3); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Retrieve the key - keyPart = msg.getPart(4); + keyPart = clientMessage.getPart(4); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -98,21 +98,21 @@ public class RegisterInterest extends BaseCommand { // VJR: Check for a sixth part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > 5) { + if (clientMessage.getNumberOfParts() > 5) { try { - Part notifyPart = msg.getPart(5); + Part notifyPart = clientMessage.getPart(5); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } if (logger.isDebugEnabled()) { logger.debug("{}: Received register interest request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // Process the register interest request @@ -126,19 +126,19 @@ public class RegisterInterest extends BaseCommand { message = LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } - logger.warn("{}: {}", servConn.getName(), message.toLocalizedString()); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, - message.toLocalizedString(), servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message.toLocalizedString()); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, + message.toLocalizedString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // input key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterest_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] { serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; @@ -151,7 +151,7 @@ public class RegisterInterest extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -159,14 +159,14 @@ public class RegisterInterest extends BaseCommand { key = registerContext.getKey(); } } - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, - servConn.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, false, 0, true); + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, + serverConnection.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, false, 0, true); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -178,45 +178,45 @@ public class RegisterInterest extends BaseCommand { // start = DistributionStats.getStatTime(); CacheClientProxy ccp = - servConn.getAcceptor().getCacheClientNotifier().getClientProxy(servConn.getProxyID()); + serverConnection.getAcceptor().getCacheClientNotifier().getClientProxy(serverConnection.getProxyID()); if (ccp == null) { // fix for 37593 IOException ioex = new IOException( LocalizedStrings.RegisterInterest_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL .toLocalizedString()); - writeChunkedException(msg, ioex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ioex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isPrimary = ccp.isPrimary(); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } // !isPrimary else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, key, interestType, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, key, interestType, policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn, chunkedResponseMsg); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection, chunkedResponseMsg); + serverConnection.setAsTrue(RESPONDED); return; } @@ -224,7 +224,7 @@ public class RegisterInterest extends BaseCommand { // logger.debug(getName() + ": Sent chunk (1 of 1) of register interest // response (" + chunkedResponseMsg.getBufferLength() + " bytes) for // region " + regionName + " key " + key); - logger.debug("{}: Sent register interest response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent register interest response for region {} key {}", serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId,
http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java index 5ddb241..af423ca 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java @@ -60,67 +60,67 @@ public class RegisterInterest61 extends BaseCommand { RegisterInterest61() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); InterestResultPolicy policy = null; // Retrieve the interest type - int interestType = msg.getPart(1).getInt(); + int interestType = clientMessage.getPart(1).getInt(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(2).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(2).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(3); + Part durablePart = clientMessage.getPart(3); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; boolean serializeValues = false; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); - if (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) { + if (serverConnection.getClientVersion().compareTo(Version.GFE_80) >= 0) { // The second byte here is serializeValues serializeValues = regionDataPolicyPartBytes[1] == (byte) 0x01; } } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Retrieve the key - keyPart = msg.getPart(4); + keyPart = clientMessage.getPart(4); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -128,14 +128,14 @@ public class RegisterInterest61 extends BaseCommand { // VJR: Check for a sixth part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > 5) { + if (clientMessage.getNumberOfParts() > 5) { try { - Part notifyPart = msg.getPart(5); + Part notifyPart = clientMessage.getPart(5); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -143,7 +143,7 @@ public class RegisterInterest61 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 61 request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // test hook to trigger vMotion during register Interest @@ -164,19 +164,19 @@ public class RegisterInterest61 extends BaseCommand { message = LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } - logger.warn("{}: {}", servConn.getName(), message.toLocalizedString()); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, - message.toLocalizedString(), servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message.toLocalizedString()); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, + message.toLocalizedString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // input key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterest_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] { serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; @@ -190,7 +190,7 @@ public class RegisterInterest61 extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -198,15 +198,15 @@ public class RegisterInterest61 extends BaseCommand { key = registerContext.getKey(); } } - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, - servConn.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, true, + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, + serverConnection.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, true, regionDataPolicyPartBytes[0], true); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -218,46 +218,46 @@ public class RegisterInterest61 extends BaseCommand { // start = DistributionStats.getStatTime(); CacheClientProxy ccp = - servConn.getAcceptor().getCacheClientNotifier().getClientProxy(servConn.getProxyID()); + serverConnection.getAcceptor().getCacheClientNotifier().getClientProxy(serverConnection.getProxyID()); if (ccp == null) { // fix for 37593 IOException ioex = new IOException( LocalizedStrings.RegisterInterest_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL .toLocalizedString()); - writeChunkedException(msg, ioex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ioex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isPrimary = ccp.isPrimary(); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } // !isPrimary else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { fillAndSendRegisterInterestResponseChunks(region, key, interestType, serializeValues, - policy, servConn); - servConn.setAsTrue(RESPONDED); + policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn, chunkedResponseMsg); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection, chunkedResponseMsg); + serverConnection.setAsTrue(RESPONDED); return; } @@ -265,7 +265,7 @@ public class RegisterInterest61 extends BaseCommand { // logger.debug(getName() + ": Sent chunk (1 of 1) of register interest // response (" + chunkedResponseMsg.getBufferLength() + " bytes) for // region " + regionName + " key " + key); - logger.debug("{}: Sent register interest response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent register interest response for region {} key {}", serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId, http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java index cd16790..4206e19 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java @@ -49,57 +49,57 @@ public class RegisterInterestList extends BaseCommand { RegisterInterestList() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -109,14 +109,14 @@ public class RegisterInterestList extends BaseCommand { // VJR: Check for an extra part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > (numberOfKeys + partNumber)) { + if (clientMessage.getNumberOfParts() > (numberOfKeys + partNumber)) { try { - Part notifyPart = msg.getPart(numberOfKeys + partNumber); + Part notifyPart = clientMessage.getPart(numberOfKeys + partNumber); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -124,7 +124,7 @@ public class RegisterInterestList extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), numberOfKeys, regionName, keys); } @@ -154,25 +154,25 @@ public class RegisterInterestList extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] { serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -181,14 +181,14 @@ public class RegisterInterestList extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, false, 0, true); + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, + serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, false, 0, true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -197,36 +197,36 @@ public class RegisterInterestList extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -236,7 +236,7 @@ public class RegisterInterestList extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java index 6e006ca..8eb6c4a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java @@ -49,67 +49,67 @@ public class RegisterInterestList61 extends BaseCommand { RegisterInterestList61() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -119,14 +119,14 @@ public class RegisterInterestList61 extends BaseCommand { // VJR: Check for an extra part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > (numberOfKeys + partNumber)) { + if (clientMessage.getNumberOfParts() > (numberOfKeys + partNumber)) { try { - Part notifyPart = msg.getPart(numberOfKeys + partNumber); + Part notifyPart = clientMessage.getPart(numberOfKeys + partNumber); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -134,7 +134,7 @@ public class RegisterInterestList61 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 61 request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), numberOfKeys, regionName, keys); } @@ -164,25 +164,25 @@ public class RegisterInterestList61 extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] { serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -191,15 +191,15 @@ public class RegisterInterestList61 extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, + serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, regionDataPolicyPartBytes[0], true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -208,36 +208,36 @@ public class RegisterInterestList61 extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -247,7 +247,7 @@ public class RegisterInterestList61 extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java index 8a61364..14198cc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java @@ -55,87 +55,87 @@ public class RegisterInterestList66 extends BaseCommand { RegisterInterestList66() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null;// numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; boolean serializeValues = false; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); - if (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) { + if (serverConnection.getClientVersion().compareTo(Version.GFE_80) >= 0) { // The second byte here is serializeValues serializeValues = regionDataPolicyPartBytes[1] == (byte) 0x01; } } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } partNumber = 3; - Part list = msg.getPart(partNumber); + Part list = clientMessage.getPart(partNumber); try { keys = (List) list.getObject(); numberOfKeys = keys.size(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean sendUpdatesAsInvalidates = false; try { - Part notifyPart = msg.getPart(partNumber + 1); + Part notifyPart = clientMessage.getPart(partNumber + 1); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 66 request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), numberOfKeys, regionName, keys); } @@ -165,24 +165,24 @@ public class RegisterInterestList66 extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); } // key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] { serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -191,15 +191,15 @@ public class RegisterInterestList66 extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, + serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, regionDataPolicyPartBytes[0], true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -208,37 +208,37 @@ public class RegisterInterestList66 extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, serializeValues, - policy, servConn); - servConn.setAsTrue(RESPONDED); + policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -248,7 +248,7 @@ public class RegisterInterestList66 extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java index 88386a1..52a1df3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java @@ -61,7 +61,7 @@ public class RemoveAll extends BaseCommand { protected RemoveAll() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long startp) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startp) throws IOException, InterruptedException { long start = startp; // copy this since we need to modify it Part regionNamePart = null, numberOfKeysPart = null, keyPart = null; @@ -73,11 +73,11 @@ public class RemoveAll extends BaseCommand { VersionedObjectList response = null; StringBuffer errMessage = new StringBuffer(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); @@ -87,7 +87,7 @@ public class RemoveAll extends BaseCommand { try { // Retrieve the data from the message parts // part 0: region name - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); if (regionName == null) { @@ -95,67 +95,66 @@ public class RemoveAll extends BaseCommand { LocalizedStrings.RemoveAll_THE_INPUT_REGION_NAME_FOR_THE_REMOVEALL_REQUEST_IS_NULL .toLocalizedString(); logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), txt})); + new Object[] { serverConnection.getName(), txt})); errMessage.append(txt); - writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during removeAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // part 1: eventID - eventPart = msg.getPart(1); + eventPart = clientMessage.getPart(1); ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); Breadcrumbs.setEventId(eventId); // part 2: flags - int flags = msg.getPart(2).getInt(); + int flags = clientMessage.getPart(2).getInt(); boolean clientIsEmpty = (flags & PutAllOp.FLAG_EMPTY) != 0; boolean clientHasCCEnabled = (flags & PutAllOp.FLAG_CONCURRENCY_CHECKS) != 0; // part 3: callbackArg - Object callbackArg = msg.getPart(3).getObject(); + Object callbackArg = clientMessage.getPart(3).getObject(); // part 4: number of keys - numberOfKeysPart = msg.getPart(4); + numberOfKeysPart = clientMessage.getPart(4); numberOfKeys = numberOfKeysPart.getInt(); if (logger.isDebugEnabled()) { StringBuilder buffer = new StringBuilder(); - buffer.append(servConn.getName()).append(": Received removeAll request from ") - .append(servConn.getSocketString()).append(" for region ").append(regionName) - .append(callbackArg != null ? (" callbackArg " + callbackArg) : "").append(" with ") - .append(numberOfKeys).append(" keys."); + buffer.append(serverConnection.getName()).append(": Received removeAll request from ") + .append(serverConnection.getSocketString()).append(" for region ").append(regionName) + .append(callbackArg != null ? (" callbackArg " + callbackArg) : "").append(" with ") + .append(numberOfKeys).append(" keys."); logger.debug(buffer); } ArrayList<Object> keys = new ArrayList<Object>(numberOfKeys); ArrayList<VersionTag> retryVersions = new ArrayList<VersionTag>(numberOfKeys); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(5 + i); + keyPart = clientMessage.getPart(5 + i); key = keyPart.getStringOrObject(); if (key == null) { String txt = LocalizedStrings.RemoveAll_ONE_OF_THE_INPUT_KEYS_FOR_THE_REMOVEALL_REQUEST_IS_NULL .toLocalizedString(); logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), txt})); + new Object[] { serverConnection.getName(), txt})); errMessage.append(txt); - writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - if (msg.isRetry()) { + if (clientMessage.isRetry()) { // Constuct the thread id/sequence id information for this element of the bulk op // The sequence id is constructed from the base sequence id and the offset @@ -181,15 +180,15 @@ public class RemoveAll extends BaseCommand { keys.add(key); } // for - if (msg.getNumberOfParts() == (5 + numberOfKeys + 1)) {// it means optional timeout has been + if (clientMessage.getNumberOfParts() == (5 + numberOfKeys + 1)) {// it means optional timeout has been // added - int timeout = msg.getPart(5 + numberOfKeys).getInt(); - servConn.setRequestSpecificTimeout(timeout); + int timeout = clientMessage.getPart(5 + numberOfKeys).getInt(); + serverConnection.setRequestSpecificTimeout(timeout); } this.securityService.authorizeRegionWrite(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { authzRequest.createRegionAuthorize(regionName); @@ -200,7 +199,7 @@ public class RemoveAll extends BaseCommand { } } - response = region.basicBridgeRemoveAll(keys, retryVersions, servConn.getProxyID(), eventId, + response = region.basicBridgeRemoveAll(keys, retryVersions, serverConnection.getProxyID(), eventId, callbackArg); if (!region.getConcurrencyChecksEnabled() || clientIsEmpty || !clientHasCCEnabled) { // the client only needs this if versioning is being used and the client @@ -216,33 +215,33 @@ public class RemoveAll extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType()); + writeReplyWithRefreshMetadata(clientMessage, response, serverConnection, pr, pr.getNetworkHopType()); pr.clearNetworkHopData(); replyWithMetaData = true; } } } catch (RegionDestroyedException rde) { - writeChunkedException(msg, rde, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, rde, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (ResourceException re) { - writeChunkedException(msg, re, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, re, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (PutAllPartialResultException pre) { - writeChunkedException(msg, pre, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, pre, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (Exception ce) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ce); + checkForInterrupt(serverConnection, ce); // If an exception occurs during the op, preserve the connection - writeChunkedException(msg, ce, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ce, serverConnection); + serverConnection.setAsTrue(RESPONDED); // if (logger.fineEnabled()) { logger.warn(LocalizedMessage.create(LocalizedStrings.Generic_0_UNEXPECTED_EXCEPTION, - servConn.getName()), ce); + serverConnection.getName()), ce); // } return; } finally { @@ -251,20 +250,20 @@ public class RemoveAll extends BaseCommand { stats.incProcessRemoveAllTime(start - oldStart); } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending removeAll response back to {} for region {}{}", servConn.getName(), - servConn.getSocketString(), regionName, (logger.isTraceEnabled() ? ": " + response : "")); + logger.debug("{}: Sending removeAll response back to {} for region {}{}", serverConnection.getName(), + serverConnection.getSocketString(), regionName, (logger.isTraceEnabled() ? ": " + response : "")); } // Increment statistics and write the reply if (!replyWithMetaData) { - writeReply(msg, response, servConn); + writeReply(clientMessage, response, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); stats.incWriteRemoveAllResponseTime(DistributionStats.getStatTime() - start); } @Override - protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException { + protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { throw new UnsupportedOperationException(); } @@ -285,7 +284,7 @@ public class RemoveAll extends BaseCommand { } replyMsg.sendHeader(); if (listSize > 0) { - int chunkSize = 2 * maximumChunkSize; + int chunkSize = 2 * MAXIMUM_CHUNK_SIZE; // Chunker will stream over the list in its toData method VersionedObjectList.Chunker chunk = new VersionedObjectList.Chunker(response, chunkSize, false, false); @@ -317,7 +316,7 @@ public class RemoveAll extends BaseCommand { } @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { throw new UnsupportedOperationException(); } @@ -345,7 +344,7 @@ public class RemoveAll extends BaseCommand { replyMsg.setLastChunk(false); replyMsg.sendChunk(servConn); - int chunkSize = 2 * maximumChunkSize; // maximumChunkSize + int chunkSize = 2 * MAXIMUM_CHUNK_SIZE; // MAXIMUM_CHUNK_SIZE // Chunker will stream over the list in its toData method VersionedObjectList.Chunker chunk = new VersionedObjectList.Chunker(response, chunkSize, false, false); @@ -371,7 +370,7 @@ public class RemoveAll extends BaseCommand { } pr.getPrStats().incPRMetaDataSentCount(); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), + logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(), origMsg.getTransactionId()); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java index 42a5bec..16333ac 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java @@ -33,9 +33,9 @@ public class RemoveUserAuth extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - boolean isSecureMode = msg.isSecureMode(); + boolean isSecureMode = clientMessage.isSecureMode(); if (!isSecureMode) { // need to throw exception @@ -43,29 +43,29 @@ public class RemoveUserAuth extends BaseCommand { } try { - servConn.setAsTrue(REQUIRES_RESPONSE); - Part keepalivePart = msg.getPart(0); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + Part keepalivePart = clientMessage.getPart(0); byte[] keepaliveByte = keepalivePart.getSerializedForm(); boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? false : true; - servConn.getSecurityLogWriter().fine("remove user auth keep alive " + keepalive); - servConn.removeUserAuth(msg, keepalive); - writeReply(msg, servConn); + serverConnection.getSecurityLogWriter().fine("remove user auth keep alive " + keepalive); + serverConnection.removeUserAuth(clientMessage, keepalive); + writeReply(clientMessage, serverConnection); } catch (GemFireSecurityException gfse) { - if (servConn.getSecurityLogWriter().warningEnabled()) { - servConn.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, - servConn.getName() + ": Security exception: " + gfse.getMessage()); + if (serverConnection.getSecurityLogWriter().warningEnabled()) { + serverConnection.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, + serverConnection.getName() + ": Security exception: " + gfse.getMessage()); } - writeException(msg, gfse, false, servConn); + writeException(clientMessage, gfse, false, serverConnection); } catch (Exception ex) { // TODO Auto-generated catch block - if (servConn.getLogWriter().warningEnabled()) { - servConn.getLogWriter().warning( + if (serverConnection.getLogWriter().warningEnabled()) { + serverConnection.getLogWriter().warning( LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, - new Object[] {servConn.getProxyID(), ""}, ex); + new Object[] { serverConnection.getProxyID(), ""}, ex); } - writeException(msg, ex, false, servConn); + writeException(clientMessage, ex, false, serverConnection); } finally { - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java index f7baba4..964b7a4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java @@ -52,15 +52,15 @@ public class Request extends BaseCommand { Request() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { Part regionNamePart = null, keyPart = null, valuePart = null; String regionName = null; Object callbackArg = null, key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // requiresResponse = true; { long oldStart = start; @@ -68,18 +68,18 @@ public class Request extends BaseCommand { stats.incReadGetRequestTime(start - oldStart); } // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + int parts = clientMessage.getNumberOfParts(); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); // valuePart = null; (redundant assignment) if (parts > 2) { - valuePart = msg.getPart(2); + valuePart = clientMessage.getPart(2); try { callbackArg = valuePart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -87,15 +87,15 @@ public class Request extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received get request ({} bytes) from {} for region {} key {} txId {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key, - msg.getTransactionId()); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key, + clientMessage.getTransactionId()); } // Process the get request @@ -109,31 +109,31 @@ public class Request extends BaseCommand { errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } else { - Region region = servConn.getCache().getRegion(regionName); + Region region = serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { GetOperationContext getContext = null; try { this.securityService.authorizeRegionRead(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { getContext = authzRequest.getAuthorize(regionName, key, callbackArg); callbackArg = getContext.getCallbackArg(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -141,10 +141,10 @@ public class Request extends BaseCommand { // the value if it is a byte[]. Object[] valueAndIsObject = new Object[3]; try { - getValueAndIsObject(region, key, callbackArg, servConn, valueAndIsObject); + getValueAndIsObject(region, key, callbackArg, serverConnection, valueAndIsObject); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -154,7 +154,7 @@ public class Request extends BaseCommand { try { - AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); + AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest(); if (postAuthzRequest != null) { getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext); byte[] serializedValue = getContext.getSerializedValue(); @@ -166,8 +166,8 @@ public class Request extends BaseCommand { isObject = getContext.isObject(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } { @@ -179,20 +179,20 @@ public class Request extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr, + writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject, serverConnection, pr, pr.getNetworkHopType()); pr.clearNetworkHopData(); } else { - writeResponse(data, callbackArg, msg, isObject, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); } } else { - writeResponse(data, callbackArg, msg, isObject, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { logger.debug("{}: Wrote get response back to {} for region {} key {} value: {}", - servConn.getName(), servConn.getSocketString(), regionName, key, data); + serverConnection.getName(), serverConnection.getSocketString(), regionName, key, data); } stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java index 3fd84d6..3753ed6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java @@ -49,57 +49,57 @@ public class RequestEventValue extends BaseCommand { private RequestEventValue() {} - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { Part eventIDPart = null, valuePart = null; EventID event = null; Object callbackArg = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); StringBuffer errMessage = new StringBuffer(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - eventIDPart = msg.getPart(0); + int parts = clientMessage.getNumberOfParts(); + eventIDPart = clientMessage.getPart(0); if (eventIDPart == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.RequestEventValue_0_THE_EVENT_ID_FOR_THE_GET_EVENT_VALUE_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage.append(" The event id for the get event value request is null."); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { try { event = (EventID) eventIDPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (parts > 1) { - valuePart = msg.getPart(1); + valuePart = clientMessage.getPart(1); try { if (valuePart != null) { callbackArg = valuePart.getObject(); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } if (logger.isTraceEnabled()) { - logger.trace("{}: Received get event value request ({} bytes) from {}", servConn.getName(), - msg.getPayloadLength(), servConn.getSocketString()); + logger.trace("{}: Received get event value request ({} bytes) from {}", serverConnection.getName(), + clientMessage.getPayloadLength(), serverConnection.getSocketString()); } - CacheClientNotifier ccn = servConn.getAcceptor().getCacheClientNotifier(); + CacheClientNotifier ccn = serverConnection.getAcceptor().getCacheClientNotifier(); // Get the ha container. HAContainerWrapper haContainer = (HAContainerWrapper) ccn.getHaContainer(); if (haContainer == null) { String reason = " was not found during get event value request"; - writeRegionDestroyedEx(msg, "ha container", reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, "ha container", reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { Object[] valueAndIsObject = new Object[2]; try { @@ -110,8 +110,8 @@ public class RequestEventValue extends BaseCommand { LocalizedStrings.RequestEventValue_UNABLE_TO_FIND_A_CLIENT_UPDATE_MESSAGE_FOR_0, event)); String msgStr = "No value found for " + event + " in " + haContainer.getName(); - writeErrorResponse(msg, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } else { if (logger.isDebugEnabled()) { @@ -130,20 +130,20 @@ public class RequestEventValue extends BaseCommand { valueAndIsObject[1] = Boolean.valueOf(((ClientUpdateMessageImpl) data).valueIsObject()); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } Object data = valueAndIsObject[0]; boolean isObject = (Boolean) valueAndIsObject[1]; - writeResponse(data, callbackArg, msg, isObject, servConn); - servConn.setAsTrue(RESPONDED); - ccn.getClientProxy(servConn.getProxyID()).getStatistics().incDeltaFullMessagesSent(); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); + serverConnection.setAsTrue(RESPONDED); + ccn.getClientProxy(serverConnection.getProxyID()).getStatistics().incDeltaFullMessagesSent(); if (logger.isDebugEnabled()) { logger.debug("{}: Wrote get event value response back to {} for ha container {}", - servConn.getName(), servConn.getSocketString(), haContainer.getName()); + serverConnection.getName(), serverConnection.getSocketString(), haContainer.getName()); } } }